From 75414fa6bca3563ff850d9084f3e70216c33f9d6 Mon Sep 17 00:00:00 2001 From: Christopher DeCarolis Date: Mon, 7 Oct 2024 17:59:53 -0700 Subject: [PATCH] [dagster-airlift] unbreak master (#25119) ## Summary & Motivation Because of automapping, we need to filter out non-executable asset nodes instead of raising an error for each one. We now raise an error only when no executable asset nodes remain for the task. ## How I Tested These Changes Existing tests ## Changelog NOCHANGELOG --- .../in_airflow/base_asset_operator.py | 20 +++++++++++-------- .../examples/dbt-example/Makefile | 16 +++++++++------ .../integration_tests/test_e2e.py | 7 +++++++ .../kitchen_sink/airflow_dags/print_dag.py | 2 +- .../kitchen_sink/dagster_defs/mapped_defs.py | 6 ++++++ .../custom_operator_examples/custom_proxy.py | 4 ++-- .../plus_proxy_operator.py | 4 ++-- 7 files changed, 40 insertions(+), 19 deletions(-) diff --git a/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/base_asset_operator.py b/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/base_asset_operator.py index d029299ce2a1b..10889a272980b 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/base_asset_operator.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/base_asset_operator.py @@ -124,12 +124,6 @@ def default_dagster_run_tags(self, context: Context) -> Dict[str, str]: TASK_ID_TAG_KEY: self.get_airflow_task_id(context), } - def ensure_executable(self, asset_node: Mapping[str, Any]) -> None: - if not asset_node["jobs"]: - raise Exception( - f"Asset node {asset_node} has no jobs, and therefore cannot be executed." 
- ) - def launch_runs_for_task(self, context: Context, dag_id: str, task_id: str) -> None: """Launches runs for the given task in Dagster.""" session = self._get_validated_session(context) @@ -140,8 +134,14 @@ def launch_runs_for_task(self, context: Context, dag_id: str, task_id: str) -> N ) asset_nodes_data = self.get_all_asset_nodes(session, dagster_url, context) logger.info(f"Got response {asset_nodes_data}") - for asset_node in self.filter_asset_nodes(context, asset_nodes_data): - self.ensure_executable(asset_node) + filtered_asset_nodes = [ + asset_node + for asset_node in self.filter_asset_nodes(context, asset_nodes_data) + if _is_asset_node_executable(asset_node) + ] + if not filtered_asset_nodes: + raise Exception(f"No asset nodes found to trigger for task {dag_id}.{task_id}") + for asset_node in filtered_asset_nodes: assets_to_trigger_per_job[_build_dagster_job_identifier(asset_node)].append( asset_node["assetKey"]["path"] ) @@ -213,3 +213,7 @@ def _build_dagster_run_execution_params( "assetCheckSelection": [], }, } + + +def _is_asset_node_executable(asset_node: Mapping[str, Any]) -> bool: + return bool(asset_node["jobs"]) diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/Makefile b/examples/experimental/dagster-airlift/examples/dbt-example/Makefile index a9f75e6e27345..63674e7153d71 100644 --- a/examples/experimental/dagster-airlift/examples/dbt-example/Makefile +++ b/examples/experimental/dagster-airlift/examples/dbt-example/Makefile @@ -31,20 +31,24 @@ setup_local_env: ../../scripts/airflow_setup.sh $(MAKEFILE_DIR)/dbt_example/airflow_dags $(MAKE) dbt_setup +not_proxied: + chmod +x ../../scripts/find_and_replace_in_yaml_dir.sh + ../../scripts/find_and_replace_in_yaml_dir.sh $(MAKEFILE_DIR)/dbt_example/airflow_dags/proxied_state True False + +proxied: + chmod +x ../../scripts/find_and_replace_in_yaml_dir.sh + ../../scripts/find_and_replace_in_yaml_dir.sh $(MAKEFILE_DIR)/dbt_example/airflow_dags/proxied_state False True + 
run_airflow: airflow standalone run_peer: dagster dev -m dbt_example.dagster_defs.peer -p 3333 -run_observe: - chmod +x ../../scripts/find_and_replace_in_yaml_dir.sh - ../../scripts/find_and_replace_in_yaml_dir.sh $(MAKEFILE_DIR)/dbt_example/airflow_dags/proxied_state True False +run_observe: not_proxied dagster dev -m dbt_example.dagster_defs.observe -p 3333 -run_migrate: - chmod +x ../../scripts/find_and_replace_in_yaml_dir.sh && \ - ../../scripts/find_and_replace_in_yaml_dir.sh $(MAKEFILE_DIR)/dbt_example/airflow_dags/proxied_state False True && \ +run_migrate: proxied dagster dev -m dbt_example.dagster_defs.migrate -p 3333 run_complete: diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example_tests/integration_tests/test_e2e.py b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example_tests/integration_tests/test_e2e.py index 4e6c665066379..72ae11e71b3b9 100644 --- a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example_tests/integration_tests/test_e2e.py +++ b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example_tests/integration_tests/test_e2e.py @@ -1,4 +1,5 @@ import os +import subprocess from datetime import timedelta from typing import Callable, List, Tuple @@ -10,6 +11,10 @@ from dbt_example_tests.integration_tests.conftest import makefile_dir +def make_unmigrated() -> None: + subprocess.check_output(["make", "not_proxied", "-C", str(makefile_dir())]) + + @pytest.fixture(name="dagster_home") def dagster_home_fixture(local_env: None) -> str: return os.environ["DAGSTER_HOME"] @@ -77,6 +82,8 @@ def test_dagster_materializes( ) -> None: """Test that assets can load properly, and that materializations register.""" dagster_dev_module, af_instance_fn = stage_and_fn + if dagster_dev_module.endswith("peer"): + make_unmigrated() af_instance = af_instance_fn() for dag_id, expected_asset_key in [("rebuild_iris_models", AssetKey(["lakehouse", "iris"]))]: run_id = 
af_instance.trigger_dag(dag_id=dag_id) diff --git a/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/airflow_dags/print_dag.py b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/airflow_dags/print_dag.py index 463163d912c92..62f32cede23b5 100644 --- a/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/airflow_dags/print_dag.py +++ b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/airflow_dags/print_dag.py @@ -15,7 +15,7 @@ def print_hello() -> None: "owner": "airflow", "depends_on_past": False, "start_date": datetime(2023, 1, 1), - "retries": 1, + "retries": 0, } diff --git a/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_defs/mapped_defs.py b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_defs/mapped_defs.py index 90347f0d997f3..c46e5ad923566 100644 --- a/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_defs/mapped_defs.py +++ b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_defs/mapped_defs.py @@ -11,6 +11,11 @@ def print_asset() -> None: print("Hello, world!") +@asset +def another_print_asset() -> None: + print("Hello, world!") + + @asset(description="Asset one is materialized by multiple airflow tasks") def asset_one() -> None: # ruff: noqa: T201 @@ -24,6 +29,7 @@ def build_mapped_defs() -> Definitions: dag_defs( "print_dag", task_defs("print_task", Definitions(assets=[print_asset])), + task_defs("downstream_print_task", Definitions(assets=[another_print_asset])), ), targeted_by_multiple_tasks( Definitions([asset_one]), diff --git a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/custom_operator_examples/custom_proxy.py b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/custom_operator_examples/custom_proxy.py index 8019e7fa6a689..51da6f95d711f 100644 --- 
a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/custom_operator_examples/custom_proxy.py +++ b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/custom_operator_examples/custom_proxy.py @@ -3,11 +3,11 @@ import requests from airflow import DAG from airflow.utils.context import Context -from dagster_airlift.in_airflow import BaseDagsterAssetsOperator, proxying_to_dagster +from dagster_airlift.in_airflow import BaseProxyTaskToDagsterOperator, proxying_to_dagster from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml -class CustomProxyToDagsterOperator(BaseDagsterAssetsOperator): +class CustomProxyToDagsterOperator(BaseProxyTaskToDagsterOperator): def get_dagster_session(self, context: Context) -> requests.Session: if "var" not in context: raise ValueError("No variables found in context") diff --git a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/custom_operator_examples/plus_proxy_operator.py b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/custom_operator_examples/plus_proxy_operator.py index 8da230792f77b..520a54cd4a39d 100644 --- a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/custom_operator_examples/plus_proxy_operator.py +++ b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/custom_operator_examples/plus_proxy_operator.py @@ -1,9 +1,9 @@ import requests from airflow.utils.context import Context -from dagster_airlift.in_airflow import BaseDagsterAssetsOperator +from dagster_airlift.in_airflow import BaseProxyTaskToDagsterOperator -class DagsterCloudProxyOperator(BaseDagsterAssetsOperator): +class DagsterCloudProxyOperator(BaseProxyTaskToDagsterOperator): def get_variable(self, context: Context, var_name: str) -> str: if "var" not in context: raise ValueError("No variables found in context")