diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/dag_defs.py b/examples/experimental/dagster-airlift/dagster_airlift/core/dag_defs.py index 53c142c63d486..955161aced9af 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/dag_defs.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/dag_defs.py @@ -1,47 +1,30 @@ -from typing import Dict, Mapping, Sequence, Union +from typing import Dict, Mapping from dagster import AssetsDefinition, AssetSpec, Definitions -from typing_extensions import TypeAlias from dagster_airlift.core.utils import DAG_ID_TAG, TASK_ID_TAG -CoercibleToDefs: TypeAlias = Union[AssetsDefinition, AssetSpec, Definitions] - class TaskDefs: - def __init__(self, task_id: str, defs_list: Sequence[CoercibleToDefs]): + def __init__(self, task_id: str, defs: Definitions): self.task_id = task_id - self.defs_list = defs_list - - -def apply_tags_to_all_specs( - defs_list: Sequence[CoercibleToDefs], tags: Dict[str, str] -) -> Definitions: - new_defs = [] - new_assets_defs = [] - new_specs = [] - for def_ish in defs_list: - if isinstance(def_ish, AssetSpec): - new_specs.append(spec_with_tags(def_ish, tags)) - elif isinstance(def_ish, AssetsDefinition): - new_assets_defs.append(assets_def_with_af_tags(def_ish, tags)) - else: - more_new_assets_defs = [] - for assets_def in def_ish.get_asset_graph().assets_defs: - more_new_assets_defs.append(assets_def_with_af_tags(assets_def, tags)) - new_defs.append( - Definitions( - assets=more_new_assets_defs, - resources=def_ish.resources, - sensors=def_ish.sensors, - schedules=def_ish.schedules, - jobs=def_ish.jobs, - loggers=def_ish.loggers, - executor=def_ish.executor, - asset_checks=def_ish.asset_checks, - ) - ) - return Definitions.merge(Definitions(assets=new_specs + new_assets_defs), *new_defs) + self.defs = defs + + +def apply_tags_to_all_specs(defs: Definitions, tags: Dict[str, str]) -> Definitions: + return Definitions( + assets=[ + assets_def_with_af_tags(assets_def, tags) + for assets_def in defs.get_asset_graph().assets_defs + ], + resources=defs.resources, + sensors=defs.sensors, + schedules=defs.schedules, + jobs=defs.jobs, + loggers=defs.loggers, + executor=defs.executor, + asset_checks=defs.asset_checks, + ) def spec_with_tags(spec: AssetSpec, tags: Mapping[str, str]) -> "AssetSpec": @@ -68,22 +51,22 @@ def dag_defs(dag_id: str, *defs: TaskDefs) -> Definitions: .. code-block:: python defs = dag_defs( "dag_one", - task_defs("task_one", AssetSpec(key="asset_one")), - task_defs("task_two", AssetSpec(key="asset_two"), AssetSpec(key="asset_three")), + task_defs("task_one", Definitions(assets=[AssetSpec(key="asset_one"]))), + task_defs("task_two", Definitions(assets=[AssetSpec(key="asset_two"), AssetSpec(key="asset_three")])), ) """ defs_to_merge = [] for task_def in defs: defs_to_merge.append( apply_tags_to_all_specs( - task_def.defs_list, + defs=task_def.defs, tags={DAG_ID_TAG: dag_id, TASK_ID_TAG: task_def.task_id}, ) ) return Definitions.merge(*defs_to_merge) -def task_defs(task_id, *defs: Union[AssetsDefinition, Definitions, AssetSpec]) -> TaskDefs: +def task_defs(task_id, defs: Definitions) -> TaskDefs: """Associate a set of definitions with a particular task in Airflow that is being tracked by Airlift tooling. """ diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/test_dag_defs.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/test_dag_defs.py index a4a49bc0a20e8..80c51b732943f 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/test_dag_defs.py +++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/test_dag_defs.py @@ -4,6 +4,10 @@ from dagster_airlift.core.utils import DAG_ID_TAG, TASK_ID_TAG +def from_specs(*specs: AssetSpec) -> Definitions: + return Definitions(assets=specs) + + def asset_spec(defs: Definitions, key: CoercibleToAssetKey) -> AssetSpec: ak = AssetKey.from_coercible(key) return defs.get_assets_def(ak).get_asset_spec(ak) @@ -12,7 +16,7 @@ def asset_spec(defs: Definitions, key: CoercibleToAssetKey) -> AssetSpec: def test_dag_def_spec() -> None: defs = dag_defs( "dag_one", - task_defs("task_one", AssetSpec(key="asset_one")), + task_defs("task_one", from_specs(AssetSpec(key="asset_one"))), ) assert asset_spec(defs, "asset_one").tags[DAG_ID_TAG] == "dag_one" assert asset_spec(defs, "asset_one").tags[TASK_ID_TAG] == "task_one" @@ -21,8 +25,8 @@ def test_dag_def_spec() -> None: def test_dag_def_multi_tasks_multi_specs() -> None: defs = dag_defs( "dag_one", - task_defs("task_one", AssetSpec(key="asset_one")), - task_defs("task_two", AssetSpec(key="asset_two"), AssetSpec(key="asset_three")), + task_defs("task_one", from_specs(AssetSpec(key="asset_one"))), + task_defs("task_two", from_specs(AssetSpec(key="asset_two"), AssetSpec(key="asset_three"))), ) assert asset_spec(defs, "asset_one").tags[DAG_ID_TAG] == "dag_one" assert asset_spec(defs, "asset_one").tags[TASK_ID_TAG] == "task_one" @@ -38,7 +42,7 @@ def an_asset() -> None: ... defs = dag_defs( "dag_one", - task_defs("task_one", an_asset), + task_defs("task_one", Definitions([an_asset])), ) assert asset_spec(defs, "asset_one").tags[DAG_ID_TAG] == "dag_one" assert asset_spec(defs, "asset_one").tags[TASK_ID_TAG] == "task_one" diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/observe.py b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/observe.py index bfda6e11742b9..92ca102ad9924 100644 --- a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/observe.py +++ b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/observe.py @@ -52,20 +52,13 @@ def freshness_defs() -> Definitions: defs=Definitions.merge( dag_defs( "load_lakehouse", - task_defs( - "load_iris", - *specs_from_lakehouse( - csv_path=CSV_PATH, - ), - ), + task_defs("load_iris", Definitions(assets=specs_from_lakehouse(csv_path=CSV_PATH))), ), dag_defs( "dbt_dag", task_defs( "build_dbt_models", - *build_dbt_asset_specs( - manifest=dbt_manifest_path(), - ), + Definitions(assets=build_dbt_asset_specs(manifest=dbt_manifest_path())), ), ), lakehouse_existence_check_defs( diff --git a/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration/dagster_defs/observe.py b/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration/dagster_defs/observe.py index eba46285dc362..cd1093902e8b1 100644 --- a/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration/dagster_defs/observe.py +++ b/examples/experimental/dagster-airlift/examples/simple-migration/simple_migration/dagster_defs/observe.py @@ -21,8 +21,8 @@ airflow_instance=airflow_instance, defs=dag_defs( "simple", - task_defs("t1", Definitions([a1])), - task_defs("t2", Definitions([a2, a3])), - task_defs("t3", Definitions([a4])), + task_defs("t1", Definitions(assets=[a1])), + task_defs("t2", Definitions(assets=[a2, a3])), + task_defs("t3", Definitions(assets=[a4])), ), )