Skip to content

Commit

Permalink
[dagster-airlift] unbreak master (#25119)
Browse files Browse the repository at this point in the history
## Summary & Motivation
Because of automapping, we need to filter out non-executable asset nodes
instead of just erroring. Instead, error when no nodes available
## How I Tested These Changes
Existing tests
## Changelog
NOCHANGELOG
  • Loading branch information
dpeng817 authored Oct 8, 2024
1 parent 888a66f commit 75414fa
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,6 @@ def default_dagster_run_tags(self, context: Context) -> Dict[str, str]:
TASK_ID_TAG_KEY: self.get_airflow_task_id(context),
}

def ensure_executable(self, asset_node: Mapping[str, Any]) -> None:
if not asset_node["jobs"]:
raise Exception(
f"Asset node {asset_node} has no jobs, and therefore cannot be executed."
)

def launch_runs_for_task(self, context: Context, dag_id: str, task_id: str) -> None:
"""Launches runs for the given task in Dagster."""
session = self._get_validated_session(context)
Expand All @@ -140,8 +134,14 @@ def launch_runs_for_task(self, context: Context, dag_id: str, task_id: str) -> N
)
asset_nodes_data = self.get_all_asset_nodes(session, dagster_url, context)
logger.info(f"Got response {asset_nodes_data}")
for asset_node in self.filter_asset_nodes(context, asset_nodes_data):
self.ensure_executable(asset_node)
filtered_asset_nodes = [
asset_node
for asset_node in self.filter_asset_nodes(context, asset_nodes_data)
if _is_asset_node_executable(asset_node)
]
if not filtered_asset_nodes:
raise Exception(f"No asset nodes found to trigger for task {dag_id}.{task_id}")
for asset_node in filtered_asset_nodes:
assets_to_trigger_per_job[_build_dagster_job_identifier(asset_node)].append(
asset_node["assetKey"]["path"]
)
Expand Down Expand Up @@ -213,3 +213,7 @@ def _build_dagster_run_execution_params(
"assetCheckSelection": [],
},
}


def _is_asset_node_executable(asset_node: Mapping[str, Any]) -> bool:
return bool(asset_node["jobs"])
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,24 @@ setup_local_env:
../../scripts/airflow_setup.sh $(MAKEFILE_DIR)/dbt_example/airflow_dags
$(MAKE) dbt_setup

not_proxied:
chmod +x ../../scripts/find_and_replace_in_yaml_dir.sh
../../scripts/find_and_replace_in_yaml_dir.sh $(MAKEFILE_DIR)/dbt_example/airflow_dags/proxied_state True False

proxied:
chmod +x ../../scripts/find_and_replace_in_yaml_dir.sh
../../scripts/find_and_replace_in_yaml_dir.sh $(MAKEFILE_DIR)/dbt_example/airflow_dags/proxied_state False True

run_airflow:
airflow standalone

run_peer:
dagster dev -m dbt_example.dagster_defs.peer -p 3333

run_observe:
chmod +x ../../scripts/find_and_replace_in_yaml_dir.sh
../../scripts/find_and_replace_in_yaml_dir.sh $(MAKEFILE_DIR)/dbt_example/airflow_dags/proxied_state True False
run_observe: not_proxied
dagster dev -m dbt_example.dagster_defs.observe -p 3333

run_migrate:
chmod +x ../../scripts/find_and_replace_in_yaml_dir.sh && \
../../scripts/find_and_replace_in_yaml_dir.sh $(MAKEFILE_DIR)/dbt_example/airflow_dags/proxied_state False True && \
run_migrate: proxied
dagster dev -m dbt_example.dagster_defs.migrate -p 3333

run_complete:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import subprocess
from datetime import timedelta
from typing import Callable, List, Tuple

Expand All @@ -10,6 +11,10 @@
from dbt_example_tests.integration_tests.conftest import makefile_dir


def make_unmigrated() -> None:
subprocess.check_output(["make", "not_proxied", "-C", str(makefile_dir())])


@pytest.fixture(name="dagster_home")
def dagster_home_fixture(local_env: None) -> str:
return os.environ["DAGSTER_HOME"]
Expand Down Expand Up @@ -77,6 +82,8 @@ def test_dagster_materializes(
) -> None:
"""Test that assets can load properly, and that materializations register."""
dagster_dev_module, af_instance_fn = stage_and_fn
if dagster_dev_module.endswith("peer"):
make_unmigrated()
af_instance = af_instance_fn()
for dag_id, expected_asset_key in [("rebuild_iris_models", AssetKey(["lakehouse", "iris"]))]:
run_id = af_instance.trigger_dag(dag_id=dag_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def print_hello() -> None:
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2023, 1, 1),
"retries": 1,
"retries": 0,
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ def print_asset() -> None:
print("Hello, world!")


@asset
def another_print_asset() -> None:
print("Hello, world!")


@asset(description="Asset one is materialized by multiple airflow tasks")
def asset_one() -> None:
# ruff: noqa: T201
Expand All @@ -24,6 +29,7 @@ def build_mapped_defs() -> Definitions:
dag_defs(
"print_dag",
task_defs("print_task", Definitions(assets=[print_asset])),
task_defs("downstream_print_task", Definitions(assets=[another_print_asset])),
),
targeted_by_multiple_tasks(
Definitions([asset_one]),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
import requests
from airflow import DAG
from airflow.utils.context import Context
from dagster_airlift.in_airflow import BaseDagsterAssetsOperator, proxying_to_dagster
from dagster_airlift.in_airflow import BaseProxyTaskToDagsterOperator, proxying_to_dagster
from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml


class CustomProxyToDagsterOperator(BaseDagsterAssetsOperator):
class CustomProxyToDagsterOperator(BaseProxyTaskToDagsterOperator):
def get_dagster_session(self, context: Context) -> requests.Session:
if "var" not in context:
raise ValueError("No variables found in context")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import requests
from airflow.utils.context import Context
from dagster_airlift.in_airflow import BaseDagsterAssetsOperator
from dagster_airlift.in_airflow import BaseProxyTaskToDagsterOperator


class DagsterCloudProxyOperator(BaseDagsterAssetsOperator):
class DagsterCloudProxyOperator(BaseProxyTaskToDagsterOperator):
def get_variable(self, context: Context, var_name: str) -> str:
if "var" not in context:
raise ValueError("No variables found in context")
Expand Down

0 comments on commit 75414fa

Please sign in to comment.