-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[dagster-airlift] dagster operator (#23386)
Adds a Dagster operator that can remotely invoke Dagster from Airflow. It searches for a "node def" whose name matches the task (and matches our opinionated file format for blueprints). Includes a unit test that runs against live Airflow and Dagster and checks that runs of the respective assets are launched for each task. The tests are probably not entirely sufficient; ideally I'd also like to run against a few other cases:
- a workspace with multiple defined code locations
- a multi-asset with multiple keys, ensuring they all get picked up within the same run.

The API surface area here won't actually be exposed to the user. What I'm imagining is something like this:
```python
...  # dag code
create_migrating_dag(
    migrating_dict={...},
    dagster_instance=DagsterInstance(url=...),
    # or, in the cloud case: DagsterCloudInstance(url=..., auth_token=...)
)
```
Then, under the hood, we swap out operators via the stack, the same way we do with DAG construction. Any real use case will need new GraphQL endpoints so that we can easily retrieve asset info per node. The number of steps here feels gratuitous (although only because the way we're retrieving information is a bit hacky).
- Loading branch information
1 parent
6500acc
commit c434a56
Showing
9 changed files
with
364 additions
and
12 deletions.
There are no files selected for viewing
81 changes: 81 additions & 0 deletions
81
examples/experimental/dagster-airlift/dagster_airlift/in_airflow/dagster_operator.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
import os | ||
|
||
import requests | ||
from airflow import DAG | ||
from airflow.operators.python import PythonOperator | ||
|
||
from .gql_queries import ASSET_NODES_QUERY, RUNS_QUERY, TRIGGER_ASSETS_MUTATION | ||
|
||
|
||
def _post_graphql(dagster_url, query, variables=None):
    """POST one GraphQL document to ``{dagster_url}/graphql`` and return its ``data`` payload.

    Raises ``requests.HTTPError`` on an HTTP error status and ``Exception`` when the
    response body carries a top-level GraphQL ``errors`` entry.
    """
    payload = {"query": query}
    if variables is not None:
        payload["variables"] = variables
    response = requests.post(f"{dagster_url}/graphql", json=payload, timeout=3)
    # Surface HTTP-level failures instead of failing later on a missing JSON key.
    response.raise_for_status()
    body = response.json()
    if body.get("errors"):
        raise Exception(f"Dagster GraphQL request failed: {body['errors']}")
    return body["data"]


def compute_fn() -> None:
    """Launch, and wait for, the Dagster runs that correspond to the current Airflow task.

    The DAG id and task id are read from the ``AIRFLOW_CTX_DAG_ID`` and
    ``AIRFLOW_CTX_TASK_ID`` environment variables. Every asset node whose op name is
    ``{dag_id}__{task_id}`` is collected from the GraphQL endpoint at ``DAGSTER_URL``,
    grouped by (code location, repository, job), and one run is launched per group.
    The function then polls until every launched run reports ``SUCCESS``, ``FAILURE``
    or ``CANCELED``.

    Raises:
        KeyError: if one of the required environment variables is not set.
        requests.HTTPError: if the Dagster endpoint answers with an HTTP error status.
        Exception: if a GraphQL response reports errors, a run cannot be launched or
            looked up, or any launched run ends with a status other than ``SUCCESS``.
    """
    import time  # local import: only needed for the delay between polling passes

    # https://github.com/apache/airflow/discussions/24463
    os.environ["NO_PROXY"] = "*"
    dag_id = os.environ["AIRFLOW_CTX_DAG_ID"]
    task_id = os.environ["AIRFLOW_CTX_TASK_ID"]
    expected_op_name = f"{dag_id}__{task_id}"
    dagster_url = os.environ["DAGSTER_URL"]

    # key is (repo_location, repo_name, job_name), value is list of asset key paths
    assets_to_trigger = {}
    asset_nodes = _post_graphql(dagster_url, ASSET_NODES_QUERY)["assetNodes"]
    for asset_node in asset_nodes:
        if asset_node["opName"] != expected_op_name:
            continue
        # Only the first job listed for the asset is used to launch it.
        job = asset_node["jobs"][0]
        group_key = (
            job["repository"]["location"]["name"],
            job["repository"]["name"],
            job["name"],
        )
        assets_to_trigger.setdefault(group_key, []).append(asset_node["assetKey"]["path"])
    print(f"Found assets to trigger: {assets_to_trigger}")  # noqa: T201

    triggered_runs = []
    for (repo_location, repo_name, job_name), asset_keys in assets_to_trigger.items():
        execution_params = {
            "mode": "default",
            "executionMetadata": {"tags": []},
            "runConfigData": "{}",
            "selector": {
                "repositoryLocationName": repo_location,
                "repositoryName": repo_name,
                "pipelineName": job_name,
                "assetSelection": [{"path": asset_key} for asset_key in asset_keys],
                "assetCheckSelection": [],
            },
        }
        print(f"Triggering run for {repo_location}/{repo_name}/{job_name} with assets {asset_keys}")  # noqa: T201
        launch_result = _post_graphql(
            dagster_url,
            TRIGGER_ASSETS_MUTATION,
            {"executionParams": execution_params},
        )["launchPipelineExecution"]
        # Only the success variant of the mutation result carries a "run" field;
        # the error variants carry a message (or a list of errors) instead.
        if "run" not in launch_result:
            raise Exception(
                f"Failed to launch run for {repo_location}/{repo_name}/{job_name}: {launch_result}"
            )
        run_id = launch_result["run"]["id"]
        print(f"Launched run {run_id}...")  # noqa: T201
        triggered_runs.append(run_id)

    terminal_statuses = ("SUCCESS", "FAILURE", "CANCELED")
    completed_runs = {}  # key is run_id, value is status
    pending_runs = list(triggered_runs)
    while pending_runs:
        still_pending = []
        for run_id in pending_runs:
            run_or_error = _post_graphql(dagster_url, RUNS_QUERY, {"runId": run_id})["runOrError"]
            # The not-found / Python-error variants of the result have no "status" field.
            if "status" not in run_or_error:
                raise Exception(f"Could not fetch status of run {run_id}: {run_or_error}")
            run_status = run_or_error["status"]
            if run_status in terminal_statuses:
                print(f"Run {run_id} completed with status {run_status}")  # noqa: T201
                completed_runs[run_id] = run_status
            else:
                still_pending.append(run_id)
        pending_runs = still_pending
        if pending_runs:
            # Pause between polling passes so the webserver is not queried in a tight loop.
            time.sleep(1)

    non_successful_runs = [
        run_id for run_id, status in completed_runs.items() if status != "SUCCESS"
    ]
    if non_successful_runs:
        raise Exception(f"Runs {non_successful_runs} did not complete successfully.")
    print("All runs completed successfully.")  # noqa: T201
    return None
|
||
|
||
def build_dagster_task(task_id: str, dag: DAG, **kwargs):
    """Create a ``PythonOperator`` named ``task_id`` on ``dag`` that runs ``compute_fn``.

    Any extra keyword arguments are forwarded unchanged to ``PythonOperator``.
    """
    operator = PythonOperator(
        python_callable=compute_fn,
        dag=dag,
        task_id=task_id,
        **kwargs,
    )
    return operator
104 changes: 104 additions & 0 deletions
104
examples/experimental/dagster-airlift/dagster_airlift/in_airflow/gql_queries.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
# GraphQL query listing every asset node with its asset key path, its op name,
# and, for each job it belongs to, the job name plus the names of the owning
# repository and that repository's location.
ASSET_NODES_QUERY = """
query AssetNodeQuery {
assetNodes {
id
assetKey {
path
}
opName
jobs {
id
name
repository {
id
name
location {
id
name
}
}
}
}
}
"""
|
||
# GraphQL mutation that launches a run from the `$executionParams` variable.
# On `LaunchRunSuccess` the result carries the run's id and pipeline name; the
# other selected variants (`PipelineNotFoundError`, `InvalidSubsetError`,
# `RunConfigValidationInvalid`, `PythonError`) carry error messages instead.
TRIGGER_ASSETS_MUTATION = """
mutation LaunchAssetsExecution($executionParams: ExecutionParams!) {
launchPipelineExecution(executionParams: $executionParams) {
... on LaunchRunSuccess {
run {
id
pipelineName
__typename
}
__typename
}
... on PipelineNotFoundError {
message
__typename
}
... on InvalidSubsetError {
message
__typename
}
... on RunConfigValidationInvalid {
errors {
message
__typename
}
__typename
}
...PythonErrorFragment
__typename
}
}
fragment PythonErrorFragment on PythonError {
message
stack
errorChain {
...PythonErrorChain
__typename
}
__typename
}
fragment PythonErrorChain on ErrorChainLink {
isExplicitLink
error {
message
stack
__typename
}
__typename
}
"""
|
||
# GraphQL query that looks up a single run by the `$runId` variable.
# A `Run` result carries the run's id and status; the `RunNotFoundError` and
# `PythonError` variants carry a message (and, for `PythonError`, stack and causes).
RUNS_QUERY = """
query RunQuery($runId: ID!) {
runOrError(runId: $runId) {
__typename
...PythonErrorFragment
...NotFoundFragment
... on Run {
id
status
__typename
}
}
}
fragment NotFoundFragment on RunNotFoundError {
__typename
message
}
fragment PythonErrorFragment on PythonError {
__typename
message
stack
causes {
message
stack
__typename
}
}
"""
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
29 changes: 29 additions & 0 deletions
29
...irlift/dagster_airlift_tests/integration_tests/operator_test_project/dags/migrated_dag.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
import logging | ||
from datetime import datetime | ||
|
||
from airflow import DAG | ||
from dagster_airlift.in_airflow.dagster_operator import build_dagster_task | ||
|
||
# Configure logging for this DAG file: call basicConfig, set the root logger to
# INFO, and set the "requests.packages.urllib3" logger to INFO with propagation
# to its parent loggers enabled.
# NOTE(review): presumably intended to surface HTTP request logging from the
# Dagster operator — confirm that this logger name still receives records.
logging.basicConfig()
logging.getLogger().setLevel(logging.INFO)
requests_log = logging.getLogger("requests.packages.urllib3")
requests_log.setLevel(logging.INFO)
requests_log.propagate = True
|
||
|
||
def print_hello():
    """Write the greeting ``Hello`` to standard output."""
    greeting = "Hello"
    print(greeting)  # noqa: T201
|
||
|
||
# Defaults handed to the DAG below via its `default_args` parameter.
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2023, 1, 1),
    "retries": 1,
}

