Skip to content

Commit

Permalink
[dagster-airlift] move tags to a neutral location (#24197)
Browse files Browse the repository at this point in the history
Removes a core import from in_airflow. Gets rid of relative pathing
along the way.
  • Loading branch information
dpeng817 authored Sep 4, 2024
1 parent ead2863 commit 36dce92
Show file tree
Hide file tree
Showing 9 changed files with 17 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
MIGRATED_TAG = "airlift/task_migrated"
DAG_ID_TAG = "airlift/dag_id"
TASK_ID_TAG = "airlift/task_id"
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,14 @@
unpack_value,
)

from dagster_airlift.core.utils import convert_to_valid_dagster_name
from dagster_airlift.migration_state import AirflowMigrationState

from .airflow_instance import AirflowInstance, DagInfo, TaskInfo
from .utils import (
DAG_ID_TAG,
MIGRATED_TAG,
TASK_ID_TAG,
from dagster_airlift.constants import DAG_ID_TAG, MIGRATED_TAG, TASK_ID_TAG
from dagster_airlift.core.airflow_instance import AirflowInstance, DagInfo, TaskInfo
from dagster_airlift.core.utils import (
convert_to_valid_dagster_name,
get_dag_id_from_asset,
get_task_id_from_asset,
)
from dagster_airlift.migration_state import AirflowMigrationState


# We serialize dictionaries as json, and json doesn't know how to serialize AssetKeys. So we wrap the mapping
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
_check as check,
)

from dagster_airlift.core.utils import DAG_ID_TAG, TASK_ID_TAG
from dagster_airlift.constants import DAG_ID_TAG, TASK_ID_TAG


class TaskDefs:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from dagster._core.definitions.asset_key import CoercibleToAssetKey
from typing_extensions import TypeAlias

from .utils import DAG_ID_TAG, TASK_ID_TAG
from dagster_airlift.constants import DAG_ID_TAG, TASK_ID_TAG

CoercibleToAssetSpec: TypeAlias = Union[AssetSpec, CoercibleToAssetKey]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@
from dagster._record import record
from dagster._time import datetime_from_timestamp, get_current_datetime, get_current_timestamp

from .airflow_instance import AirflowInstance, TaskInstance
from .utils import MIGRATED_TAG, get_dag_id_from_asset, get_task_id_from_asset
from dagster_airlift.constants import MIGRATED_TAG
from dagster_airlift.core.airflow_instance import AirflowInstance, TaskInstance
from dagster_airlift.core.utils import get_dag_id_from_asset, get_task_id_from_asset


def build_airflow_polling_sensor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@
)
from dagster._core.definitions.utils import VALID_NAME_REGEX

MIGRATED_TAG = "airlift/task_migrated"
DAG_ID_TAG = "airlift/dag_id"
TASK_ID_TAG = "airlift/task_id"
from dagster_airlift.constants import DAG_ID_TAG, TASK_ID_TAG


def convert_to_valid_dagster_name(name: str) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from airflow.models.operator import BaseOperator
from airflow.utils.context import Context

from dagster_airlift.core.utils import DAG_ID_TAG, TASK_ID_TAG
from dagster_airlift.constants import DAG_ID_TAG, TASK_ID_TAG

from .gql_queries import ASSET_NODES_QUERY, RUNS_QUERY, TRIGGER_ASSETS_MUTATION, VERIFICATION_QUERY

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from dagster import AssetKey, AssetSpec, Definitions, multi_asset
from dagster._core.definitions.asset_key import CoercibleToAssetKey
from dagster_airlift.constants import DAG_ID_TAG, TASK_ID_TAG
from dagster_airlift.core import dag_defs, task_defs
from dagster_airlift.core.utils import DAG_ID_TAG, TASK_ID_TAG


def from_specs(*specs: AssetSpec) -> Definitions:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from dagster import DagsterInstance
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.storage.dagster_run import DagsterRunStatus
from dagster_airlift.constants import MIGRATED_TAG, TASK_ID_TAG
from dagster_airlift.core import AirflowInstance, BasicAuthBackend
from dagster_airlift.core.utils import MIGRATED_TAG, TASK_ID_TAG

from .utils import (
poll_for_materialization,
Expand Down

0 comments on commit 36dce92

Please sign in to comment.