From 36dce92d6235374a9064e271b73bb626871151d9 Mon Sep 17 00:00:00 2001 From: Christopher DeCarolis Date: Wed, 4 Sep 2024 15:52:16 -0700 Subject: [PATCH] [dagster-airlift] move tags to a neutral location (#24197) Removes a core import from in_airflow. Gets rid of relative pathing along the way. --- .../dagster-airlift/dagster_airlift/constants.py | 3 +++ .../core/airflow_cacheable_assets_def.py | 13 +++++-------- .../dagster_airlift/core/dag_defs.py | 2 +- .../dagster_airlift/core/defs_builders.py | 2 +- .../dagster-airlift/dagster_airlift/core/sensor.py | 5 +++-- .../dagster-airlift/dagster_airlift/core/utils.py | 4 +--- .../in_airflow/base_proxy_operator.py | 2 +- .../unit_tests/core_tests/test_dag_defs.py | 2 +- .../integration_tests/test_migrating_e2e.py | 2 +- 9 files changed, 17 insertions(+), 18 deletions(-) create mode 100644 examples/experimental/dagster-airlift/dagster_airlift/constants.py diff --git a/examples/experimental/dagster-airlift/dagster_airlift/constants.py b/examples/experimental/dagster-airlift/dagster_airlift/constants.py new file mode 100644 index 0000000000000..c31c0cef6d3b2 --- /dev/null +++ b/examples/experimental/dagster-airlift/dagster_airlift/constants.py @@ -0,0 +1,3 @@ +MIGRATED_TAG = "airlift/task_migrated" +DAG_ID_TAG = "airlift/dag_id" +TASK_ID_TAG = "airlift/task_id" diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_cacheable_assets_def.py b/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_cacheable_assets_def.py index 09364d08324c0..d41e82c9cf5a7 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_cacheable_assets_def.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_cacheable_assets_def.py @@ -33,17 +33,14 @@ unpack_value, ) -from dagster_airlift.core.utils import convert_to_valid_dagster_name -from dagster_airlift.migration_state import AirflowMigrationState - -from .airflow_instance import AirflowInstance, DagInfo, TaskInfo -from .utils import ( - DAG_ID_TAG, - MIGRATED_TAG, - TASK_ID_TAG, +from dagster_airlift.constants import DAG_ID_TAG, MIGRATED_TAG, TASK_ID_TAG +from dagster_airlift.core.airflow_instance import AirflowInstance, DagInfo, TaskInfo +from dagster_airlift.core.utils import ( + convert_to_valid_dagster_name, get_dag_id_from_asset, get_task_id_from_asset, ) +from dagster_airlift.migration_state import AirflowMigrationState # We serialize dictionaries as json, and json doesn't know how to serialize AssetKeys. So we wrap the mapping diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/dag_defs.py b/examples/experimental/dagster-airlift/dagster_airlift/core/dag_defs.py index 32aba23a02f89..09e2aea229fed 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/dag_defs.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/dag_defs.py @@ -7,7 +7,7 @@ _check as check, ) -from dagster_airlift.core.utils import DAG_ID_TAG, TASK_ID_TAG +from dagster_airlift.constants import DAG_ID_TAG, TASK_ID_TAG class TaskDefs: diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/defs_builders.py b/examples/experimental/dagster-airlift/dagster_airlift/core/defs_builders.py index 4d2a2601a754a..c12db414fc220 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/defs_builders.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/defs_builders.py @@ -4,7 +4,7 @@ from dagster._core.definitions.asset_key import CoercibleToAssetKey from typing_extensions import TypeAlias -from .utils import DAG_ID_TAG, TASK_ID_TAG +from dagster_airlift.constants import DAG_ID_TAG, TASK_ID_TAG CoercibleToAssetSpec: TypeAlias = Union[AssetSpec, CoercibleToAssetKey] diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/sensor.py b/examples/experimental/dagster-airlift/dagster_airlift/core/sensor.py index 80a365e324f4e..88d9853f64ce2 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/sensor.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/sensor.py @@ -25,8 +25,9 @@ from dagster._record import record from dagster._time import datetime_from_timestamp, get_current_datetime, get_current_timestamp -from .airflow_instance import AirflowInstance, TaskInstance -from .utils import MIGRATED_TAG, get_dag_id_from_asset, get_task_id_from_asset +from dagster_airlift.constants import MIGRATED_TAG +from dagster_airlift.core.airflow_instance import AirflowInstance, TaskInstance +from dagster_airlift.core.utils import get_dag_id_from_asset, get_task_id_from_asset def build_airflow_polling_sensor( diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/utils.py b/examples/experimental/dagster-airlift/dagster_airlift/core/utils.py index 37cd728419fbe..c8ad36a4aaebd 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/utils.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/utils.py @@ -7,9 +7,7 @@ ) from dagster._core.definitions.utils import VALID_NAME_REGEX -MIGRATED_TAG = "airlift/task_migrated" -DAG_ID_TAG = "airlift/dag_id" -TASK_ID_TAG = "airlift/task_id" +from dagster_airlift.constants import DAG_ID_TAG, TASK_ID_TAG def convert_to_valid_dagster_name(name: str) -> str: diff --git a/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/base_proxy_operator.py b/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/base_proxy_operator.py index 658f4d5c241f1..974435cdeec20 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/base_proxy_operator.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/base_proxy_operator.py @@ -8,7 +8,7 @@ from airflow.models.operator import BaseOperator from airflow.utils.context import Context -from dagster_airlift.core.utils import DAG_ID_TAG, TASK_ID_TAG +from dagster_airlift.constants import DAG_ID_TAG, TASK_ID_TAG from .gql_queries import ASSET_NODES_QUERY, RUNS_QUERY, TRIGGER_ASSETS_MUTATION, VERIFICATION_QUERY diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_dag_defs.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_dag_defs.py index 80c51b732943f..5a8d8451e186b 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_dag_defs.py +++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_dag_defs.py @@ -1,7 +1,7 @@ from dagster import AssetKey, AssetSpec, Definitions, multi_asset from dagster._core.definitions.asset_key import CoercibleToAssetKey +from dagster_airlift.constants import DAG_ID_TAG, TASK_ID_TAG from dagster_airlift.core import dag_defs, task_defs -from dagster_airlift.core.utils import DAG_ID_TAG, TASK_ID_TAG def from_specs(*specs: AssetSpec) -> Definitions: diff --git a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example_tests/integration_tests/test_migrating_e2e.py b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example_tests/integration_tests/test_migrating_e2e.py index 63343e03c081e..994a5a178dd44 100644 --- a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example_tests/integration_tests/test_migrating_e2e.py +++ b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example_tests/integration_tests/test_migrating_e2e.py @@ -7,8 +7,8 @@ from dagster import DagsterInstance from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.storage.dagster_run import DagsterRunStatus +from dagster_airlift.constants import MIGRATED_TAG, TASK_ID_TAG from dagster_airlift.core import AirflowInstance, BasicAuthBackend -from dagster_airlift.core.utils import MIGRATED_TAG, TASK_ID_TAG from .utils import ( poll_for_materialization,