# Final cleanup of `compute_serialized_data` (#24812)
## Summary & Motivation

I think we can call `compute_serialized_data` tamed now, as it has been reduced to a couple of imperative lines of code and a reasonable dict comprehension. It is much easier to understand and manipulate now.
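
For readers skimming the diff below, the shape of the change is the usual "accumulate into a dict in a loop" → "dict comprehension plus a small helper" move. A minimal sketch of that pattern, using toy names rather than the real Airlift types:

```python
from typing import Dict, List, NamedTuple


class TaskData(NamedTuple):
    """Toy stand-in for SerializedTaskHandleData."""

    migrated: bool


def build_loop(task_ids: List[str], migration_state: Dict[str, bool]) -> Dict[str, TaskData]:
    # Before: imperative accumulation into a mutable dict.
    out: Dict[str, TaskData] = {}
    for task_id in task_ids:
        out[task_id] = TaskData(migrated=migration_state.get(task_id, False))
    return out


def build_comprehension(
    task_ids: List[str], migration_state: Dict[str, bool]
) -> Dict[str, TaskData]:
    # After: the same mapping expressed as a single dict comprehension.
    return {
        task_id: TaskData(migrated=migration_state.get(task_id, False))
        for task_id in task_ids
    }


assert build_loop(["a", "b"], {"a": True}) == build_comprehension(["a", "b"], {"a": True})
```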

## How I Tested These Changes

BK (Buildkite).

## Changelog

NOCHANGELOG
schrockn authored Sep 27, 2024 · parent d55e3b7 · commit 1d3c490
Showing 2 changed files with 25 additions and 25 deletions.
First changed file:

```diff
@@ -1,4 +1,4 @@
-from typing import Any, Dict, List, Mapping, Set
+from typing import Any, Dict, Mapping, Set
 
 from dagster import AssetKey, JsonMetadataValue, MarkdownMetadataValue
 from dagster._core.definitions.metadata.metadata_value import UrlMetadataValue
@@ -34,7 +34,7 @@ def dag_asset_metadata(dag_info: DagInfo, source_code: str) -> Mapping[str, Any]
 def get_leaf_assets_for_dag(
     asset_keys_in_dag: Set[AssetKey],
     downstreams_asset_dependency_graph: Dict[AssetKey, Set[AssetKey]],
-) -> List[AssetKey]:
+) -> Set[AssetKey]:
     # An asset is a "leaf" for the dag if it has no transitive dependencies _within_ the dag. It may have
     # dependencies _outside_ the dag.
     leaf_assets = []
@@ -47,7 +47,7 @@ def get_leaf_assets_for_dag(
             == set()
         ):
             leaf_assets.append(asset_key)
-    return leaf_assets
+    return set(leaf_assets)
```
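
The first file only changes the contract of `get_leaf_assets_for_dag`: it now returns `Set[AssetKey]` directly, so the caller in the second file no longer has to wrap the result in `set(...)`. Per the in-code comment, an asset is a leaf for the dag when it has no transitive dependencies within the dag, even if it has dependencies outside it. A toy sketch of that idea, using plain strings instead of `AssetKey` and a simplified transitive walk over a downstream graph (an assumption for illustration, not the library's actual helper):

```python
from typing import Dict, Set


def transitive_downstreams(key: str, graph: Dict[str, Set[str]]) -> Set[str]:
    """All assets reachable downstream of `key` in the toy dependency graph."""
    seen: Set[str] = set()
    stack = list(graph.get(key, set()))
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.add(current)
            stack.extend(graph.get(current, set()))
    return seen


def leaf_assets_for_dag(asset_keys_in_dag: Set[str], graph: Dict[str, Set[str]]) -> Set[str]:
    # A "leaf" has no transitive downstream dependencies *within* the dag;
    # downstreams outside the dag are fine.
    return {
        key
        for key in asset_keys_in_dag
        if not (transitive_downstreams(key, graph) & asset_keys_in_dag)
    }


# "a" feeds "b", "b" feeds "c"; "c" only feeds an asset outside the dag, so it is the leaf.
graph = {"a": {"b"}, "b": {"c"}, "c": {"external"}}
assert leaf_assets_for_dag({"a", "b", "c"}, graph) == {"c"}
```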
Second changed file:

```diff
@@ -138,6 +138,15 @@ def all_mapped_tasks(self) -> Dict[AssetKey, List[MappedAirflowTaskData]]:
             for spec in self.mapping_info.mapped_asset_specs
         }
 
+    def task_handle_data_for_dag(self, dag_id: str) -> Dict[str, SerializedTaskHandleData]:
+        return {
+            task_id: SerializedTaskHandleData(
+                migration_state=self.migration_state_map[dag_id].get(task_id),
+                asset_keys_in_task=self.mapping_info.asset_key_map[dag_id][task_id],
+            )
+            for task_id in self.mapping_info.task_id_map[dag_id]
+        }
+
 
 def fetch_all_airflow_data(
     airflow_instance: AirflowInstance, mapping_info: AirliftMetadataMappingInfo
@@ -165,31 +174,22 @@ def compute_serialized_data(
 ) -> "SerializedAirflowDefinitionsData":
     mapping_info = build_airlift_metadata_mapping_info(defs)
     fetched_airflow_data = fetch_all_airflow_data(airflow_instance, mapping_info)
-
-    dag_datas = {}
-    for dag_id, dag_info in fetched_airflow_data.dag_infos.items():
-        leaf_asset_keys = get_leaf_assets_for_dag(
-            asset_keys_in_dag=mapping_info.asset_keys_per_dag_id[dag_id],
-            downstreams_asset_dependency_graph=mapping_info.downstream_deps,
-        )
-        task_handle_data = {}
-        for task_id in mapping_info.task_id_map[dag_id]:
-            task_handle_data[task_id] = SerializedTaskHandleData(
-                migration_state=fetched_airflow_data.migration_state_map[dag_id].get(task_id),
-                asset_keys_in_task=mapping_info.asset_key_map[dag_id][task_id],
-            )
-        dag_datas[dag_id] = SerializedDagData(
-            dag_id=dag_id,
-            task_handle_data=task_handle_data,
-            dag_info=dag_info,
-            source_code=airflow_instance.get_dag_source_code(dag_info.metadata["file_token"]),
-            leaf_asset_keys=set(leaf_asset_keys),
-        )
-
     return SerializedAirflowDefinitionsData(
         key_scoped_data_items=[
             KeyScopedDataItem(asset_key=k, mapped_tasks=v)
             for k, v in fetched_airflow_data.all_mapped_tasks.items()
         ],
-        dag_datas=dag_datas,
+        dag_datas={
+            dag_id: SerializedDagData(
+                dag_id=dag_id,
+                task_handle_data=fetched_airflow_data.task_handle_data_for_dag(dag_id),
+                dag_info=dag_info,
+                source_code=airflow_instance.get_dag_source_code(dag_info.metadata["file_token"]),
+                leaf_asset_keys=get_leaf_assets_for_dag(
+                    asset_keys_in_dag=mapping_info.asset_keys_per_dag_id[dag_id],
+                    downstreams_asset_dependency_graph=mapping_info.downstream_deps,
+                ),
+            )
+            for dag_id, dag_info in fetched_airflow_data.dag_infos.items()
+        },
     )
```
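
The design move in the second file is to hang the per-dag task serialization off the object that already owns the relevant maps, so `compute_serialized_data` collapses into a single declarative expression with no intermediate mutable state. A self-contained sketch of that shape, with hypothetical stand-in names (`FetchedData`, `task_state_for_dag`) rather than the real Airlift classes:

```python
from dataclasses import dataclass
from typing import Dict, Set


@dataclass
class FetchedData:
    """Toy stand-in for the fetched Airflow data container."""

    migration_state: Dict[str, Dict[str, bool]]  # dag_id -> task_id -> migrated?
    task_ids_by_dag: Dict[str, Set[str]]

    def task_state_for_dag(self, dag_id: str) -> Dict[str, bool]:
        # Mirrors the new task_handle_data_for_dag helper: per-task lookups live
        # on the object that already holds the maps they need.
        return {
            task_id: self.migration_state[dag_id].get(task_id, False)
            for task_id in self.task_ids_by_dag[dag_id]
        }


fetched = FetchedData(
    migration_state={"dag_1": {"t1": True}},
    task_ids_by_dag={"dag_1": {"t1", "t2"}},
)
# The caller composes per-dag results in one comprehension, as compute_serialized_data now does.
serialized = {dag_id: fetched.task_state_for_dag(dag_id) for dag_id in fetched.task_ids_by_dag}
assert serialized == {"dag_1": {"t1": True, "t2": False}}
```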
