Skip to content

Commit

Permalink
[dagster-airlift][dag] Remove serialized data from AirflowDefinitions…
Browse files Browse the repository at this point in the history
…Data (#25126)

## Summary & Motivation
Change remaining properties to rely on asset spec-level metadata and
delete dependency on serialized data.
## How I Tested These Changes
Existing tests
## Changelog
NOCHANGELOG
  • Loading branch information
dpeng817 authored Oct 10, 2024
1 parent 1ab8502 commit 01f23b7
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -1,23 +1,15 @@
from collections import defaultdict
from functools import cached_property
from typing import AbstractSet, Mapping, Set, cast
from typing import AbstractSet, Mapping, Set

from dagster import (
AssetKey,
Definitions,
_check as check,
)
from dagster import AssetKey, Definitions
from dagster._record import record
from dagster._serdes.serdes import deserialize_value

from dagster_airlift.constants import DAG_MAPPING_METADATA_KEY
from dagster_airlift.core.airflow_instance import AirflowInstance
from dagster_airlift.core.serialization.compute import AirliftMetadataMappingInfo
from dagster_airlift.core.serialization.serialized_data import (
SerializedAirflowDefinitionsData,
TaskHandle,
)
from dagster_airlift.core.utils import get_metadata_key, is_mapped_asset_spec, task_handles_for_spec
from dagster_airlift.core.serialization.serialized_data import TaskHandle
from dagster_airlift.core.utils import is_mapped_asset_spec, task_handles_for_spec


@record
Expand All @@ -36,29 +28,9 @@ def mapping_info(self) -> AirliftMetadataMappingInfo:
def task_ids_in_dag(self, dag_id: str) -> Set[str]:
return self.mapping_info.task_id_map[dag_id]

@cached_property
def serialized_data(self) -> SerializedAirflowDefinitionsData:
regular_metadata_key = get_metadata_key(self.airflow_instance.name)
automapped_metadata_key = regular_metadata_key + "/full_automapped_dags"
check.invariant(
any(
metadata_key in self.mapped_defs.metadata
for metadata_key in [regular_metadata_key, automapped_metadata_key]
),
"Expected at least one of the possible metadata keys to be present",
)
serialized_data_str = (
self.mapped_defs.metadata[regular_metadata_key].value
if regular_metadata_key in self.mapped_defs.metadata
else self.mapped_defs.metadata[automapped_metadata_key].value
)
return deserialize_value(
cast(str, serialized_data_str), as_type=SerializedAirflowDefinitionsData
)

@property
def all_dag_ids(self) -> AbstractSet[str]:
return set(self.serialized_data.dag_datas.keys())
def dag_ids_with_mapped_asset_keys(self) -> AbstractSet[str]:
return self.mapping_info.dag_ids

@cached_property
def asset_keys_per_task_handle(self) -> Mapping[TaskHandle, AbstractSet[AssetKey]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ def materializations_and_requests_from_batch_iter(
airflow_data: AirflowDefinitionsData,
) -> Iterator[Optional[BatchResult]]:
runs = airflow_data.airflow_instance.get_dag_runs_batch(
dag_ids=list(airflow_data.all_dag_ids),
dag_ids=list(airflow_data.dag_ids_with_mapped_asset_keys),
end_date_gte=datetime_from_timestamp(end_date_gte),
end_date_lte=datetime_from_timestamp(end_date_lte),
offset=offset,
Expand Down

0 comments on commit 01f23b7

Please sign in to comment.