From 82b42862e75c1783edff4af4a2d98664db03708b Mon Sep 17 00:00:00 2001 From: Christopher DeCarolis Date: Fri, 9 Aug 2024 14:06:25 -0700 Subject: [PATCH 1/5] [dagster-airlift] handle transitive dependencies between cacheable assets (#23532) Was previously not properly handling transitive cross dag dependencies between assets when linking the "dag" asset to its constituent task assets. Tested by creating a transitive dependency between two dags, and ensuring that the correct task assets are considered "leaves" --- .../core/airflow_cacheable_assets_def.py | 40 +++++++--- .../integration_tests/conftest.py | 10 ++- .../integration_tests/test_cacheable_asset.py | 15 ++++ .../test_transitive_asset_deps.py | 76 +++++++++++++++++++ .../transitive_asset_deps_dags/first.py | 19 +++++ .../transitive_asset_deps_dags/second.py | 20 +++++ 6 files changed, 165 insertions(+), 15 deletions(-) create mode 100644 examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_transitive_asset_deps.py create mode 100644 examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/transitive_asset_deps_dags/first.py create mode 100644 examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/transitive_asset_deps_dags/second.py diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_cacheable_assets_def.py b/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_cacheable_assets_def.py index 3a323889c4c3d..36a6e10e78a50 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_cacheable_assets_def.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_cacheable_assets_def.py @@ -408,24 +408,40 @@ def get_task_info_for_asset( return airflow_instance.get_task_info(dag_id, task_id) -def list_intersection(list1, list2): - return list(set(list1) & set(list2)) - - def get_leaf_assets_for_dag( asset_keys_in_dag: Set[AssetKey], downstreams_asset_dependency_graph: Dict[AssetKey, Set[AssetKey]], ) -> List[AssetKey]: - # An asset is a "leaf" for the dag if it has no dependencies _within_ the dag. It may have + # An asset is a "leaf" for the dag if it has no transitive dependencies _within_ the dag. It may have # dependencies _outside_ the dag. 
-    return [
-        asset_key
-        for asset_key in asset_keys_in_dag
-        if list_intersection(
-            downstreams_asset_dependency_graph.get(asset_key, []), asset_keys_in_dag
+    leaf_assets = []
+    cache = {}
+    for asset_key in asset_keys_in_dag:
+        if (
+            get_transitive_dependencies_for_asset(
+                asset_key, downstreams_asset_dependency_graph, cache
+            ).intersection(asset_keys_in_dag)
+            == set()
+        ):
+            leaf_assets.append(asset_key)
+    return leaf_assets
+
+
+def get_transitive_dependencies_for_asset(
+    asset_key: AssetKey,
+    downstreams_asset_dependency_graph: Dict[AssetKey, Set[AssetKey]],
+    cache: Dict[AssetKey, Set[AssetKey]],
+) -> Set[AssetKey]:
+    if asset_key in cache:
+        return cache[asset_key]
+    transitive_deps = set()
+    for dep in downstreams_asset_dependency_graph[asset_key]:
+        transitive_deps.add(dep)
+        transitive_deps.update(
+            get_transitive_dependencies_for_asset(dep, downstreams_asset_dependency_graph, cache)
         )
-        == set()
-    ]
+    cache[asset_key] = transitive_deps
+    return transitive_deps


 def _get_migration_state_for_task(
diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/conftest.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/conftest.py
index 95ba3965d44bb..a643d250c6483 100644
--- a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/conftest.py
+++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/conftest.py
@@ -8,16 +8,20 @@ from dagster._core.test_utils import environ
 
 
+@pytest.fixture(name="dags_dir")
+def default_dags_dir():
+    return Path(__file__).parent / "dags"
+
+
 @pytest.fixture(name="setup")
-def setup_fixture() -> Generator[str, None, None]:
+def setup_fixture(dags_dir: Path) -> Generator[str, None, None]:
     with TemporaryDirectory() as tmpdir:
         # run chmod +x create_airflow_cfg.sh and then run create_airflow_cfg.sh tmpdir
         temp_env = {**os.environ.copy(), "AIRFLOW_HOME": tmpdir}
         # go up one directory from current
         path_to_script = Path(__file__).parent.parent.parent / "airflow_setup.sh"
-        path_to_dags = Path(__file__).parent / "dags"
         subprocess.run(["chmod", "+x", path_to_script], check=True, env=temp_env)
-        subprocess.run([path_to_script, path_to_dags], check=True, env=temp_env)
+        subprocess.run([path_to_script, dags_dir], check=True, env=temp_env)
         with environ({"AIRFLOW_HOME": tmpdir}):
             yield tmpdir
diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_cacheable_asset.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_cacheable_asset.py
index 37d774f0b2b9e..30152fdf73990 100644
--- a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_cacheable_asset.py
+++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_cacheable_asset.py
@@ -33,6 +33,7 @@ def test_cacheable_asset(airflow_instance: None) -> None:
     assert specs[0].tags == {"dagster/compute_kind": "airflow", "airlift/dag_id": "print_dag"}
     spec_metadata = specs[0].metadata
     assert set(spec_metadata.keys()) == {"Dag Info (raw)", "Dag ID", "Link to DAG", "Source Code"}
+    assert specs[0].deps == []
 
     # Now, create a cacheable assets with a task peering.
 
@@ -55,6 +56,8 @@ def first_asset():
     def print_dag__downstream_print_task():
         return 1
 
+    # Having every task with a false migration state should be functionally equivalent to
+    # having no migration state.
for migration_state in [ AirflowMigrationState( dags={ @@ -109,3 +112,15 @@ def print_dag__downstream_print_task(): assert next(iter(other_key_def.specs)).tags["airlift/task_id"] == "downstream_print_task" assert next(iter(other_key_def.specs)).tags["airlift/dag_id"] == "print_dag" assert next(iter(other_key_def.specs)).deps == [AssetDep(AssetKey(["some", "key"]))] + + dag_def = assets_defs[AssetKey(["airflow_instance", "dag", "print_dag"])] + assert len(list(dag_def.specs)) == 1 + dag_spec = next(iter(dag_def.specs)) + assert dag_spec.metadata.keys() == { + "Dag Info (raw)", + "Dag ID", + "Link to DAG", + "Source Code", + } + # Should have upstream dependency on the "leaf" task in the dag. + assert dag_spec.deps == [AssetDep(AssetKey(["other", "key"]))] diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_transitive_asset_deps.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_transitive_asset_deps.py new file mode 100644 index 0000000000000..3d9c3c65a29a2 --- /dev/null +++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_transitive_asset_deps.py @@ -0,0 +1,76 @@ +from pathlib import Path +from typing import List + +import pytest +from dagster import Definitions, multi_asset +from dagster._core.definitions.asset_dep import AssetDep +from dagster._core.definitions.asset_key import AssetKey +from dagster._core.definitions.asset_spec import AssetSpec +from dagster_airlift.core.airflow_cacheable_assets_def import AirflowCacheableAssetsDefinition +from dagster_airlift.core.airflow_instance import AirflowInstance +from dagster_airlift.core.basic_auth import BasicAuthBackend + + +# Dag dir override +@pytest.fixture(name="dags_dir") +def dag_dir_fixture() -> Path: + return Path(__file__).parent / "transitive_asset_deps_dags" + + +def create_asset_for_task(task_id: str, dag_id: str, key: str, dep_keys: List[str] = []): + @multi_asset( + specs=[ + AssetSpec( + key=AssetKey([key]), + deps=[AssetSpec(key=AssetKey([dep_key])) for dep_key in dep_keys], + ) + ], + op_tags={"airlift/task_id": task_id, "airlift/dag_id": dag_id}, + ) + def the_asset(): + return 1 + + return the_asset + + +def test_transitive_deps(airflow_instance: None) -> None: + """Test that even with cross-dag transitive asset deps, the correct dependencies are generated for the asset graph.""" + instance = AirflowInstance( + auth_backend=BasicAuthBackend( + webserver_url="http://localhost:8080", username="admin", password="admin" + ), + name="airflow_instance", + ) + cacheable_assets = AirflowCacheableAssetsDefinition( + airflow_instance=instance, + orchestrated_defs=Definitions( + assets=[ + create_asset_for_task("one", "first", "first_one"), + create_asset_for_task("two", "first", "first_two", dep_keys=["second_one"]), + create_asset_for_task("three", "first", "first_three", dep_keys=["second_two"]), + create_asset_for_task("one", "second", "second_one", dep_keys=["first_one"]), + create_asset_for_task("two", "second", "second_two"), + ] + ), + migration_state_override=None, + poll_interval=1, + ) + + defs = Definitions(assets=[cacheable_assets]) + repository_def = defs.get_repository_def() + assets_defs = repository_def.assets_defs_by_key + assert len(assets_defs) == 7 # 5 tasks + 2 dags + assert AssetKey(["airflow_instance", "dag", "first"]) in assets_defs + dag_def = assets_defs[AssetKey(["airflow_instance", "dag", "first"])] + spec = next(iter(dag_def.specs)) + assert spec.deps == [ + 
AssetDep(asset=AssetKey(["first_two"])),
+        AssetDep(asset=AssetKey(["first_three"])),
+    ]
+    assert AssetKey(["airflow_instance", "dag", "second"]) in assets_defs
+    dag_def = assets_defs[AssetKey(["airflow_instance", "dag", "second"])]
+    spec = next(iter(dag_def.specs))
+    assert spec.deps == [
+        AssetDep(asset=AssetKey(["second_one"])),
+        AssetDep(asset=AssetKey(["second_two"])),
+    ]
diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/transitive_asset_deps_dags/first.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/transitive_asset_deps_dags/first.py
new file mode 100644
index 0000000000000..f2899c86d4e59
--- /dev/null
+++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/transitive_asset_deps_dags/first.py
@@ -0,0 +1,19 @@
+from datetime import datetime
+
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+
+default_args = {
+    "owner": "airflow",
+    "depends_on_past": False,
+    "start_date": datetime(2023, 1, 1),
+    "retries": 1,
+}
+
+dag = DAG("first", default_args=default_args, schedule_interval=None, is_paused_upon_creation=False)
+# This task will have a downstream to second.one
+one = PythonOperator(task_id="one", python_callable=lambda: None, dag=dag)
+# This task will have an upstream on second.one. Meaning first.one should not be considered a "leaf".
+two = PythonOperator(task_id="two", python_callable=lambda: None, dag=dag)
+# This task will have an upstream on second.two.
+three = PythonOperator(task_id="three", python_callable=lambda: None, dag=dag)
diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/transitive_asset_deps_dags/second.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/transitive_asset_deps_dags/second.py
new file mode 100644
index 0000000000000..8d9c7c424a270
--- /dev/null
+++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/transitive_asset_deps_dags/second.py
@@ -0,0 +1,20 @@
+from datetime import datetime
+
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+
+default_args = {
+    "owner": "airflow",
+    "depends_on_past": False,
+    "start_date": datetime(2023, 1, 1),
+    "retries": 1,
+}
+
+dag = DAG(
+    "second", default_args=default_args, schedule_interval=None, is_paused_upon_creation=False
+)
+# Neither of these tasks has a downstream within the dag, so they should both be considered "leaf" tasks.
+# This task will have a downstream to first.two
+one = PythonOperator(task_id="one", python_callable=lambda: None, dag=dag)
+# This task will have a downstream to first.three
+two = PythonOperator(task_id="two", python_callable=lambda: None, dag=dag)

From 4a096f8cbcf87165f0cf50bda48ea24f71f45342 Mon Sep 17 00:00:00 2001
From: Christopher DeCarolis
Date: Fri, 9 Aug 2024 14:08:04 -0700
Subject: [PATCH 2/5] [dagster-airlift] AirflowInstance, sensor refactor (#23538)

Made the AirflowInstance vended classes more composable so we're not throwing
around raw dicts everywhere.

Added some utilities to trigger and wait for the completion of airflow runs,
which are used in tests (for now...).

Made some refactors to the sensor after realizing we don't actually want to
report in topological order... we want to report in task order. So instead,
retrieve per-task info about each execution.
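A rough sketch of the ordering this aims for (the helper below is hypothetical
and not part of the dagster-airlift API; the real sensor pulls per-task
instance metadata from the Airflow REST API): materializations get emitted
sorted by when each task instance finished, rather than by topological
position.

    from typing import List, Tuple

    def sort_by_completion(
        events: List[Tuple[float, str]],  # (task end timestamp, asset key)
    ) -> List[str]:
        # Emit materializations in the order the task instances actually finished.
        return [key for _, key in sorted(events, key=lambda pair: pair[0])]

    # Task "b" finished before task "a", so "b" is reported first.
    assert sort_by_completion([(2.0, "a"), (1.0, "b")]) == ["b", "a"]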
Better testing of this new method is forthcoming, but existing tests already exercise the codepath. --- .../core/airflow_cacheable_assets_def.py | 28 ++- .../dagster_airlift/core/airflow_instance.py | 173 ++++++++++++++++-- .../dagster_airlift/core/sensor.py | 136 +++++++------- .../integration_tests/__init__.py | 0 .../integration_tests/conftest.py | 8 +- .../test_airflow_instance.py | 36 ++-- .../integration_tests/test_peering.py | 42 ++--- .../test_transitive_asset_deps.py | 13 +- 8 files changed, 276 insertions(+), 160 deletions(-) create mode 100644 examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/__init__.py diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_cacheable_assets_def.py b/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_cacheable_assets_def.py index 36a6e10e78a50..a63129b6fea28 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_cacheable_assets_def.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_cacheable_assets_def.py @@ -149,16 +149,14 @@ def compute_cacheable_data(self) -> Sequence[AssetsDefinitionCacheableData]: dag_specs_per_key: Dict[AssetKey, CacheableAssetSpec] = {} for dag in self.airflow_instance.list_dags(): source_code = self.airflow_instance.get_dag_source_code(dag.metadata["file_token"]) - dag_specs_per_key[self.airflow_instance.get_dag_run_asset_key(dag.dag_id)] = ( - get_cached_spec_for_dag( - airflow_instance=self.airflow_instance, - task_asset_keys_in_dag=cacheable_task_data.all_asset_keys_per_dag_id.get( - dag.dag_id, set() - ), - downstreams_asset_dependency_graph=cacheable_task_data.downstreams_asset_dependency_graph, - dag_info=dag, - source_code=source_code, - ) + dag_specs_per_key[dag.dag_asset_key] = get_cached_spec_for_dag( + airflow_instance=self.airflow_instance, + task_asset_keys_in_dag=cacheable_task_data.all_asset_keys_per_dag_id.get( + dag.dag_id, set() + ), + downstreams_asset_dependency_graph=cacheable_task_data.downstreams_asset_dependency_graph, + dag_info=dag, + source_code=source_code, ) return [ AssetsDefinitionCacheableData( @@ -213,9 +211,7 @@ def get_cached_spec_for_dag( metadata = { "Dag Info (raw)": JsonMetadataValue(dag_info.metadata), "Dag ID": dag_info.dag_id, - "Link to DAG": MarkdownMetadataValue( - f"[View DAG]({airflow_instance.get_dag_url(dag_info.dag_id)})" - ), + "Link to DAG": MarkdownMetadataValue(f"[View DAG]({dag_info.url})"), } # Attempt to retrieve source code from the DAG. 
metadata["Source Code"] = MarkdownMetadataValue( @@ -227,7 +223,7 @@ def get_cached_spec_for_dag( ) return CacheableAssetSpec( - asset_key=airflow_instance.get_dag_run_asset_key(dag_info.dag_id), + asset_key=dag_info.dag_asset_key, description=f"A materialization corresponds to a successful run of airflow DAG {dag_info.dag_id}.", metadata=metadata, tags={"dagster/compute_kind": "airflow", DAG_ID_TAG: dag_info.dag_id}, @@ -277,9 +273,7 @@ def construct_cacheable_assets_and_infer_dependencies( "Task Info (raw)": JsonMetadataValue(task_info.metadata), # In this case, "Dag ID": task_info.dag_id, - "Link to DAG": MarkdownMetadataValue( - f"[View DAG]({airflow_instance.get_dag_url(task_info.dag_id)})" - ), + "Link to DAG": MarkdownMetadataValue(f"[View DAG]({task_info.dag_url})"), } migration_state_for_task = _get_migration_state_for_task( migration_state, task_info.dag_id, task_info.task_id diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_instance.py b/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_instance.py index e64008e2e550a..1f6e0bc78ab34 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_instance.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_instance.py @@ -3,9 +3,12 @@ from typing import Any, Dict, List import requests -from dagster import AssetKey +from dagster._core.definitions.asset_key import AssetKey from dagster._core.errors import DagsterError from dagster._record import record +from dagster._time import get_current_datetime + +TERMINAL_STATES = {"success", "failed", "skipped", "up_for_retry", "up_for_reschedule"} class AirflowAuthBackend(ABC): @@ -32,8 +35,10 @@ def list_dags(self) -> List["DagInfo"]: response = self.auth_backend.get_session().get(f"{self.get_api_url()}/dags") if response.status_code == 200: dags = response.json() + webserver_url = self.auth_backend.get_webserver_url() return [ DagInfo( + webserver_url=webserver_url, dag_id=dag["dag_id"], metadata=dag, ) @@ -44,12 +49,30 @@ def list_dags(self) -> List["DagInfo"]: f"Failed to fetch DAGs. Status code: {response.status_code}, Message: {response.text}" ) + def get_task_instance(self, dag_id: str, task_id: str, run_id: str) -> "TaskInstance": + response = self.auth_backend.get_session().get( + f"{self.get_api_url()}/dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}" + ) + if response.status_code == 200: + return TaskInstance( + webserver_url=self.auth_backend.get_webserver_url(), + dag_id=dag_id, + task_id=task_id, + run_id=run_id, + metadata=response.json(), + ) + else: + raise DagsterError( + f"Failed to fetch task instance for {dag_id}/{task_id}/{run_id}. Status code: {response.status_code}, Message: {response.text}" + ) + def get_task_info(self, dag_id: str, task_id: str) -> "TaskInfo": response = self.auth_backend.get_session().get( f"{self.get_api_url()}/dags/{dag_id}/tasks/{task_id}" ) if response.status_code == 200: return TaskInfo( + webserver_url=self.auth_backend.get_webserver_url(), dag_id=dag_id, task_id=task_id, metadata=response.json(), @@ -59,23 +82,6 @@ def get_task_info(self, dag_id: str, task_id: str) -> "TaskInfo": f"Failed to fetch task info for {dag_id}/{task_id}. 
Status code: {response.status_code}, Message: {response.text}" ) - def get_dag_url(self, dag_id: str) -> str: - return f"{self.auth_backend.get_webserver_url()}/dags/{dag_id}" - - def get_dag_run_url(self, dag_id: str, run_id: str) -> str: - return f"{self.auth_backend.get_webserver_url()}/dags/{dag_id}/grid?dag_run_id={run_id}&tab=details" - - def get_task_instance_url(self, dag_id: str, task_id: str, run_id: str) -> str: - # http://localhost:8080/dags/print_dag/grid?dag_run_id=manual__2024-08-08T17%3A21%3A22.427241%2B00%3A00&task_id=print_task - return f"{self.auth_backend.get_webserver_url()}/dags/{dag_id}/grid?dag_run_id={run_id}&task_id={task_id}" - - def get_task_instance_log_url(self, dag_id: str, task_id: str, run_id: str) -> str: - # http://localhost:8080/dags/print_dag/grid?dag_run_id=manual__2024-08-08T17%3A21%3A22.427241%2B00%3A00&task_id=print_task&tab=logs - return f"{self.get_task_instance_url(dag_id, task_id, run_id)}&tab=logs" - - def get_dag_run_asset_key(self, dag_id: str) -> AssetKey: - return AssetKey([self.normalized_name, "dag", dag_id]) - def get_dag_source_code(self, file_token: str) -> str: response = self.auth_backend.get_session().get( f"{self.get_api_url()}/dagSources/{file_token}" @@ -93,7 +99,7 @@ def airflow_str_from_datetime(dt: datetime.datetime) -> str: def get_dag_runs( self, dag_id: str, start_date: datetime.datetime, end_date: datetime.datetime - ) -> List[Dict[str, Any]]: + ) -> List["DagRun"]: response = self.auth_backend.get_session().get( f"{self.get_api_url()}/dags/{dag_id}/dagRuns", params={ @@ -103,12 +109,55 @@ def get_dag_runs( }, ) if response.status_code == 200: - return response.json()["dag_runs"] + webserver_url = self.auth_backend.get_webserver_url() + return [ + DagRun( + webserver_url=webserver_url, + dag_id=dag_id, + run_id=dag_run["dag_run_id"], + metadata=dag_run, + ) + for dag_run in response.json()["dag_runs"] + ] else: raise DagsterError( f"Failed to fetch dag runs for {dag_id}. Status code: {response.status_code}, Message: {response.text}" ) + def trigger_dag(self, dag_id: str) -> str: + response = self.auth_backend.get_session().post( + f"{self.get_api_url()}/dags/{dag_id}/dagRuns", + json={}, + ) + if response.status_code != 200: + raise DagsterError( + f"Failed to launch run for {dag_id}. Status code: {response.status_code}, Message: {response.text}" + ) + return response.json()["dag_run_id"] + + def get_dag_run(self, dag_id: str, run_id: str) -> "DagRun": + response = self.auth_backend.get_session().get( + f"{self.get_api_url()}/dags/{dag_id}/dagRuns/{run_id}" + ) + if response.status_code != 200: + raise DagsterError( + f"Failed to fetch dag run for {dag_id}/{run_id}. 
Status code: {response.status_code}, Message: {response.text}" + ) + return DagRun( + webserver_url=self.auth_backend.get_webserver_url(), + dag_id=dag_id, + run_id=run_id, + metadata=response.json(), + ) + + def wait_for_run_completion(self, dag_id: str, run_id: str, timeout: int = 30) -> None: + start_time = get_current_datetime() + while get_current_datetime() - start_time < datetime.timedelta(seconds=timeout): + dag_run = self.get_dag_run(dag_id, run_id) + if dag_run.finished: + return + raise DagsterError(f"Timed out waiting for airflow run {run_id} to finish.") + @staticmethod def timestamp_from_airflow_date(airflow_date: str) -> float: try: @@ -121,12 +170,96 @@ def timestamp_from_airflow_date(airflow_date: str) -> float: @record class DagInfo: + webserver_url: str dag_id: str metadata: Dict[str, Any] + @property + def url(self) -> str: + return f"{self.webserver_url}/dags/{self.dag_id}" + + @property + def dag_asset_key(self) -> AssetKey: + # Conventional asset key representing a successful run of an airfow dag. + return AssetKey(["airflow_instance", "dag", self.dag_id]) + @record class TaskInfo: + webserver_url: str dag_id: str task_id: str metadata: Dict[str, Any] + + @property + def dag_url(self) -> str: + return f"{self.webserver_url}/dags/{self.dag_id}" + + +@record +class TaskInstance: + webserver_url: str + dag_id: str + task_id: str + run_id: str + metadata: Dict[str, Any] + + @property + def note(self) -> str: + return self.metadata.get("note") or "" + + @property + def details_url(self) -> str: + return f"{self.webserver_url}/dags/{self.dag_id}/grid?dag_run_id={self.run_id}&task_id={self.task_id}" + + @property + def log_url(self) -> str: + return f"{self.details_url}&tab=logs" + + @property + def start_date(self) -> float: + return AirflowInstance.timestamp_from_airflow_date(self.metadata["start_date"]) + + @property + def end_date(self) -> float: + return AirflowInstance.timestamp_from_airflow_date(self.metadata["end_date"]) + + +@record +class DagRun: + webserver_url: str + dag_id: str + run_id: str + metadata: Dict[str, Any] + + @property + def note(self) -> str: + return self.metadata.get("note") or "" + + @property + def url(self) -> str: + return f"{self.webserver_url}/dags/{self.dag_id}/grid?dag_run_id={self.run_id}&tab=details" + + @property + def success(self) -> bool: + return self.metadata["state"] == "success" + + @property + def finished(self) -> bool: + return self.metadata["state"] in TERMINAL_STATES + + @property + def run_type(self) -> str: + return self.metadata["run_type"] + + @property + def config(self) -> Dict[str, Any]: + return self.metadata["conf"] + + @property + def start_date(self) -> float: + return AirflowInstance.timestamp_from_airflow_date(self.metadata["start_date"]) + + @property + def end_date(self) -> float: + return AirflowInstance.timestamp_from_airflow_date(self.metadata["end_date"]) diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/sensor.py b/examples/experimental/dagster-airlift/dagster_airlift/core/sensor.py index f2ce8197830a7..f26289049c1dd 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/sensor.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/sensor.py @@ -1,6 +1,6 @@ from collections import defaultdict from datetime import timedelta -from typing import Dict, Sequence, Set +from typing import Dict, List, Set, Tuple from dagster import ( AssetKey, @@ -18,11 +18,10 @@ from dagster._core.definitions.repository_definition.repository_definition import ( 
RepositoryDefinition, ) -from dagster._core.utils import toposort_flatten from dagster._time import datetime_from_timestamp, get_current_datetime, get_current_timestamp -from .airflow_instance import AirflowInstance -from .utils import get_dag_id_from_asset +from .airflow_instance import AirflowInstance, TaskInstance +from .utils import get_dag_id_from_asset, get_task_id_from_asset def build_airflow_polling_sensor( @@ -36,100 +35,95 @@ def build_airflow_polling_sensor( def airflow_dag_sensor(context: SensorEvaluationContext) -> SensorResult: """Sensor to report materialization events for each asset as new runs come in.""" repository_def = check.not_none(context.repository_def) - toposorted_keys_per_dag = retrieve_toposorted_dag_keys(repository_def) - last_effective_date = ( datetime_from_timestamp(float(context.cursor)) if context.cursor else get_current_datetime() - timedelta(days=1) ) current_date = get_current_datetime() - materializations_to_report = [] - for dag_id, topo_order_keys in toposorted_keys_per_dag.items(): + materializations_to_report: List[Tuple[float, AssetMaterialization]] = [] + for dag_id, (dag_key, task_keys) in retrieve_dag_keys(repository_def).items(): # For now, we materialize assets representing tasks only when the whole dag completes. # With a more robust cursor that can let us know when we've seen a particular task run already, then we can relax this constraint. for dag_run in airflow_instance.get_dag_runs(dag_id, last_effective_date, current_date): - # If the dag run succeeded, add materializations for all assets referring to dags. - if dag_run["state"] != "success": + if not dag_run.success: raise Exception("Should only see successful dag runs at this point.") - for asset_key in topo_order_keys: - asset_node = repository_def.asset_graph.get(asset_key) - spec = asset_node.to_asset_spec() - task_id = spec.tags.get("airlift/task_id") - details_link = ( - airflow_instance.get_dag_run_url(dag_id, dag_run["dag_run_id"]) - if task_id is None - else airflow_instance.get_task_instance_url( - dag_id, task_id, dag_run["dag_run_id"] - ) - ) - metadata = { - "Airflow Run ID": dag_run["dag_run_id"], - "Run Metadata (raw)": JsonMetadataValue(dag_run), - "Start Date": TimestampMetadataValue( - airflow_instance.timestamp_from_airflow_date(dag_run["start_date"]) - ), - "End Date": TimestampMetadataValue( - airflow_instance.timestamp_from_airflow_date(dag_run["end_date"]) + + metadata = { + "Airflow Run ID": dag_run.run_id, + "Run Metadata (raw)": JsonMetadataValue(dag_run.metadata), + "Run Type": dag_run.run_type, + "Airflow Config": JsonMetadataValue(dag_run.config), + "Creation Timestamp": TimestampMetadataValue(get_current_timestamp()), + } + # Add dag materialization + dag_metadata = { + **metadata, + "Run Details": MarkdownMetadataValue(f"[View Run]({dag_run.url})"), + "Start Date": TimestampMetadataValue(dag_run.start_date), + "End Date": TimestampMetadataValue(dag_run.end_date), + } + materializations_to_report.append( + ( + dag_run.end_date, + AssetMaterialization( + asset_key=dag_key, + description=dag_run.note, + metadata=dag_metadata, ), - "Run Type": dag_run["run_type"], - "Airflow Config": JsonMetadataValue(dag_run["conf"]), - "Run Details": MarkdownMetadataValue(f"[View]({details_link})"), - "Creation Timestamp": TimestampMetadataValue(get_current_timestamp()), + ) + ) + task_runs = {} + for task_id, asset_key in task_keys: + task_run: TaskInstance = task_runs.get( + task_id, airflow_instance.get_task_instance(dag_id, task_id, dag_run.run_id) + ) + 
task_runs[task_id] = task_run + task_metadata = { + **metadata, + "Run Details": MarkdownMetadataValue(f"[View Run]({task_run.details_url})"), + "Task Logs": MarkdownMetadataValue(f"[View Logs]({task_run.log_url})"), + "Start Date": TimestampMetadataValue(dag_run.start_date), + "End Date": TimestampMetadataValue(dag_run.end_date), } - if task_id: - metadata["Task Logs"] = MarkdownMetadataValue( - f"[View Logs]({airflow_instance.get_task_instance_log_url(dag_id, task_id, dag_run['dag_run_id'])})" - ) - materializations_to_report.append( - AssetMaterialization( - asset_key=asset_key, - description=dag_run["note"], - metadata=metadata, + ( + task_run.end_date, + AssetMaterialization( + asset_key=asset_key, + description=task_run.note, + metadata=task_metadata, + ), ) ) + # Sort materialization + sorted_mats = sorted(materializations_to_report, key=lambda x: x[0]) context.update_cursor(str(current_date.timestamp())) return SensorResult( - asset_events=materializations_to_report, + asset_events=[sorted_mat[1] for sorted_mat in sorted_mats], ) return airflow_dag_sensor -def retrieve_toposorted_dag_keys( +def retrieve_dag_keys( repository_def: RepositoryDefinition, -) -> Dict[str, Sequence[AssetKey]]: - """For each dag, retrieve the topologically sorted list of asset keys.""" +) -> Dict[str, Tuple[AssetKey, Set[Tuple[str, AssetKey]]]]: + """For each dag, retrieve the list of asset keys.""" # First, we need to retrieve the upstreams for each asset key - upstreams_asset_dependency_graph: Dict[AssetKey, Set[AssetKey]] = defaultdict(set) - asset_keys_per_dag: Dict[str, Set[AssetKey]] = defaultdict(set) + key_per_dag = {} + task_keys_per_dag = defaultdict(set) for assets_def in repository_def.assets_defs_by_key.values(): # We could be more specific about the checks here to ensure that there's only one asset key # specifying the dag, and that all others have a task id. 
dag_id = get_dag_id_from_asset(assets_def) + task_id = get_task_id_from_asset(assets_def) if dag_id is None: continue - for spec in assets_def.specs: - for dep in spec.deps: - upstreams_asset_dependency_graph[spec.key].add(dep.asset_key) - asset_keys_per_dag[dag_id].add(spec.key) - - # Now, we can retrieve the topologically sorted list of asset keys for each dag - dag_keys_to_toposorted_asset_keys: Dict[str, Sequence[AssetKey]] = {} - for dag_id, asset_keys in asset_keys_per_dag.items(): - dag_keys_to_toposorted_asset_keys[dag_id] = toposort_keys( - asset_keys, upstreams_asset_dependency_graph - ) - - return dag_keys_to_toposorted_asset_keys - - -def toposort_keys( - keys_in_subgraph: Set[AssetKey], upstreams_asset_dependency_graph: Dict[AssetKey, Set[AssetKey]] -) -> Sequence[AssetKey]: - narrowed_asset_dependency_graph = { - key: {dep for dep in upstreams_asset_dependency_graph[key] if dep in keys_in_subgraph} - for key in keys_in_subgraph - } - return toposort_flatten(narrowed_asset_dependency_graph) + if task_id is None: + key_per_dag[dag_id] = ( + assets_def.key + ) # There should only be one key in the case of a "dag" asset + else: + task_keys_per_dag[dag_id].update((task_id, spec.key) for spec in assets_def.specs) + return {dag_id: (key, task_keys_per_dag[dag_id]) for dag_id, key in key_per_dag.items()} diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/__init__.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/conftest.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/conftest.py index a643d250c6483..781d2d5bb33dc 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/conftest.py +++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/conftest.py @@ -2,12 +2,18 @@ import subprocess from pathlib import Path from tempfile import TemporaryDirectory -from typing import Generator +from typing import Any, Generator import pytest +import requests from dagster._core.test_utils import environ +def assert_link_exists(link_name: str, link_url: Any): + assert isinstance(link_url, str) + assert requests.get(link_url).status_code == 200, f"{link_name} is broken" + + @pytest.fixture(name="dags_dir") def default_dags_dir(): return Path(__file__).parent / "dags" diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_airflow_instance.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_airflow_instance.py index 07d219ff4ddb9..058be6642e418 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_airflow_instance.py +++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_airflow_instance.py @@ -2,8 +2,10 @@ from dagster._core.errors import DagsterError from dagster_airlift.core import AirflowInstance, BasicAuthBackend +from .conftest import assert_link_exists -def test_airflow_instance(airflow_instance: None): + +def test_airflow_instance(airflow_instance: None) -> None: """Test AirflowInstance APIs against live-running airflow. Airflow is loaded with one dag (print_dag) which contains two tasks (print_task, downstream_print_task). 
@@ -28,10 +30,12 @@ def test_airflow_instance(airflow_instance: None): task_info = instance.get_task_info(dag_id="print_dag", task_id="print_task") assert task_info.dag_id == "print_dag" assert task_info.task_id == "print_task" + assert_link_exists("Dag url from task info object", task_info.dag_url) task_info = instance.get_task_info(dag_id="print_dag", task_id="downstream_print_task") assert task_info.dag_id == "print_dag" assert task_info.task_id == "downstream_print_task" + assert_link_exists("Dag url from task info object", task_info.dag_url) # Attempt a nonexistent task with pytest.raises( @@ -39,14 +43,24 @@ def test_airflow_instance(airflow_instance: None): ): instance.get_task_info(dag_id="print_dag", task_id="nonexistent_task") - assert instance.get_dag_url(dag_id="print_dag") == "http://localhost:8080/dags/print_dag" - assert ( - instance.get_dag_run_url(dag_id="print_dag", run_id="run_id") - == "http://localhost:8080/dags/print_dag/grid?dag_run_id=run_id&tab=details" - ) - assert ( - instance.get_task_instance_log_url( - dag_id="print_dag", task_id="print_task", run_id="run_id" - ) - == "http://localhost:8080/dags/print_dag/grid?dag_run_id=run_id&task_id=print_task&tab=logs" + # Kick off a run of the dag. + run_id = instance.trigger_dag(dag_id="print_dag") + instance.wait_for_run_completion(dag_id="print_dag", run_id=run_id) + run = instance.get_dag_run(dag_id="print_dag", run_id=run_id) + + assert run.run_id == run_id + assert_link_exists("Dag run", run.url) + + assert run.finished + assert run.success + + # Fetch task instance + task_instance = instance.get_task_instance( + dag_id="print_dag", task_id="print_task", run_id=run_id ) + assert_link_exists("Task instance", task_instance.details_url) + assert_link_exists("Task logs", task_instance.log_url) + + assert isinstance(task_instance.start_date, float) + assert isinstance(task_instance.end_date, float) + assert isinstance(task_instance.note, str) diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_peering.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_peering.py index 16232c3a1af89..465968ebb6a41 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_peering.py +++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_peering.py @@ -1,9 +1,7 @@ import re -import time from typing import Any, cast import pytest -import requests from dagster import ( AssetDep, AssetsDefinition, @@ -24,6 +22,8 @@ TaskMigrationState, ) +from .conftest import assert_link_exists + @pytest.mark.flaky(reruns=2) def test_dag_peering( @@ -91,7 +91,7 @@ def print_dag__downstream_print_task(): "[View DAG](http://localhost:8080/dags/print_dag)" ) link = extract_link(spec_metadata["Link to DAG"].value) - _assert_link_exists("Link to DAG", link) + assert_link_exists("Link to DAG", link) assert "Source Code" in spec_metadata task_def: AssetsDefinition = [ # noqa @@ -105,7 +105,7 @@ def print_dag__downstream_print_task(): "[View DAG](http://localhost:8080/dags/print_dag)" ) link = extract_link(task_spec.metadata["Link to DAG"].value) - _assert_link_exists("Link to DAG", link) + assert_link_exists("Link to DAG", link) other_task_def: AssetsDefinition = [ # noqa assets_def for assets_def in assets_defs if assets_def.key == AssetKey(["other", "key"]) @@ -120,23 +120,8 @@ def print_dag__downstream_print_task(): sensor_def = next(iter(defs.sensors)) # Kick off a run of the dag - response = requests.post( - 
"http://localhost:8080/api/v1/dags/print_dag/dagRuns", auth=("admin", "admin"), json={} - ) - assert response.status_code == 200, response.json() - # Wait until the run enters a terminal state - terminal_status = None - while True: - response = requests.get( - "http://localhost:8080/api/v1/dags/print_dag/dagRuns", auth=("admin", "admin") - ) - assert response.status_code == 200, response.json() - dag_runs = response.json()["dag_runs"] - if dag_runs[0]["state"] in ["success", "failed"]: - terminal_status = dag_runs[0]["state"] - break - time.sleep(1) - assert terminal_status == "success" + run_id = instance.trigger_dag("print_dag") + instance.wait_for_run_completion("print_dag", run_id, timeout=60) # invoke the sensor and check the sensor result. It should contain a new asset materialization for the dag. with instance_for_test() as instance: @@ -154,10 +139,10 @@ def print_dag__downstream_print_task(): assert "manual" in cast(str, dag_mat.metadata["Airflow Run ID"].value) run_id = dag_mat.metadata["Airflow Run ID"].value assert dag_mat.metadata["Run Details"] == MarkdownMetadataValue( - f"[View](http://localhost:8080/dags/print_dag/grid?dag_run_id={run_id}&tab=details)" + f"[View Run](http://localhost:8080/dags/print_dag/grid?dag_run_id={run_id}&tab=details)" ) pure_link = extract_link(dag_mat.metadata["Run Details"].value) - _assert_link_exists("Run Details", pure_link) + assert_link_exists("Run Details", pure_link) assert dag_mat.metadata["Airflow Config"] == JsonMetadataValue({}) task_mat = [ # noqa @@ -171,17 +156,17 @@ def print_dag__downstream_print_task(): assert "manual" in cast(str, task_mat.metadata["Airflow Run ID"].value) run_id = task_mat.metadata["Airflow Run ID"].value assert task_mat.metadata["Run Details"] == MarkdownMetadataValue( - f"[View](http://localhost:8080/dags/print_dag/grid?dag_run_id={run_id}&task_id=print_task)" + f"[View Run](http://localhost:8080/dags/print_dag/grid?dag_run_id={run_id}&task_id=print_task)" ) link = extract_link(task_mat.metadata["Run Details"].value) - _assert_link_exists("Run Details", link) + assert_link_exists("Run Details", link) assert "Task Logs" in task_mat.metadata assert task_mat.metadata["Task Logs"] == MarkdownMetadataValue( f"[View Logs](http://localhost:8080/dags/print_dag/grid?dag_run_id={run_id}&task_id=print_task&tab=logs)" ) link = extract_link(task_mat.metadata["Task Logs"].value) - _assert_link_exists("Task Logs", link) + assert_link_exists("Task Logs", link) other_mat = [ # noqa asset_mat @@ -197,11 +182,6 @@ def print_dag__downstream_print_task(): ) -def _assert_link_exists(link_name: str, link_url: Any): - assert isinstance(link_url, str) - assert requests.get(link_url).status_code == 200, f"{link_name} is broken" - - def extract_link(mrkdwn: Any) -> str: match = re.search(r"\[.*\]\((.*)\)", mrkdwn) assert match diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_transitive_asset_deps.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_transitive_asset_deps.py index 3d9c3c65a29a2..00f5bc554b4d0 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_transitive_asset_deps.py +++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_transitive_asset_deps.py @@ -3,7 +3,6 @@ import pytest from dagster import Definitions, multi_asset -from dagster._core.definitions.asset_dep import AssetDep from dagster._core.definitions.asset_key import AssetKey from 
dagster._core.definitions.asset_spec import AssetSpec
 from dagster_airlift.core.airflow_cacheable_assets_def import AirflowCacheableAssetsDefinition
@@ -63,14 +62,10 @@ def test_transitive_deps(airflow_instance: None) -> None:
     assert AssetKey(["airflow_instance", "dag", "first"]) in assets_defs
     dag_def = assets_defs[AssetKey(["airflow_instance", "dag", "first"])]
     spec = next(iter(dag_def.specs))
-    assert spec.deps == [
-        AssetDep(asset=AssetKey(["first_two"])),
-        AssetDep(asset=AssetKey(["first_three"])),
-    ]
+    deps_keys = {dep.asset_key for dep in spec.deps}
+    assert deps_keys == {AssetKey(["first_two"]), AssetKey(["first_three"])}
     assert AssetKey(["airflow_instance", "dag", "second"]) in assets_defs
     dag_def = assets_defs[AssetKey(["airflow_instance", "dag", "second"])]
     spec = next(iter(dag_def.specs))
-    assert spec.deps == [
-        AssetDep(asset=AssetKey(["second_one"])),
-        AssetDep(asset=AssetKey(["second_two"])),
-    ]
+    deps_keys = {dep.asset_key for dep in spec.deps}
+    assert deps_keys == {AssetKey(["second_one"]), AssetKey(["second_two"])}

From a2ab668d62e9055e135c652c076e7c80b6d07a84 Mon Sep 17 00:00:00 2001
From: Christopher DeCarolis
Date: Fri, 9 Aug 2024 14:08:25 -0700
Subject: [PATCH 3/5] [dagster-airlift] in-airflow submodule (#23493)

removes the airflow dependency from airlift core, and creates a new "airflow"
submodule for airflow-related dependencies. Tested by setting up the tox
environment and ensuring that it works.

Moves "migration state" code into a neutral directory so it can be picked up
by both the airflow submodule and the core submodule, and removes the
dependency on the dagster project from that codepath.

---
 .../dagster_airlift/core/__init__.py          |  2 +-
 .../core/airflow_cacheable_assets_def.py      |  2 +-
 .../dagster_airlift/core/defs_from_airflow.py |  2 +-
 .../dagster_airlift/core/migration_state.py   | 40 -------------
 .../dagster_airlift/in_airflow/__init__.py    |  0
 .../dagster_airlift/migration_state.py        | 56 +++++++++++++++++++
 .../integration_tests/test_cacheable_asset.py |  2 +-
 .../integration_tests/test_peering.py         |  2 +-
 .../unit_tests/test_migration_state.py        |  6 +-
 .../examples/peering-with-dbt/setup.py        |  2 +-
 .../examples/peering-with-dbt/tox.ini         |  2 +-
 .../experimental/dagster-airlift/setup.py     | 16 ++++--
 examples/experimental/dagster-airlift/tox.ini |  2 +-
 13 files changed, 77 insertions(+), 57 deletions(-)
 delete mode 100644 examples/experimental/dagster-airlift/dagster_airlift/core/migration_state.py
 create mode 100644 examples/experimental/dagster-airlift/dagster_airlift/in_airflow/__init__.py
 create mode 100644 examples/experimental/dagster-airlift/dagster_airlift/migration_state.py

diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/__init__.py b/examples/experimental/dagster-airlift/dagster_airlift/core/__init__.py
index e3a5c35f069c1..8c4126af4adcb 100644
--- a/examples/experimental/dagster-airlift/dagster_airlift/core/__init__.py
+++ b/examples/experimental/dagster-airlift/dagster_airlift/core/__init__.py
@@ -1,7 +1,7 @@
+from ..migration_state import load_migration_state_from_yaml as load_migration_state_from_yaml
 from .basic_auth import BasicAuthBackend as BasicAuthBackend
 from .defs_from_airflow import (
     AirflowInstance as AirflowInstance,
     build_defs_from_airflow_instance as build_defs_from_airflow_instance,
 )
-from .migration_state import load_migration_state_from_yaml as load_migration_state_from_yaml
 from .multi_asset import PythonDefs as PythonDefs
diff --git 
a/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_cacheable_assets_def.py b/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_cacheable_assets_def.py index a63129b6fea28..b8969fcfe5dab 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_cacheable_assets_def.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_cacheable_assets_def.py @@ -30,7 +30,7 @@ unpack_value, ) -from dagster_airlift.core.migration_state import AirflowMigrationState +from dagster_airlift.migration_state import AirflowMigrationState from .airflow_instance import AirflowInstance, DagInfo, TaskInfo from .utils import get_dag_id_from_asset, get_task_id_from_asset diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/defs_from_airflow.py b/examples/experimental/dagster-airlift/dagster_airlift/core/defs_from_airflow.py index 84bc8ccf8aabc..d0da5b383b491 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/defs_from_airflow.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/defs_from_airflow.py @@ -4,9 +4,9 @@ from dagster_airlift.core.sensor import build_airflow_polling_sensor +from ..migration_state import AirflowMigrationState from .airflow_cacheable_assets_def import DEFAULT_POLL_INTERVAL, AirflowCacheableAssetsDefinition from .airflow_instance import AirflowInstance -from .migration_state import AirflowMigrationState def build_defs_from_airflow_instance( diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/migration_state.py b/examples/experimental/dagster-airlift/dagster_airlift/core/migration_state.py deleted file mode 100644 index 130b6a5ff363c..0000000000000 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/migration_state.py +++ /dev/null @@ -1,40 +0,0 @@ -from pathlib import Path -from typing import Dict - -from dagster._core.errors import DagsterInvalidDefinitionError -from dagster._utils.pydantic_yaml import parse_yaml_file_to_pydantic -from pydantic import BaseModel - - -class TaskMigrationState(BaseModel): - migrated: bool - - -class DagMigrationState(BaseModel): - tasks: Dict[str, TaskMigrationState] - - -class AirflowMigrationState(BaseModel): - dags: Dict[str, DagMigrationState] - - def get_migration_state_for_task(self, dag_id: str, task_id: str) -> bool: - return self.dags[dag_id].tasks[task_id].migrated - - -def load_migration_state_from_yaml(migration_yaml_path: Path) -> AirflowMigrationState: - # Expect migration_yaml_path to be a directory, where each file represents a dag, and each - # file in the subdir represents a task. The dictionary each task should consist of a single bit: - # migrated: true/false. 
-    dag_migration_states = {}
-    try:
-        for dag_file in migration_yaml_path.iterdir():
-            # Check that the file is a yaml file or yml file
-            if dag_file.suffix not in [".yaml", ".yml"]:
-                continue
-            dag_id = dag_file.stem
-            dag_migration_states[dag_id] = parse_yaml_file_to_pydantic(
-                DagMigrationState, dag_file.read_text(), str(dag_file)
-            )
-    except Exception as e:
-        raise DagsterInvalidDefinitionError("Error parsing migration yaml") from e
-    return AirflowMigrationState(dags=dag_migration_states)
diff --git a/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/__init__.py b/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/__init__.py
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/examples/experimental/dagster-airlift/dagster_airlift/migration_state.py b/examples/experimental/dagster-airlift/dagster_airlift/migration_state.py
new file mode 100644
index 0000000000000..bb9a258f9fc83
--- /dev/null
+++ b/examples/experimental/dagster-airlift/dagster_airlift/migration_state.py
@@ -0,0 +1,56 @@
+from pathlib import Path
+from typing import Dict, NamedTuple
+
+import yaml
+
+
+class TaskMigrationState(NamedTuple):
+    migrated: bool
+
+
+class DagMigrationState(NamedTuple):
+    tasks: Dict[str, TaskMigrationState]
+
+
+class AirflowMigrationState(NamedTuple):
+    dags: Dict[str, DagMigrationState]
+
+    def get_migration_state_for_task(self, dag_id: str, task_id: str) -> bool:
+        return self.dags[dag_id].tasks[task_id].migrated
+
+
+class MigrationStateParsingError(Exception):
+    pass
+
+
+def load_migration_state_from_yaml(migration_yaml_path: Path) -> AirflowMigrationState:
+    # Expect migration_yaml_path to be a directory where each file represents a dag, and each
+    # entry under the file's "tasks" key represents a task. The dictionary for each task should
+    # consist of a single key, migrated: true/false.
+ dag_migration_states = {} + try: + for dag_file in migration_yaml_path.iterdir(): + # Check that the file is a yaml file or yml file + if dag_file.suffix not in [".yaml", ".yml"]: + continue + dag_id = dag_file.stem + yaml_dict = yaml.safe_load(dag_file.read_text()) + if not isinstance(yaml_dict, dict): + raise Exception("Expected a dictionary") + if "tasks" not in yaml_dict: + raise Exception("Expected a 'tasks' key in the yaml") + task_migration_states = {} + for task_id, task_dict in yaml_dict["tasks"].items(): + if not isinstance(task_dict, dict): + raise Exception("Expected a dictionary for each task") + if "migrated" not in task_dict: + raise Exception("Expected a 'migrated' key in the task dictionary") + if set(task_dict.keys()) != {"migrated"}: + raise Exception("Expected only a 'migrated' key in the task dictionary") + if task_dict["migrated"] not in [True, False]: + raise Exception("Expected 'migrated' key to be a boolean") + task_migration_states[task_id] = TaskMigrationState(migrated=task_dict["migrated"]) + dag_migration_states[dag_id] = DagMigrationState(tasks=task_migration_states) + except Exception as e: + raise MigrationStateParsingError("Error parsing migration yaml") from e + return AirflowMigrationState(dags=dag_migration_states) diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_cacheable_asset.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_cacheable_asset.py index 30152fdf73990..9f97c4c64b700 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_cacheable_asset.py +++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_cacheable_asset.py @@ -5,7 +5,7 @@ from dagster_airlift.core.airflow_cacheable_assets_def import AirflowCacheableAssetsDefinition from dagster_airlift.core.airflow_instance import AirflowInstance from dagster_airlift.core.basic_auth import BasicAuthBackend -from dagster_airlift.core.migration_state import ( +from dagster_airlift.migration_state import ( AirflowMigrationState, DagMigrationState, TaskMigrationState, diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_peering.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_peering.py index 465968ebb6a41..1b1e677771894 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_peering.py +++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/integration_tests/test_peering.py @@ -16,7 +16,7 @@ from dagster._core.definitions.asset_key import AssetKey from dagster._core.test_utils import instance_for_test from dagster_airlift.core import AirflowInstance, BasicAuthBackend, build_defs_from_airflow_instance -from dagster_airlift.core.migration_state import ( +from dagster_airlift.migration_state import ( AirflowMigrationState, DagMigrationState, TaskMigrationState, diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/test_migration_state.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/test_migration_state.py index c472685d34096..d08720202ba1a 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/test_migration_state.py +++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/test_migration_state.py @@ -1,11 +1,11 @@ from pathlib import Path import pytest -from dagster._core.errors import 
DagsterInvalidDefinitionError from dagster_airlift.core import load_migration_state_from_yaml -from dagster_airlift.core.migration_state import ( +from dagster_airlift.migration_state import ( AirflowMigrationState, DagMigrationState, + MigrationStateParsingError, TaskMigrationState, ) @@ -38,5 +38,5 @@ def test_migration_state() -> None: incorrect_dirs = ["empty_file", "nonexistent_dir", "extra_key", "nonsense"] for incorrect_dir in incorrect_dirs: incorrect_migration_file = Path(__file__).parent / "migration_state_yamls" / incorrect_dir - with pytest.raises(DagsterInvalidDefinitionError, match="Error parsing migration yaml"): + with pytest.raises(MigrationStateParsingError, match="Error parsing migration yaml"): load_migration_state_from_yaml(incorrect_migration_file) diff --git a/examples/experimental/dagster-airlift/examples/peering-with-dbt/setup.py b/examples/experimental/dagster-airlift/examples/peering-with-dbt/setup.py index 49c2d7893d208..78ce2071e8e65 100644 --- a/examples/experimental/dagster-airlift/examples/peering-with-dbt/setup.py +++ b/examples/experimental/dagster-airlift/examples/peering-with-dbt/setup.py @@ -24,7 +24,7 @@ def get_version() -> str: install_requires=[ f"dagster{pin}", f"dagster-webserver{pin}", - f"dagster-airlift[dbt]{pin}", + f"dagster-airlift[dbt,core,in-airflow]{pin}", "dbt-duckdb", "pandas", ], diff --git a/examples/experimental/dagster-airlift/examples/peering-with-dbt/tox.ini b/examples/experimental/dagster-airlift/examples/peering-with-dbt/tox.ini index 8b2b459c69235..9f92ed8ab5d27 100644 --- a/examples/experimental/dagster-airlift/examples/peering-with-dbt/tox.ini +++ b/examples/experimental/dagster-airlift/examples/peering-with-dbt/tox.ini @@ -15,7 +15,7 @@ deps = -e ../../../../../python_modules/dagster-pipes -e ../../../../../python_modules/dagster-graphql -e ../../../../../python_modules/libraries/dagster-dbt - -e ../../../dagster-airlift[core,dbt,test] + -e ../../../dagster-airlift[core,dbt,test,in-airflow] -e . pandas allowlist_externals = diff --git a/examples/experimental/dagster-airlift/setup.py b/examples/experimental/dagster-airlift/setup.py index 56d5f984c3940..623dbcf8466dd 100644 --- a/examples/experimental/dagster-airlift/setup.py +++ b/examples/experimental/dagster-airlift/setup.py @@ -1,5 +1,14 @@ from setuptools import find_packages, setup +airflow_dep_list = [ + "apache-airflow>=2.0.0,<2.8", + # Flask-session 0.6 is incompatible with certain airflow-provided test + # utilities. + "flask-session<0.6.0", + "connexion<3.0.0", # https://github.com/apache/airflow/issues/35234 + "pendulum>=2.0.0,<3.0.0", +] + setup( name="dagster-airlift", version="0.0.3", @@ -24,13 +33,8 @@ extras_require={ "core": [ "dagster", - "apache-airflow>=2.0.0,<2.8", - # Flask-session 0.6 is incompatible with certain airflow-provided test - # utilities. 
- "flask-session<0.6.0", - "connexion<3.0.0", # https://github.com/apache/airflow/issues/35234 - "pendulum>=2.0.0,<3.0.0", ], + "in-airflow": airflow_dep_list, "mwaa": ["boto3"], "dbt": ["dagster-dbt"], "test": ["pytest", "dagster-dbt", "dbt-duckdb", "boto3"], diff --git a/examples/experimental/dagster-airlift/tox.ini b/examples/experimental/dagster-airlift/tox.ini index d2d4e217bd518..ef360da0de058 100644 --- a/examples/experimental/dagster-airlift/tox.ini +++ b/examples/experimental/dagster-airlift/tox.ini @@ -13,7 +13,7 @@ deps = -e ../../../python_modules/dagster-test -e ../../../python_modules/dagster-pipes -e ../../../python_modules/libraries/dagster-dbt - -e .[core,mwaa,dbt,test] + -e .[core,mwaa,dbt,test,in-airflow] dbt-duckdb allowlist_externals = /bin/bash From 49b84679d8e34f36c6298be7294c7ebfc3ce0061 Mon Sep 17 00:00:00 2001 From: Pedram Navid <1045990+PedramNavid@users.noreply.github.com> Date: Fri, 9 Aug 2024 14:30:39 -0700 Subject: [PATCH 4/5] Fix broken docs build (#23562) ## Summary & Motivation ## How I Tested These Changes --- .github/workflows/build-docs.yml | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/.github/workflows/build-docs.yml b/.github/workflows/build-docs.yml index 262d9a5df6f4f..a01a04f77172b 100644 --- a/.github/workflows/build-docs.yml +++ b/.github/workflows/build-docs.yml @@ -31,14 +31,14 @@ jobs: REF_NAME: ${{ github.ref_name }} if: | github.event_name == 'pull_request' || - (github.event_name == 'push' && (github.ref == 'refs/heads/master' || startsWith(github.ref, 'refs/heads/release-'))) + (github.event_name == 'push' && (github.ref == 'refs/heads/master' || startsWith(github.ref, 'refs/heads/release-') || startsWith(github.ref, 'refs/heads/docs-prod'))) run: | BRANCH_PREVIEW_SUBDOMAIN=$(echo "${HEAD_REF:-$REF_NAME}" | sed -e 's/[^a-zA-Z0-9-]/-/g; s/^-*//; s/-*$//') echo "$BRANCH_PREVIEW_SUBDOMAIN" echo "BRANCH_PREVIEW_SUBDOMAIN=$BRANCH_PREVIEW_SUBDOMAIN" >> "${GITHUB_ENV}" - name: Checkout master/release branch - if: github.event_name == 'push' && (github.ref == 'refs/heads/master' || startsWith(github.ref, 'refs/heads/release-')) + if: github.event_name == 'push' && (github.ref == 'refs/heads/master' || startsWith(github.ref, 'refs/heads/release-') || startsWith(github.ref, 'refs/heads/docs-prod')) uses: actions/checkout@v4 - name: Get PR fetch depth @@ -71,6 +71,7 @@ jobs: - name: Copy doc snippets to public directory run: | + ls mkdir -p docs/next/public/docs_snippets cp -R examples/docs_snippets/docs_snippets docs/next/public/docs_snippets/ @@ -78,7 +79,7 @@ jobs: uses: amondnet/vercel-action@v25 if: | github.event_name == 'pull_request' || - (github.event_name == 'push' && (github.ref == 'refs/heads/master' || startsWith(github.ref, 'refs/heads/release-'))) + (github.event_name == 'push' && (github.ref == 'refs/heads/master' || startsWith(github.ref, 'refs/heads/release-') || startsWith(github.ref, 'refs/heads/docs-prod'))) with: github-comment: ${{ github.event.pull_request && env.CHANGES_ENTRY || true }} vercel-token: ${{ secrets.VERCEL_TOKEN }} @@ -108,4 +109,4 @@ jobs: vercel-project-id: ${{ secrets.VERCEL_PROJECT_ID }} vercel-args: "--prod" github-token: ${{ secrets.GITHUB_TOKEN }} - scope: ${{ secrets.VERCEL_ORG_ID }} + scope: ${{ secrets.VERCEL_ORG_ID }} \ No newline at end of file From 319959f98d55a231bf4ffb2c3a57577dd141af43 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Fri, 9 Aug 2024 15:01:03 -0700 Subject: [PATCH 5/5] docstring for AssetsDefinition.get_asset_spec (#23561) ## Summary & Motivation ## How 
I Tested These Changes

---
 .../dagster/dagster/_core/definitions/assets.py | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/python_modules/dagster/dagster/_core/definitions/assets.py b/python_modules/dagster/dagster/_core/definitions/assets.py
index e0b65f3b084cf..58f78f11ac95d 100644
--- a/python_modules/dagster/dagster/_core/definitions/assets.py
+++ b/python_modules/dagster/dagster/_core/definitions/assets.py
@@ -1403,6 +1403,18 @@ def _output_to_source_asset(self, output_name: str) -> SourceAsset:
 
     @public
     def get_asset_spec(self, key: Optional[AssetKey] = None) -> AssetSpec:
+        """Returns a representation of this asset as an :py:class:`AssetSpec`.
+
+        If this is a multi-asset, the "key" argument allows selecting which asset to return the
+        spec for.
+
+        Args:
+            key (Optional[AssetKey]): If this is a multi-asset, selects the asset whose AssetSpec
+                to return. If this is not a multi-asset, it can be left as None.
+
+        Returns:
+            AssetSpec
+        """
         return self._specs_by_key[key or self.key]
 
     def get_io_manager_key_for_asset_key(self, key: AssetKey) -> str: