Skip to content

Commit

Permalink
Change task_defs to only accept Definitions (#23934)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Change `task_defs` to only accept a single `Definitions` argument, to
mirror `dag_defs`. This is more consistent and simplifies the
implementation.

## How I Tested These Changes

BK

## Changelog [New | Bug | Docs]

NOCHANGELOG
  • Loading branch information
schrockn authored Aug 26, 2024
1 parent 3f6617a commit 5798cb5
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 56 deletions.
Original file line number Diff line number Diff line change
@@ -1,47 +1,30 @@
from typing import Dict, Mapping, Sequence, Union
from typing import Dict, Mapping

from dagster import AssetsDefinition, AssetSpec, Definitions
from typing_extensions import TypeAlias

from dagster_airlift.core.utils import DAG_ID_TAG, TASK_ID_TAG

CoercibleToDefs: TypeAlias = Union[AssetsDefinition, AssetSpec, Definitions]


class TaskDefs:
def __init__(self, task_id: str, defs_list: Sequence[CoercibleToDefs]):
def __init__(self, task_id: str, defs: Definitions):
self.task_id = task_id
self.defs_list = defs_list


def apply_tags_to_all_specs(
defs_list: Sequence[CoercibleToDefs], tags: Dict[str, str]
) -> Definitions:
new_defs = []
new_assets_defs = []
new_specs = []
for def_ish in defs_list:
if isinstance(def_ish, AssetSpec):
new_specs.append(spec_with_tags(def_ish, tags))
elif isinstance(def_ish, AssetsDefinition):
new_assets_defs.append(assets_def_with_af_tags(def_ish, tags))
else:
more_new_assets_defs = []
for assets_def in def_ish.get_asset_graph().assets_defs:
more_new_assets_defs.append(assets_def_with_af_tags(assets_def, tags))
new_defs.append(
Definitions(
assets=more_new_assets_defs,
resources=def_ish.resources,
sensors=def_ish.sensors,
schedules=def_ish.schedules,
jobs=def_ish.jobs,
loggers=def_ish.loggers,
executor=def_ish.executor,
asset_checks=def_ish.asset_checks,
)
)
return Definitions.merge(Definitions(assets=new_specs + new_assets_defs), *new_defs)
self.defs = defs


def apply_tags_to_all_specs(defs: Definitions, tags: Dict[str, str]) -> Definitions:
return Definitions(
assets=[
assets_def_with_af_tags(assets_def, tags)
for assets_def in defs.get_asset_graph().assets_defs
],
resources=defs.resources,
sensors=defs.sensors,
schedules=defs.schedules,
jobs=defs.jobs,
loggers=defs.loggers,
executor=defs.executor,
asset_checks=defs.asset_checks,
)


def spec_with_tags(spec: AssetSpec, tags: Mapping[str, str]) -> "AssetSpec":
Expand All @@ -68,22 +51,22 @@ def dag_defs(dag_id: str, *defs: TaskDefs) -> Definitions:
.. code-block:: python
defs = dag_defs(
"dag_one",
task_defs("task_one", AssetSpec(key="asset_one")),
task_defs("task_two", AssetSpec(key="asset_two"), AssetSpec(key="asset_three")),
task_defs("task_one", Definitions(assets=[AssetSpec(key="asset_one"]))),
task_defs("task_two", Definitions(assets=[AssetSpec(key="asset_two"), AssetSpec(key="asset_three")])),
)
"""
defs_to_merge = []
for task_def in defs:
defs_to_merge.append(
apply_tags_to_all_specs(
task_def.defs_list,
defs=task_def.defs,
tags={DAG_ID_TAG: dag_id, TASK_ID_TAG: task_def.task_id},
)
)
return Definitions.merge(*defs_to_merge)


def task_defs(task_id, *defs: Union[AssetsDefinition, Definitions, AssetSpec]) -> TaskDefs:
def task_defs(task_id, defs: Definitions) -> TaskDefs:
"""Associate a set of definitions with a particular task in Airflow that is being tracked
by Airlift tooling.
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
from dagster_airlift.core.utils import DAG_ID_TAG, TASK_ID_TAG


def from_specs(*specs: AssetSpec) -> Definitions:
return Definitions(assets=specs)


def asset_spec(defs: Definitions, key: CoercibleToAssetKey) -> AssetSpec:
ak = AssetKey.from_coercible(key)
return defs.get_assets_def(ak).get_asset_spec(ak)
Expand All @@ -12,7 +16,7 @@ def asset_spec(defs: Definitions, key: CoercibleToAssetKey) -> AssetSpec:
def test_dag_def_spec() -> None:
defs = dag_defs(
"dag_one",
task_defs("task_one", AssetSpec(key="asset_one")),
task_defs("task_one", from_specs(AssetSpec(key="asset_one"))),
)
assert asset_spec(defs, "asset_one").tags[DAG_ID_TAG] == "dag_one"
assert asset_spec(defs, "asset_one").tags[TASK_ID_TAG] == "task_one"
Expand All @@ -21,8 +25,8 @@ def test_dag_def_spec() -> None:
def test_dag_def_multi_tasks_multi_specs() -> None:
defs = dag_defs(
"dag_one",
task_defs("task_one", AssetSpec(key="asset_one")),
task_defs("task_two", AssetSpec(key="asset_two"), AssetSpec(key="asset_three")),
task_defs("task_one", from_specs(AssetSpec(key="asset_one"))),
task_defs("task_two", from_specs(AssetSpec(key="asset_two"), AssetSpec(key="asset_three"))),
)
assert asset_spec(defs, "asset_one").tags[DAG_ID_TAG] == "dag_one"
assert asset_spec(defs, "asset_one").tags[TASK_ID_TAG] == "task_one"
Expand All @@ -38,7 +42,7 @@ def an_asset() -> None: ...

defs = dag_defs(
"dag_one",
task_defs("task_one", an_asset),
task_defs("task_one", Definitions([an_asset])),
)
assert asset_spec(defs, "asset_one").tags[DAG_ID_TAG] == "dag_one"
assert asset_spec(defs, "asset_one").tags[TASK_ID_TAG] == "task_one"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,13 @@ def freshness_defs() -> Definitions:
defs=Definitions.merge(
dag_defs(
"load_lakehouse",
task_defs(
"load_iris",
*specs_from_lakehouse(
csv_path=CSV_PATH,
),
),
task_defs("load_iris", Definitions(assets=specs_from_lakehouse(csv_path=CSV_PATH))),
),
dag_defs(
"dbt_dag",
task_defs(
"build_dbt_models",
*build_dbt_asset_specs(
manifest=dbt_manifest_path(),
),
Definitions(assets=build_dbt_asset_specs(manifest=dbt_manifest_path())),
),
),
lakehouse_existence_check_defs(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
airflow_instance=airflow_instance,
defs=dag_defs(
"simple",
task_defs("t1", Definitions([a1])),
task_defs("t2", Definitions([a2, a3])),
task_defs("t3", Definitions([a4])),
task_defs("t1", Definitions(assets=[a1])),
task_defs("t2", Definitions(assets=[a2, a3])),
task_defs("t3", Definitions(assets=[a4])),
),
)

0 comments on commit 5798cb5

Please sign in to comment.