-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
9 changed files
with
362 additions
and
12 deletions.
There are no files selected for viewing
81 changes: 81 additions & 0 deletions
81
examples/experimental/dagster-airlift/dagster_airlift/in_airflow/dagster_operator.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
import os | ||
|
||
import requests | ||
from airflow import DAG | ||
from airflow.operators.python import PythonOperator | ||
|
||
from .gql_queries import ASSET_NODES_QUERY, RUNS_QUERY, TRIGGER_ASSETS_MUTATION | ||
|
||
|
||
def compute_fn() -> None: | ||
# https://github.com/apache/airflow/discussions/24463 | ||
os.environ["NO_PROXY"] = "*" | ||
dag_id = os.environ["AIRFLOW_CTX_DAG_ID"] | ||
task_id = os.environ["AIRFLOW_CTX_TASK_ID"] | ||
expected_op_name = f"{dag_id}__{task_id}" | ||
assets_to_trigger = {} # key is (repo_location, repo_name, job_name), value is list of asset keys | ||
# create graphql client | ||
dagster_url = os.environ["DAGSTER_URL"] | ||
response = requests.post(f"{dagster_url}/graphql", json={"query": ASSET_NODES_QUERY}, timeout=3) | ||
for asset_node in response.json()["data"]["assetNodes"]: | ||
if asset_node["opName"] == expected_op_name: | ||
repo_location = asset_node["jobs"][0]["repository"]["location"]["name"] | ||
repo_name = asset_node["jobs"][0]["repository"]["name"] | ||
job_name = asset_node["jobs"][0]["name"] | ||
if (repo_location, repo_name, job_name) not in assets_to_trigger: | ||
assets_to_trigger[(repo_location, repo_name, job_name)] = [] | ||
assets_to_trigger[(repo_location, repo_name, job_name)].append( | ||
asset_node["assetKey"]["path"] | ||
) | ||
print(f"Found assets to trigger: {assets_to_trigger}") # noqa: T201 | ||
triggered_runs = [] | ||
for (repo_location, repo_name, job_name), asset_keys in assets_to_trigger.items(): | ||
execution_params = { | ||
"mode": "default", | ||
"executionMetadata": {"tags": []}, | ||
"runConfigData": "{}", | ||
"selector": { | ||
"repositoryLocationName": repo_location, | ||
"repositoryName": repo_name, | ||
"pipelineName": job_name, | ||
"assetSelection": [{"path": asset_key} for asset_key in asset_keys], | ||
"assetCheckSelection": [], | ||
}, | ||
} | ||
print(f"Triggering run for {repo_location}/{repo_name}/{job_name} with assets {asset_keys}") # noqa: T201 | ||
response = requests.post( | ||
f"{dagster_url}/graphql", | ||
json={ | ||
"query": TRIGGER_ASSETS_MUTATION, | ||
"variables": {"executionParams": execution_params}, | ||
}, | ||
timeout=3, | ||
) | ||
run_id = response.json()["data"]["launchPipelineExecution"]["run"]["id"] | ||
print(f"Launched run {run_id}...") # noqa: T201 | ||
triggered_runs.append(run_id) | ||
completed_runs = {} # key is run_id, value is status | ||
while len(completed_runs) < len(triggered_runs): | ||
for run_id in triggered_runs: | ||
if run_id in completed_runs: | ||
continue | ||
response = requests.post( | ||
f"{dagster_url}/graphql", | ||
json={"query": RUNS_QUERY, "variables": {"runId": run_id}}, | ||
timeout=3, | ||
) | ||
run_status = response.json()["data"]["runOrError"]["status"] | ||
if run_status in ["SUCCESS", "FAILURE", "CANCELED"]: | ||
print(f"Run {run_id} completed with status {run_status}") # noqa: T201 | ||
completed_runs[run_id] = run_status | ||
non_successful_runs = [ | ||
run_id for run_id, status in completed_runs.items() if status != "SUCCESS" | ||
] | ||
if non_successful_runs: | ||
raise Exception(f"Runs {non_successful_runs} did not complete successfully.") | ||
print("All runs completed successfully.") # noqa: T201 | ||
return None | ||
|
||
|
||
def build_dagster_task(task_id: str, dag: DAG, **kwargs): | ||
return PythonOperator(task_id=task_id, dag=dag, python_callable=compute_fn, **kwargs) |
104 changes: 104 additions & 0 deletions
104
examples/experimental/dagster-airlift/dagster_airlift/in_airflow/gql_queries.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
ASSET_NODES_QUERY = """ | ||
query AssetNodeQuery { | ||
assetNodes { | ||
id | ||
assetKey { | ||
path | ||
} | ||
opName | ||
jobs { | ||
id | ||
name | ||
repository { | ||
id | ||
name | ||
location { | ||
id | ||
name | ||
} | ||
} | ||
} | ||
} | ||
} | ||
""" | ||
|
||
TRIGGER_ASSETS_MUTATION = """ | ||
mutation LaunchAssetsExecution($executionParams: ExecutionParams!) { | ||
launchPipelineExecution(executionParams: $executionParams) { | ||
... on LaunchRunSuccess { | ||
run { | ||
id | ||
pipelineName | ||
__typename | ||
} | ||
__typename | ||
} | ||
... on PipelineNotFoundError { | ||
message | ||
__typename | ||
} | ||
... on InvalidSubsetError { | ||
message | ||
__typename | ||
} | ||
... on RunConfigValidationInvalid { | ||
errors { | ||
message | ||
__typename | ||
} | ||
__typename | ||
} | ||
...PythonErrorFragment | ||
__typename | ||
} | ||
} | ||
fragment PythonErrorFragment on PythonError { | ||
message | ||
stack | ||
errorChain { | ||
...PythonErrorChain | ||
__typename | ||
} | ||
__typename | ||
} | ||
fragment PythonErrorChain on ErrorChainLink { | ||
isExplicitLink | ||
error { | ||
message | ||
stack | ||
__typename | ||
} | ||
__typename | ||
} | ||
""" | ||
|
||
RUNS_QUERY = """ | ||
query RunQuery($runId: ID!) { | ||
runOrError(runId: $runId) { | ||
__typename | ||
...PythonErrorFragment | ||
...NotFoundFragment | ||
... on Run { | ||
id | ||
status | ||
__typename | ||
} | ||
} | ||
} | ||
fragment NotFoundFragment on RunNotFoundError { | ||
__typename | ||
message | ||
} | ||
fragment PythonErrorFragment on PythonError { | ||
__typename | ||
message | ||
stack | ||
causes { | ||
message | ||
stack | ||
__typename | ||
} | ||
} | ||
""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
29 changes: 29 additions & 0 deletions
29
...irlift/dagster_airlift_tests/integration_tests/operator_test_project/dags/migrated_dag.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
import logging | ||
from datetime import datetime | ||
|
||
from airflow import DAG | ||
from dagster_airlift.in_airflow.dagster_operator import build_dagster_task | ||
|
||
logging.basicConfig() | ||
logging.getLogger().setLevel(logging.INFO) | ||
requests_log = logging.getLogger("requests.packages.urllib3") | ||
requests_log.setLevel(logging.INFO) | ||
requests_log.propagate = True | ||
|
||
|
||
def print_hello(): | ||
print("Hello") # noqa: T201 | ||
|
||
|
||
default_args = { | ||
"owner": "airflow", | ||
"depends_on_past": False, | ||
"start_date": datetime(2023, 1, 1), | ||
"retries": 1, | ||
} | ||
|
||
dag = DAG( | ||
"the_dag", default_args=default_args, schedule_interval=None, is_paused_upon_creation=False | ||
) | ||
migrated_op = build_dagster_task(task_id="some_task", dag=dag) | ||
other_migrated_op = build_dagster_task(task_id="other_task", dag=dag) |
19 changes: 19 additions & 0 deletions
19
...ter-airlift/dagster_airlift_tests/integration_tests/operator_test_project/dagster_defs.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
from dagster import Definitions, asset | ||
|
||
|
||
@asset | ||
def the_dag__some_task(): | ||
return "asset_value" | ||
|
||
|
||
@asset | ||
def unrelated(): | ||
return "unrelated_value" | ||
|
||
|
||
@asset | ||
def the_dag__other_task(): | ||
return "other_task_value" | ||
|
||
|
||
defs = Definitions(assets=[the_dag__other_task, the_dag__some_task, unrelated]) |
Oops, something went wrong.