diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/serialization/compute.py b/examples/experimental/dagster-airlift/dagster_airlift/core/serialization/compute.py index 21642b39cc331..dcf5208211a53 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/serialization/compute.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/serialization/compute.py @@ -11,7 +11,6 @@ KeyScopedDataItem, SerializedAirflowDefinitionsData, SerializedDagData, - SerializedTaskHandleData, TaskHandle, TaskInfo, ) @@ -94,14 +93,6 @@ def all_mapped_tasks(self) -> Dict[AssetKey, AbstractSet[TaskHandle]]: spec.key: task_handles_for_spec(spec) for spec in self.mapping_info.mapped_asset_specs } - def task_handle_data_for_dag(self, dag_id: str) -> Dict[str, SerializedTaskHandleData]: - return { - task_id: SerializedTaskHandleData( - asset_keys_in_task=self.mapping_info.asset_key_map[dag_id][task_id], - ) - for task_id in self.mapping_info.task_id_map[dag_id] - } - def fetch_all_airflow_data( airflow_instance: AirflowInstance, mapping_info: AirliftMetadataMappingInfo @@ -135,7 +126,6 @@ def compute_serialized_data( dag_datas={ dag_id: SerializedDagData( dag_id=dag_id, - task_handle_data=fetched_airflow_data.task_handle_data_for_dag(dag_id), dag_info=dag_info, source_code=airflow_instance.get_dag_source_code(dag_info.metadata["file_token"]), leaf_asset_keys=get_leaf_assets_for_dag( diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/serialization/serialized_data.py b/examples/experimental/dagster-airlift/dagster_airlift/core/serialization/serialized_data.py index 431fdfca5ee78..a6a4879baec0c 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/serialization/serialized_data.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/serialization/serialized_data.py @@ -59,7 +59,6 @@ class SerializedDagData: """A record containing pre-computed data about a given airflow dag.""" dag_id: str - task_handle_data: Mapping[str, "SerializedTaskHandleData"] dag_info: DagInfo source_code: str leaf_asset_keys: Set[AssetKey] @@ -92,13 +91,3 @@ class SerializedAirflowDefinitionsData: @cached_property def all_mapped_tasks(self) -> Dict[AssetKey, AbstractSet[TaskHandle]]: return {item.asset_key: item.mapped_tasks for item in self.key_scoped_data_items} - - -# History: -# - created -@whitelist_for_serdes -@record -class SerializedTaskHandleData: - """A record containing known data about a given airflow task handle.""" - - asset_keys_in_task: AbstractSet[AssetKey] diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_airflow_asset_mapping.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_airflow_asset_mapping.py index 108c0897b3784..f984d0c4cb0c5 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_airflow_asset_mapping.py +++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_airflow_asset_mapping.py @@ -218,14 +218,6 @@ def test_automapped_loaded_data() -> None: airflow_instance=airflow_instance, ) - mapping_info = build_airlift_metadata_mapping_info(defs) - - fetched_airflow_data = fetch_all_airflow_data(airflow_instance, mapping_info) - airflow_data = AirflowDefinitionsData(airflow_instance=airflow_instance, mapped_defs=defs) - task_handle_data = fetched_airflow_data.task_handle_data_for_dag("dag1") - assert task_handle_data["task1"].asset_keys_in_task == {ak("test_instance/dag/dag1/task/task1")} - assert task_handle_data["task2"].asset_keys_in_task == {ak("test_instance/dag/dag1/task/task2")} - assert airflow_data.task_ids_in_dag("dag1") == {"task1", "task2"}