Skip to content

Commit

Permalink
[dagster-airlift] create migrating dag
Browse files Browse the repository at this point in the history
  • Loading branch information
dpeng817 committed Aug 5, 2024
1 parent a70502a commit 0a4a7bd
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@
PythonDefs as PythonDefs,
load_defs_from_yaml as load_defs_from_yaml,
)
from .within_airflow import mark_as_dagster_migrating as mark_as_dagster_migrating
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import json
import sys

from airflow import DAG


def mark_as_dagster_migrating(
    migration_status: dict,
) -> None:
    """Tag every airflow dag in the calling module with its dagster migration status.

    Looks through the module globals of the caller, and for each ``DAG`` instance
    whose ``dag_id`` is a key of ``migration_status`` appends a JSON-encoded tag of
    the form ``{"dagster_migration": <status for that dag>}`` to the dag's tags.

    The dag objects are tagged in place rather than rebuilt through the ``DAG``
    constructor: operators are attached to the original dag object, so swapping in
    a freshly constructed dag would leave a dag without tasks in the module
    globals, and would also depend on the exact set of constructor arguments
    accepted by the installed airflow version.

    Should only ever be the last line in a dag file, since only dags that already
    exist in the caller's globals at call time are seen.

    Args:
        migration_status: Mapping of dag_id to a JSON-serializable description of
            the migration status of the tasks within that dag.
    """
    # get global context from above frame
    global_vars = sys._getframe(1).f_globals  # noqa: SLF001
    # Iterate over a snapshot so the live module namespace is never walked directly.
    for candidate in list(global_vars.values()):
        if not isinstance(candidate, DAG):
            continue
        if candidate.dag_id not in migration_status:
            continue
        migration_tag = json.dumps({"dagster_migration": migration_status[candidate.dag_id]})
        current_tags = list(candidate.tags or [])
        # The same dag may be bound to several global names (or this function may
        # be called twice); only add the tag once per dag.
        if migration_tag in current_tags:
            continue
        candidate.tags = [*current_tags, migration_tag]
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from dagster_airlift import mark_as_dagster_migrating


def print_hello():
    """Write the greeting ``Hello`` to standard output."""
    greeting = "Hello"
    print(greeting)  # noqa: T201


# Arguments handed to the DAG below through its ``default_args`` parameter.
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2023, 1, 1),
"retries": 1,
}

# DAG with id "print_dag": no schedule interval, and not paused upon creation.
dag = DAG(
"print_dag", default_args=default_args, schedule_interval=None, is_paused_upon_creation=False
)
# Two PythonOperator tasks on the DAG, both calling print_hello.
# No dependency between the two tasks is declared here.
print_op = PythonOperator(task_id="print_task", python_callable=print_hello, dag=dag)
downstream_print_op = PythonOperator(
task_id="downstream_print_task", python_callable=print_hello, dag=dag
)

# Called after the DAG and its tasks are defined: passes the per-task migration
# status for "print_dag" (print_task -> True, downstream_print_task -> False).
mark_as_dagster_migrating(
{
"print_dag": {
"print_task": True,
"downstream_print_task": False,
}
}
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import json
import os
import subprocess
from tempfile import TemporaryDirectory
from typing import Generator

import pytest
import requests
from dagster._core.test_utils import environ


@pytest.fixture(name="setup")
def setup_fixture() -> Generator[str, None, None]:
    """Prepare a temporary AIRFLOW_HOME and yield its path.

    Makes ``../airflow_setup.sh`` (relative to this file) executable, runs it with
    the ``airflow_migrating_project/dags`` directory as its only argument and with
    ``AIRFLOW_HOME`` pointing at a fresh temporary directory, then yields that
    directory while ``AIRFLOW_HOME`` is set in the current process environment.
    """
    this_dir = os.path.dirname(__file__)
    # The setup script lives one directory above this file.
    setup_script = os.path.join(this_dir, "..", "airflow_setup.sh")
    dags_dir = os.path.join(this_dir, "airflow_migrating_project", "dags")
    with TemporaryDirectory() as airflow_home:
        # Child processes get the current environment plus the temporary AIRFLOW_HOME.
        child_env = dict(os.environ, AIRFLOW_HOME=airflow_home)
        for command in (["chmod", "+x", setup_script], [setup_script, dags_dir]):
            subprocess.run(command, check=True, env=child_env)
        with environ({"AIRFLOW_HOME": airflow_home}):
            yield airflow_home


def test_migrating_dag(airflow_instance: None) -> None:
    """Check that the migrating dag exposes its migration status as its only tag."""
    expected_payload = {
        "dagster_migration": {"print_task": True, "downstream_print_task": False}
    }
    dag_url = "http://localhost:8080/api/v1/dags/print_dag"
    credentials = ("admin", "admin")

    response = requests.get(dag_url, auth=credentials)
    assert response.status_code == 200

    dag_tags = response.json()["tags"]
    assert len(dag_tags) == 1
    # The tag name is itself a JSON document carrying the per-task migration status.
    decoded_tag = json.loads(dag_tags[0]["name"])
    assert decoded_tag == expected_payload

0 comments on commit 0a4a7bd

Please sign in to comment.