# DAG with no schedule that is created unpaused.
dag = DAG(
    dag_id="the_dag",
    default_args=default_args,
    schedule_interval=None,
    is_paused_upon_creation=False,
)

# Two tasks on the DAG, each built by `build_dagster_task`.
migrated_op = build_dagster_task(dag=dag, task_id="some_task")
other_migrated_op = build_dagster_task(dag=dag, task_id="other_task")
19 changes: 19 additions & 0 deletions
19
...ter-airlift/dagster_airlift_tests/integration_tests/operator_test_project/dagster_defs.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
from dagster import Definitions, asset | ||
|
||
|
||
@asset
def the_dag__some_task():
    # The name follows a `<dag_id>__<task_id>` pattern ("the_dag", "some_task").
    value = "asset_value"
    return value
|
||
|
||
@asset
def unrelated():
    # Asset whose name does not follow the `<dag_id>__<task_id>` pattern.
    value = "unrelated_value"
    return value
|
||
|
||
@asset
def the_dag__other_task():
    # The name follows a `<dag_id>__<task_id>` pattern ("the_dag", "other_task").
    value = "other_task_value"
    return value
|
||
|
||
# Definitions object exposing the three assets declared in this module.
defs = Definitions(
    assets=[
        the_dag__other_task,
        the_dag__some_task,
        unrelated,
    ],
)
Oops, something went wrong.