Skip to content

Commit

Permalink
[dagster-airlift][dag] Dag overrides top level API (#25158)
Browse files Browse the repository at this point in the history
## Summary & Motivation
Add a top level API for adding a dag override to a list of assets
## How I Tested These Changes
Added a test for the behavior.
## Changelog
NOCHANGELOG
  • Loading branch information
dpeng817 authored Oct 14, 2024
1 parent c54bb0e commit 9cdf47a
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
build_airflow_polling_sensor_defs as build_airflow_polling_sensor_defs,
)
from .top_level_dag_def_api import (
assets_with_dag_mappings as assets_with_dag_mappings,
assets_with_task_mappings as assets_with_task_mappings,
dag_defs as dag_defs,
task_defs as task_defs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
_check as check,
)

from dagster_airlift.core.utils import metadata_for_task_mapping
from dagster_airlift.core.utils import metadata_for_dag_mapping, metadata_for_task_mapping


class TaskDefs:
Expand Down Expand Up @@ -74,7 +74,7 @@ def assets_with_task_mappings(
Concretely this adds metadata to all asset specs in the provided definitions
with the provided dag_id and task_id. The dag_id comes from the dag_id argument;
the task_id comes from the key of the provided task_mappings dictionary.
There is a single metadata key "airlift/task_mapping" that is used to store
There is a single metadata key "airlift/task-mapping" that is used to store
this information. It is a list of dictionaries with keys "dag_id" and "task_id".
Example:
Expand Down Expand Up @@ -106,6 +106,51 @@ def asset_one() -> None: ...
return assets_list


def assets_with_dag_mappings(
dag_mappings: Mapping[str, Iterable[Union[AssetsDefinition, AssetSpec]]],
) -> Sequence[Union[AssetsDefinition, AssetSpec]]:
"""Modify assets to be associated with a particular dag in Airlift tooling.
Used in concert with `build_defs_from_airflow_instance` to observe an airflow
instance to monitor the dags that are associated with the assets and
keep their materialization histories up to date.
In contrast with `assets_with_task_mappings`, which maps assets on a per-task basis, this is used in concert with
`proxying_to_dagster` dag-level mappings where an entire dag is migrated at once.
Concretely this adds metadata to all asset specs in the provided definitions
with the provided dag_id. The dag_id comes from the key of the provided dag_mappings dictionary.
There is a single metadata key "airlift/dag-mapping" that is used to store
this information. It is a list of strings, where each string is a dag_id which the asset is associated with.
Example:
.. code-block:: python
from dagster import AssetSpec, Definitions, asset
from dagster_airlift.core import assets_with_dag_mappings
@asset
def asset_one() -> None: ...
defs = Definitions(
assets=assets_with_dag_mappings(
dag_mappings={
"dag_one": [asset_one],
"dag_two": [AssetSpec(key="asset_two"), AssetSpec(key="asset_three")],
},
)
)
"""
assets_list = []
for dag_id, assets in dag_mappings.items():
assets_list.extend(
apply_metadata_to_assets(
assets,
metadata_for_dag_mapping(dag_id=dag_id),
)
)
return assets_list


def dag_defs(dag_id: str, *defs: TaskDefs) -> Definitions:
"""Construct a Dagster :py:class:`Definitions` object with definitions
associated with a particular Dag in Airflow that is being tracked by Airlift tooling.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
from dagster import AssetKey, AssetSpec, Definitions, multi_asset
from dagster._core.definitions.asset_key import CoercibleToAssetKey
from dagster_airlift.constants import TASK_MAPPING_METADATA_KEY
from dagster_airlift.core import assets_with_task_mappings, dag_defs, task_defs
from dagster_airlift.constants import DAG_MAPPING_METADATA_KEY, TASK_MAPPING_METADATA_KEY
from dagster_airlift.core import (
assets_with_dag_mappings,
assets_with_task_mappings,
dag_defs,
task_defs,
)


def from_specs(*specs: AssetSpec) -> Definitions:
Expand All @@ -19,6 +24,12 @@ def has_single_task_handle(spec: AssetSpec, dag_id: str, task_id: str) -> bool:
return task_handle_dict["dag_id"] == dag_id and task_handle_dict["task_id"] == task_id


def has_single_dag_handle(spec: AssetSpec, dag_id: str) -> bool:
assert len(spec.metadata[DAG_MAPPING_METADATA_KEY]) == 1
mapping = next(iter(spec.metadata[DAG_MAPPING_METADATA_KEY]))
return mapping == {"dag_id": dag_id}


def test_dag_def_spec() -> None:
defs = dag_defs(
"dag_one",
Expand Down Expand Up @@ -87,3 +98,15 @@ def an_asset() -> None: ...
)
)
assert has_single_task_handle(asset_spec(defs, "asset_one"), "dag_one", "task_one")


def test_dag_mappings_assets_def() -> None:
@multi_asset(specs=[AssetSpec(key="asset_one")])
def an_asset() -> None: ...

defs = Definitions(
assets=assets_with_dag_mappings(
{"dag_one": [an_asset]},
)
)
assert has_single_dag_handle(asset_spec(defs, "asset_one"), "dag_one")

0 comments on commit 9cdf47a

Please sign in to comment.