Skip to content

Commit

Permalink
Add dag_defs and task_defs to composably add tags to airlifted defint…
Browse files Browse the repository at this point in the history
…ions (#23877)

## Summary & Motivation

This adds `dag_defs` and `task_defs` in order to manage the airlift
mapping process.

e.g.

```python
    defs = dag_defs(
        "dag_one",
        task_defs("task_one", AssetSpec(key="asset_one")),
        task_defs("task_two", AssetSpec(key="asset_two"), AssetSpec(key="asset_three")),
    )
```

`task_defs` takes a list where each element can be a AssetSpec,
AssetsDefinition, or Definition. It attaches task_id and dag_id as tags
expected by Airlift.

This is designed such one a dag migration is complete, you can delete
all references to `dag_defs` and `task_defs` and use the argument to
`task_defs` to `combine_defs` without modification.

## How I Tested These Changes

BK

## Changelog [New | Bug | Docs]

NOCHANGELOG
  • Loading branch information
schrockn authored Aug 24, 2024
1 parent e70adb9 commit a7d42f6
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
from ..migration_state import load_migration_state_from_yaml as load_migration_state_from_yaml
from .basic_auth import BasicAuthBackend as BasicAuthBackend
from .dag_defs import (
dag_defs as dag_defs,
task_defs as task_defs,
)
from .def_factory import (
DefsFactory as DefsFactory,
defs_from_factories as defs_from_factories,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
from typing import Dict, Mapping, Sequence, Union

from dagster import AssetsDefinition, AssetSpec, Definitions
from typing_extensions import TypeAlias

from dagster_airlift.core.defs_builders import combine_defs
from dagster_airlift.core.utils import DAG_ID_TAG, TASK_ID_TAG

CoercibleToDefs: TypeAlias = Union[AssetsDefinition, AssetSpec, Definitions]


class TaskDefs:
def __init__(self, task_id: str, defs_list: Sequence[CoercibleToDefs]):
self.task_id = task_id
self.defs_list = defs_list


def apply_tags_to_all_specs(
defs_list: Sequence[CoercibleToDefs], tags: Dict[str, str]
) -> Definitions:
new_defs = []
for def_ish in defs_list:
if isinstance(def_ish, AssetSpec):
new_defs.append(spec_with_tags(def_ish, tags))
elif isinstance(def_ish, AssetsDefinition):
new_defs.append(assets_def_with_af_tags(def_ish, tags))
else:
new_assets_defs = []
for assets_def in def_ish.get_asset_graph().assets_defs:
new_assets_defs.append(assets_def_with_af_tags(assets_def, tags))
new_defs.append(
Definitions(
assets=new_assets_defs,
resources=def_ish.resources,
sensors=def_ish.sensors,
schedules=def_ish.schedules,
jobs=def_ish.jobs,
loggers=def_ish.loggers,
executor=def_ish.executor,
asset_checks=def_ish.asset_checks,
)
)
return combine_defs(*new_defs)


def spec_with_tags(spec: AssetSpec, tags: Mapping[str, str]) -> "AssetSpec":
return spec._replace(tags={**spec.tags, **tags})


def assets_def_with_af_tags(
assets_def: AssetsDefinition, tags: Mapping[str, str]
) -> AssetsDefinition:
return assets_def.map_asset_specs(lambda spec: spec_with_tags(spec, tags))


def dag_defs(dag_id: str, *defs: TaskDefs) -> Definitions:
"""Construct a Dagster :py:class:`Definitions` object with definitions
associated with a particular Dag in Airflow that is being tracked by Airlift tooling.
Concretely this adds tags to all asset specs in the provided definitions
with the provided dag_id and task_id. Dag id is tagged with the
"airlift/dag_id" key and task id is tagged with the "airlift/task_id" key.
Used in concert with :py:func:`task_defs`.
Example:
.. code-block:: python
defs = dag_defs(
"dag_one",
task_defs("task_one", AssetSpec(key="asset_one")),
task_defs("task_two", AssetSpec(key="asset_two"), AssetSpec(key="asset_three")),
)
"""
defs_to_merge = []
for task_def in defs:
defs_to_merge.append(
apply_tags_to_all_specs(
task_def.defs_list,
tags={DAG_ID_TAG: dag_id, TASK_ID_TAG: task_def.task_id},
)
)
return Definitions.merge(*defs_to_merge)


def task_defs(task_id, *defs: Union[AssetsDefinition, Definitions, AssetSpec]) -> TaskDefs:
"""Associate a set of definitions with a particular task in Airflow that is being tracked
by Airlift tooling.
"""
return TaskDefs(task_id, defs)
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from dagster import AssetKey, AssetSpec, Definitions, multi_asset
from dagster._core.definitions.asset_key import CoercibleToAssetKey
from dagster_airlift.core import dag_defs, task_defs
from dagster_airlift.core.utils import DAG_ID_TAG, TASK_ID_TAG


def asset_spec(defs: Definitions, key: CoercibleToAssetKey) -> AssetSpec:
ak = AssetKey.from_coercible(key)
return defs.get_assets_def(ak).get_asset_spec(ak)


def test_dag_def_spec() -> None:
defs = dag_defs(
"dag_one",
task_defs("task_one", AssetSpec(key="asset_one")),
)
assert asset_spec(defs, "asset_one").tags[DAG_ID_TAG] == "dag_one"
assert asset_spec(defs, "asset_one").tags[TASK_ID_TAG] == "task_one"


def test_dag_def_multi_tasks_multi_specs() -> None:
defs = dag_defs(
"dag_one",
task_defs("task_one", AssetSpec(key="asset_one")),
task_defs("task_two", AssetSpec(key="asset_two"), AssetSpec(key="asset_three")),
)
assert asset_spec(defs, "asset_one").tags[DAG_ID_TAG] == "dag_one"
assert asset_spec(defs, "asset_one").tags[TASK_ID_TAG] == "task_one"
assert asset_spec(defs, "asset_two").tags[DAG_ID_TAG] == "dag_one"
assert asset_spec(defs, "asset_two").tags[TASK_ID_TAG] == "task_two"
assert asset_spec(defs, "asset_three").tags[DAG_ID_TAG] == "dag_one"
assert asset_spec(defs, "asset_three").tags[TASK_ID_TAG] == "task_two"


def test_dag_def_assets_def() -> None:
@multi_asset(specs=[AssetSpec(key="asset_one")])
def an_asset() -> None: ...

defs = dag_defs(
"dag_one",
task_defs("task_one", an_asset),
)
assert asset_spec(defs, "asset_one").tags[DAG_ID_TAG] == "dag_one"
assert asset_spec(defs, "asset_one").tags[TASK_ID_TAG] == "task_one"


def test_dag_def_defs() -> None:
@multi_asset(specs=[AssetSpec(key="asset_one")])
def an_asset() -> None: ...

defs = dag_defs(
"dag_one",
task_defs("task_one", Definitions(assets=[an_asset])),
)
assert asset_spec(defs, "asset_one").tags[DAG_ID_TAG] == "dag_one"
assert asset_spec(defs, "asset_one").tags[TASK_ID_TAG] == "task_one"

0 comments on commit a7d42f6

Please sign in to comment.