diff --git a/docs/content/api/modules.json.gz b/docs/content/api/modules.json.gz index 61fd3bb3ca2e4..0e6e3156f7d60 100644 Binary files a/docs/content/api/modules.json.gz and b/docs/content/api/modules.json.gz differ diff --git a/docs/content/api/searchindex.json.gz b/docs/content/api/searchindex.json.gz index 5d1768af29bbd..4b8e3d2c268af 100644 Binary files a/docs/content/api/searchindex.json.gz and b/docs/content/api/searchindex.json.gz differ diff --git a/docs/content/api/sections.json.gz b/docs/content/api/sections.json.gz index 255819e998ecb..4b922c6dff464 100644 Binary files a/docs/content/api/sections.json.gz and b/docs/content/api/sections.json.gz differ diff --git a/docs/next/public/objects.inv b/docs/next/public/objects.inv index f2d55b4609921..dcf6a751e8f2e 100644 Binary files a/docs/next/public/objects.inv and b/docs/next/public/objects.inv differ diff --git a/docs/sphinx/sections/api/apidocs/assets.rst b/docs/sphinx/sections/api/apidocs/assets.rst index 7f2bf40a642b3..70cb6f96bc4cf 100644 --- a/docs/sphinx/sections/api/apidocs/assets.rst +++ b/docs/sphinx/sections/api/apidocs/assets.rst @@ -20,6 +20,8 @@ Refer to the `Asset definitions Sequence[AssetSpec]: ... + + +@overload +def map_asset_specs( + func: Callable[[AssetSpec], AssetSpec], iterable: Iterable["AssetsDefinition"] +) -> Sequence["AssetsDefinition"]: ... + + +@overload +def map_asset_specs( + func: Callable[[AssetSpec], AssetSpec], iterable: Iterable[Union["AssetsDefinition", AssetSpec]] +) -> Sequence[Union["AssetsDefinition", AssetSpec]]: ... + + +def map_asset_specs( + func: Callable[[AssetSpec], AssetSpec], iterable: Iterable[Union["AssetsDefinition", AssetSpec]] +) -> Sequence[Union["AssetsDefinition", AssetSpec]]: + """Map a function over a sequence of AssetSpecs or AssetsDefinitions, replacing specs in the sequence + or specs in an AssetsDefinitions with the result of the function. + + Args: + func (Callable[[AssetSpec], AssetSpec]): The function to apply to each AssetSpec. + iterable (Iterable[Union[AssetsDefinition, AssetSpec]]): The sequence of AssetSpecs or AssetsDefinitions. + + Returns: + Sequence[Union[AssetsDefinition, AssetSpec]]: A sequence of AssetSpecs or AssetsDefinitions with the function applied + to each spec. + + Examples: + .. code-block:: python + + from dagster import AssetSpec, map_asset_specs + + asset_specs = [ + AssetSpec(key="my_asset"), + AssetSpec(key="my_asset_2"), + ] + + mapped_specs = map_asset_specs(lambda spec: spec.replace_attributes(owners=["nelson@hooli.com"]), asset_specs) + + """ + from dagster._core.definitions.assets import AssetsDefinition + + return [ + obj.map_asset_specs(func) if isinstance(obj, AssetsDefinition) else func(obj) + for obj in iterable + ] diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_asset_spec.py b/python_modules/dagster/dagster_tests/definitions_tests/test_asset_spec.py index 71d036e590db9..ddbe373d04b22 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_asset_spec.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_asset_spec.py @@ -1,7 +1,11 @@ +from typing import cast + +import dagster as dg import pytest from dagster import AssetSpec, AutoMaterializePolicy, AutomationCondition from dagster._core.definitions.asset_dep import AssetDep from dagster._core.definitions.asset_key import AssetKey +from dagster._core.definitions.assets import AssetsDefinition from dagster._core.errors import DagsterInvalidDefinitionError, DagsterInvariantViolationError @@ -142,3 +146,87 @@ def test_merge_attributes_deps() -> None: spec_new_dep = new_spec.merge_attributes(deps={AssetKey("baz")}) assert spec_new_dep.deps == [AssetDep(AssetKey("bar")), AssetDep(AssetKey("baz"))] + + +def test_map_asset_specs_basic_specs() -> None: + specs = [ + AssetSpec(key="foo"), + AssetSpec(key="bar"), + ] + + mapped_specs = dg.map_asset_specs( + lambda spec: spec.replace_attributes(owners=["ben@dagsterlabs.com"]), specs + ) + + assert all(spec.owners == ["ben@dagsterlabs.com"] for spec in mapped_specs) + + +def test_map_asset_specs_basic_defs() -> None: + @dg.asset + def my_asset(): + pass + + @dg.asset + def my_other_asset(): + pass + + assets = [my_asset, my_other_asset] + + mapped_assets = dg.map_asset_specs( + lambda spec: spec.replace_attributes(owners=["ben@dagsterlabs.com"]), assets + ) + + assert all( + spec.owners == ["ben@dagsterlabs.com"] for asset in mapped_assets for spec in asset.specs + ) + + +def test_map_asset_specs_mixed_specs_defs() -> None: + @dg.asset + def my_asset(): + pass + + spec_and_defs = [ + my_asset, + AssetSpec(key="bar"), + ] + + mapped_specs_and_defs = dg.map_asset_specs( + lambda spec: spec.replace_attributes(owners=["ben@dagsterlabs.com"]), spec_and_defs + ) + + assert all( + spec.owners == ["ben@dagsterlabs.com"] + for spec in cast(AssetsDefinition, mapped_specs_and_defs[0]).specs + ) + assert cast(AssetSpec, mapped_specs_and_defs[1]).owners == ["ben@dagsterlabs.com"] + + +def test_map_asset_specs_multi_asset() -> None: + @dg.multi_asset( + specs=[ + AssetSpec(key="foo"), + AssetSpec(key="bar"), + ] + ) + def my_multi_asset(): + pass + + @dg.multi_asset( + specs=[ + AssetSpec(key="baz"), + AssetSpec(key="qux"), + ] + ) + def my_other_multi_asset(): + pass + + assets = [my_multi_asset, my_other_multi_asset] + + mapped_assets = dg.map_asset_specs( + lambda spec: spec.replace_attributes(owners=["ben@dagsterlabs.com"]), assets + ) + + assert all( + spec.owners == ["ben@dagsterlabs.com"] for asset in mapped_assets for spec in asset.specs + )