[dagster-airlift] Fix materialization reporting order for tasks #23326

Merged 3 commits on Aug 3, 2024
@@ -24,4 +24,14 @@ auth_backend = airflow.api.auth.backend.basic_auth
[webserver]
expose_config = True

EOL

# run airflow db migrations and create the default admin user
airflow db migrate && \
airflow users create \
--username admin \
--password admin \
--firstname Peter \
--lastname Parker \
--role Admin \
--email [email protected]
1 change: 1 addition & 0 deletions examples/experimental/dagster-airlift/conftest.py
@@ -0,0 +1 @@
pytest_plugins = ["dagster_airlift.shared_fixtures"]
@@ -19,7 +19,8 @@
multi_asset,
sensor,
)
from dagster._time import datetime_from_timestamp, get_current_datetime
from dagster._core.utils import toposort_flatten
from dagster._time import datetime_from_timestamp, get_current_datetime, get_current_timestamp
from dagster_dbt import build_dbt_asset_specs
from pydantic import BaseModel

@@ -217,9 +218,12 @@ def airflow_dag_sensor(context: SensorEvaluationContext) -> SensorResult:
"Link to Run": MarkdownMetadataValue(
f"[View Run]({airflow_webserver_url}/dags/{dag_id}/grid?dag_run_id={dag_run['dag_run_id']}&tab=details)"
),
"Creation Timestamp": TimestampMetadataValue(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider "Event Creation Timestamp"

get_current_timestamp()
),
},
)
for spec in specs
for spec in toposort_specs(specs)
]
)
context.update_cursor(str(current_date.timestamp()))
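
The cursor above is persisted as a stringified Unix timestamp. A minimal sketch of the round-trip this implies, using the dagster._time helpers imported in this diff (variable names are illustrative, not from the PR):

from dagster._time import datetime_from_timestamp, get_current_datetime

current_date = get_current_datetime()  # timezone-aware current datetime
cursor = str(current_date.timestamp())  # what update_cursor persists
restored = datetime_from_timestamp(float(cursor))  # how a later tick can rehydrate it
assert abs((restored - current_date).total_seconds()) < 1e-3  # float round-trip loses sub-ms precision
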
@@ -304,3 +308,13 @@ def get_leaf_specs(specs: List[AssetSpec]) -> List[AssetSpec]:
if key not in downstreams_per_key
]
return leaf_specs


def toposort_specs(specs: List[AssetSpec]) -> List[AssetSpec]:
spec_per_key = {spec.key: spec for spec in specs}
return [
spec_per_key[key]
for key in toposort_flatten(
{spec.key: {dep.asset_key for dep in spec.deps} for spec in specs}
)
]
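
For reference, toposort_flatten (imported above from dagster._core.utils) takes a mapping from each key to the set of keys it depends on and returns a flat, upstream-first ordering. A minimal sketch of the guarantee toposort_specs relies on, using plain strings in place of real asset keys (the three-node chain is hypothetical):

from dagster._core.utils import toposort_flatten

deps = {
    "downstream": {"middle"},  # depends on "middle"
    "middle": {"upstream"},  # depends on "upstream"
    "upstream": set(),  # no upstream dependencies
}
assert toposort_flatten(deps) == ["upstream", "middle", "downstream"]

This upstream-first ordering is what lets the sensor report a task's materialization before any asset that depends on it, which is the fix this PR makes.
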
@@ -0,0 +1,43 @@
import os
import signal
import subprocess
import time
from typing import Any, Generator

import pytest
import requests
from dagster._time import get_current_timestamp


def airflow_is_ready():
try:
response = requests.get("http://localhost:8080")
return response.status_code == 200
except requests.RequestException:  # webserver not accepting connections yet
return False


# Setup should have set AIRFLOW_HOME env var
@pytest.fixture(name="airflow_instance")
def airflow_instance_fixture(setup: None) -> Generator[Any, None, None]:
process = subprocess.Popen(
["airflow", "standalone"],
env=os.environ,  # inherit the env, which carries AIRFLOW_HOME set by the setup fixture
shell=False,
preexec_fn=os.setsid,  # noqa  # start a new session so teardown can kill the whole process group
)
# Give airflow a few seconds to stand up
time.sleep(5)
initial_time = get_current_timestamp()

airflow_ready = False
while get_current_timestamp() - initial_time < 30:
if airflow_is_ready():
airflow_ready = True
break
time.sleep(1)

assert airflow_ready, "Airflow did not start within 30 seconds..."
yield process
# Kill process group, since process.kill and process.terminate do not work.
os.killpg(process.pid, signal.SIGKILL)
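
A minimal sketch of a test consuming this fixture (the test name is hypothetical); the conftest.py additions in this PR expose it via pytest_plugins = ["dagster_airlift.shared_fixtures"]:

import requests

def test_airflow_webserver_is_up(airflow_instance) -> None:
    # By the time the fixture yields, the standalone webserver answers on port 8080
    assert requests.get("http://localhost:8080").status_code == 200
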
@@ -18,4 +18,7 @@ def print_hello():
dag = DAG(
"print_dag", default_args=default_args, schedule_interval=None, is_paused_upon_creation=False
)
load_iris = PythonOperator(task_id="print_task", python_callable=print_hello, dag=dag)
print_op = PythonOperator(task_id="print_task", python_callable=print_hello, dag=dag)
downstream_print_op = PythonOperator(
task_id="downstream_print_task", python_callable=print_hello, dag=dag
)
@@ -1,65 +1,24 @@
import os
import signal
import subprocess
from tempfile import TemporaryDirectory
from typing import Any, Generator
from typing import Generator

import pytest
from dagster._core.test_utils import environ


@pytest.fixture(name="airflow_home_dir")
def airflow_home_dir_fixture() -> Generator[str, None, None]:
@pytest.fixture(name="setup")
def setup_fixture() -> Generator[str, None, None]:
with TemporaryDirectory() as tmpdir:
# run chmod +x airflow_setup.sh and then run it against the test dags directory
temp_env = {**os.environ.copy(), "AIRFLOW_HOME": tmpdir}
# go up one directory from current
path_to_script = os.path.join(os.path.dirname(__file__), "..", "create_airflow_cfg.sh")
path_to_script = os.path.join(os.path.dirname(__file__), "..", "airflow_setup.sh")
path_to_dags = os.path.join(os.path.dirname(__file__), "airflow_project", "dags")
subprocess.run(["chmod", "+x", path_to_script], check=True, env=temp_env)
subprocess.run([path_to_script, path_to_dags], check=True, env=temp_env)
yield tmpdir


@pytest.fixture(name="airflow_instance")
def airflow_instance_fixture(airflow_home_dir: str) -> Generator[Any, None, None]:
temp_env = {**os.environ.copy(), "AIRFLOW_HOME": airflow_home_dir}
subprocess.run(["airflow", "db", "migrate"], check=True, env=temp_env)
subprocess.run(
[
"airflow",
"users",
"create",
"--username",
"admin",
"--firstname",
"admin",
"--lastname",
"admin",
"--role",
"Admin",
"--email",
"[email protected]",
"--password",
"admin",
],
check=True,
env=temp_env,
)
process = subprocess.Popen(
["airflow", "standalone"],
env=temp_env,
shell=False,
stdout=subprocess.PIPE,
preexec_fn=os.setsid,  # noqa  # start a new session so the process group can be killed
)
assert process.stdout is not None
for line in process.stdout:
if "Airflow is ready" in line.decode():
break
yield process
# Kill process group, since process.kill and process.terminate do not work.
os.killpg(process.pid, signal.SIGKILL)
process.wait()
with environ({"AIRFLOW_HOME": tmpdir}):
yield tmpdir


@pytest.fixture(name="dbt_project_dir")
@@ -2,7 +2,13 @@
from typing import cast

import requests
from dagster import JsonMetadataValue, MarkdownMetadataValue, SensorResult, build_sensor_context
from dagster import (
AssetDep,
JsonMetadataValue,
MarkdownMetadataValue,
SensorResult,
build_sensor_context,
)
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.test_utils import instance_for_test
from dagster_airlift import (
@@ -14,7 +20,6 @@

def test_dag_peering(
airflow_instance: None,
airflow_home_dir: str,
) -> None:
"""Test that dags can be correctly peered from airflow, and certain metadata properties are retained."""
assets_defs = assets_defs_from_airflow_instance(
@@ -26,10 +31,16 @@ def test_dag_peering(
dag_id="print_dag",
task_id="print_task",
key=AssetKey(["some", "key"]),
)
),
TaskMapping(
dag_id="print_dag",
task_id="downstream_print_task",
key=AssetKey(["other", "key"]),
deps=[AssetDep(AssetKey(["some", "key"]))],
),
],
)
assert len(assets_defs) == 2
assert len(assets_defs) == 3
dag_def = [ # noqa
assets_def
for assets_def in assets_defs
@@ -83,7 +94,7 @@ def test_dag_peering(
sensor_context = build_sensor_context(instance=instance)
sensor_result = sensor_def(sensor_context)
assert isinstance(sensor_result, SensorResult)
assert len(sensor_result.asset_events) == 2
assert len(sensor_result.asset_events) == 3
dag_mat = [ # noqa
asset_mat
for asset_mat in sensor_result.asset_events
@@ -112,3 +123,16 @@ def test_dag_peering(
assert task_mat.metadata["Link to Run"] == MarkdownMetadataValue(
f"[View Run](http://localhost:8080/dags/print_dag/grid?dag_run_id={run_id}&tab=details)"
)

other_mat = [ # noqa
asset_mat
for asset_mat in sensor_result.asset_events
if asset_mat.asset_key == AssetKey(["other", "key"])
][0]

assert other_mat
# the downstream asset's materialization should be reported at or after its upstream's
assert ( # type: ignore
other_mat.metadata["Creation Timestamp"].value
>= task_mat.metadata["Creation Timestamp"].value
)
@@ -27,16 +27,8 @@ setup_local_env:
make wipe && \
mkdir -p $$AIRFLOW_HOME && \
mkdir -p $$DAGSTER_HOME && \
chmod +x ../../create_airflow_cfg.sh && \
../../create_airflow_cfg.sh $(MAKEFILE_DIR)/peering_with_dbt/airflow_dags && \
airflow db migrate && \
airflow users create \
--username admin \
--password admin \
--firstname Peter \
--lastname Parker \
--role Admin \
--email [email protected] && \
chmod +x ../../airflow_setup.sh && \
../../airflow_setup.sh $(MAKEFILE_DIR)/peering_with_dbt/airflow_dags && \
make dbt_setup

run_airflow:
@@ -0,0 +1 @@
pytest_plugins = ["dagster_airlift.shared_fixtures"]
@@ -1,7 +1,6 @@
import os
import signal
import subprocess
from typing import Any, Generator
from typing import Generator

import pytest
from dagster._core.test_utils import environ
@@ -19,21 +18,3 @@ def setup_fixture() -> Generator[None, None, None]:
):
yield
subprocess.run(["make", "wipe"], check=True)


@pytest.fixture(name="airflow_instance")
def airflow_instance_fixture(setup: None) -> Generator[Any, None, None]:
process = subprocess.Popen(
["airflow", "standalone"],
env=os.environ, # since we have some temp vars in the env right now
shell=False,
stdout=subprocess.PIPE,
preexec_fn=os.setsid,  # noqa  # start a new session so the process group can be killed
)
assert process.stdout is not None
for line in process.stdout:
if "Airflow is ready" in line.decode():
break
yield process
# Kill process group, since process.kill and process.terminate do not work.
os.killpg(process.pid, signal.SIGKILL)