Skip to content

Commit

Permalink
[dagster-airlift][dag] Remove SerializedTaskHandleData (#25127)
Browse files Browse the repository at this point in the history
## Summary & Motivation
Continuing the effort to simplify the implementation here.
The per-task asset keys stored in `SerializedTaskHandleData` can already be
inferred from the mapping info, so remove the record and the code that builds it.
## How I Tested These Changes
Existing tests.
## Changelog
NOCHANGELOG
  • Loading branch information
dpeng817 authored Oct 10, 2024
1 parent 01f23b7 commit 8fb2133
Show file tree
Hide file tree
Showing 3 changed files with 0 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
KeyScopedDataItem,
SerializedAirflowDefinitionsData,
SerializedDagData,
SerializedTaskHandleData,
TaskHandle,
TaskInfo,
)
Expand Down Expand Up @@ -94,14 +93,6 @@ def all_mapped_tasks(self) -> Dict[AssetKey, AbstractSet[TaskHandle]]:
spec.key: task_handles_for_spec(spec) for spec in self.mapping_info.mapped_asset_specs
}

def task_handle_data_for_dag(self, dag_id: str) -> Dict[str, SerializedTaskHandleData]:
return {
task_id: SerializedTaskHandleData(
asset_keys_in_task=self.mapping_info.asset_key_map[dag_id][task_id],
)
for task_id in self.mapping_info.task_id_map[dag_id]
}


def fetch_all_airflow_data(
airflow_instance: AirflowInstance, mapping_info: AirliftMetadataMappingInfo
Expand Down Expand Up @@ -135,7 +126,6 @@ def compute_serialized_data(
dag_datas={
dag_id: SerializedDagData(
dag_id=dag_id,
task_handle_data=fetched_airflow_data.task_handle_data_for_dag(dag_id),
dag_info=dag_info,
source_code=airflow_instance.get_dag_source_code(dag_info.metadata["file_token"]),
leaf_asset_keys=get_leaf_assets_for_dag(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ class SerializedDagData:
"""A record containing pre-computed data about a given airflow dag."""

dag_id: str
task_handle_data: Mapping[str, "SerializedTaskHandleData"]
dag_info: DagInfo
source_code: str
leaf_asset_keys: Set[AssetKey]
Expand Down Expand Up @@ -92,13 +91,3 @@ class SerializedAirflowDefinitionsData:
@cached_property
def all_mapped_tasks(self) -> Dict[AssetKey, AbstractSet[TaskHandle]]:
return {item.asset_key: item.mapped_tasks for item in self.key_scoped_data_items}


# History:
# - created
@whitelist_for_serdes
@record
class SerializedTaskHandleData:
"""A record containing known data about a given airflow task handle."""

asset_keys_in_task: AbstractSet[AssetKey]
Original file line number Diff line number Diff line change
Expand Up @@ -218,14 +218,6 @@ def test_automapped_loaded_data() -> None:
airflow_instance=airflow_instance,
)

mapping_info = build_airlift_metadata_mapping_info(defs)

fetched_airflow_data = fetch_all_airflow_data(airflow_instance, mapping_info)

airflow_data = AirflowDefinitionsData(airflow_instance=airflow_instance, mapped_defs=defs)

task_handle_data = fetched_airflow_data.task_handle_data_for_dag("dag1")
assert task_handle_data["task1"].asset_keys_in_task == {ak("test_instance/dag/dag1/task/task1")}
assert task_handle_data["task2"].asset_keys_in_task == {ak("test_instance/dag/dag1/task/task2")}

assert airflow_data.task_ids_in_dag("dag1") == {"task1", "task2"}

0 comments on commit 8fb2133

Please sign in to comment.