diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/__init__.py b/examples/experimental/dagster-airlift/dagster_airlift/core/__init__.py index d1f0b6a36a7a0..2e7d412532e9b 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/__init__.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/__init__.py @@ -13,6 +13,7 @@ build_airflow_polling_sensor_defs as build_airflow_polling_sensor_defs, ) from .top_level_dag_def_api import ( + assets_with_dag_mappings as assets_with_dag_mappings, assets_with_task_mappings as assets_with_task_mappings, dag_defs as dag_defs, task_defs as task_defs, diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/top_level_dag_def_api.py b/examples/experimental/dagster-airlift/dagster_airlift/core/top_level_dag_def_api.py index 0283b64bcf93d..edf645b96857f 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/top_level_dag_def_api.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/top_level_dag_def_api.py @@ -7,7 +7,7 @@ _check as check, ) -from dagster_airlift.core.utils import metadata_for_task_mapping +from dagster_airlift.core.utils import metadata_for_dag_mapping, metadata_for_task_mapping class TaskDefs: @@ -74,7 +74,7 @@ def assets_with_task_mappings( Concretely this adds metadata to all asset specs in the provided definitions with the provided dag_id and task_id. The dag_id comes from the dag_id argument; the task_id comes from the key of the provided task_mappings dictionary. - There is a single metadata key "airlift/task_mapping" that is used to store + There is a single metadata key "airlift/task-mapping" that is used to store this information. It is a list of dictionaries with keys "dag_id" and "task_id". Example: @@ -106,6 +106,51 @@ def asset_one() -> None: ... return assets_list +def assets_with_dag_mappings( + dag_mappings: Mapping[str, Iterable[Union[AssetsDefinition, AssetSpec]]], +) -> Sequence[Union[AssetsDefinition, AssetSpec]]: + """Modify assets to be associated with a particular dag in Airlift tooling. + + Used in concert with `build_defs_from_airflow_instance` to observe an airflow + instance to monitor the dags that are associated with the assets and + keep their materialization histories up to date. + + In contrast with `assets_with_task_mappings`, which maps assets on a per-task basis, this is used in concert with + `proxying_to_dagster` dag-level mappings where an entire dag is migrated at once. + + Concretely this adds metadata to all asset specs in the provided definitions + with the provided dag_id. The dag_id comes from the key of the provided dag_mappings dictionary. + There is a single metadata key "airlift/dag-mapping" that is used to store + this information. It is a list of strings, where each string is a dag_id which the asset is associated with. + + Example: + .. code-block:: python + from dagster import AssetSpec, Definitions, asset + from dagster_airlift.core import assets_with_dag_mappings + + @asset + def asset_one() -> None: ... + + defs = Definitions( + assets=assets_with_dag_mappings( + dag_mappings={ + "dag_one": [asset_one], + "dag_two": [AssetSpec(key="asset_two"), AssetSpec(key="asset_three")], + }, + ) + ) + """ + assets_list = [] + for dag_id, assets in dag_mappings.items(): + assets_list.extend( + apply_metadata_to_assets( + assets, + metadata_for_dag_mapping(dag_id=dag_id), + ) + ) + return assets_list + + def dag_defs(dag_id: str, *defs: TaskDefs) -> Definitions: """Construct a Dagster :py:class:`Definitions` object with definitions associated with a particular Dag in Airflow that is being tracked by Airlift tooling. diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_dag_defs.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_dag_defs.py index 7e3ec4163d275..52ee73a3546f9 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_dag_defs.py +++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_dag_defs.py @@ -1,7 +1,12 @@ from dagster import AssetKey, AssetSpec, Definitions, multi_asset from dagster._core.definitions.asset_key import CoercibleToAssetKey -from dagster_airlift.constants import TASK_MAPPING_METADATA_KEY -from dagster_airlift.core import assets_with_task_mappings, dag_defs, task_defs +from dagster_airlift.constants import DAG_MAPPING_METADATA_KEY, TASK_MAPPING_METADATA_KEY +from dagster_airlift.core import ( + assets_with_dag_mappings, + assets_with_task_mappings, + dag_defs, + task_defs, +) def from_specs(*specs: AssetSpec) -> Definitions: @@ -19,6 +24,12 @@ def has_single_task_handle(spec: AssetSpec, dag_id: str, task_id: str) -> bool: return task_handle_dict["dag_id"] == dag_id and task_handle_dict["task_id"] == task_id +def has_single_dag_handle(spec: AssetSpec, dag_id: str) -> bool: + assert len(spec.metadata[DAG_MAPPING_METADATA_KEY]) == 1 + mapping = next(iter(spec.metadata[DAG_MAPPING_METADATA_KEY])) + return mapping == {"dag_id": dag_id} + + def test_dag_def_spec() -> None: defs = dag_defs( "dag_one", @@ -87,3 +98,15 @@ def an_asset() -> None: ... ) ) assert has_single_task_handle(asset_spec(defs, "asset_one"), "dag_one", "task_one") + + +def test_dag_mappings_assets_def() -> None: + @multi_asset(specs=[AssetSpec(key="asset_one")]) + def an_asset() -> None: ... + + defs = Definitions( + assets=assets_with_dag_mappings( + {"dag_one": [an_asset]}, + ) + ) + assert has_single_dag_handle(asset_spec(defs, "asset_one"), "dag_one")