[dagster-airlift] dagster operator #23386

Merged: dpeng817 merged 2 commits into master from dpeng817/dagster_operator on Aug 11, 2024

Conversation

dpeng817 (Contributor) commented Aug 2, 2024

Adds a Dagster operator that can remotely invoke Dagster via Airflow. It searches for a "node def" whose name matches the task (and matches our opinionated file format for blueprints).

Includes a unit test that runs against live Airflow and Dagster and checks that runs of the respective assets are invoked for each task.

The tests are probably not entirely sufficient. I'd ideally like to run against a few other cases:

  • a workspace with multiple defined code locations
  • a multi asset with multiple keys, ensuring they all get picked up within the same run

The API surface area here won't actually be exposed to the user. What I'm imagining is something like this:

```python
... # dag code
create_migrating_dag(
    migrating_dict={...},
    dagster_instance=DagsterInstance(url=...),  # or, in the cloud case, DagsterCloudInstance(url=..., auth_token=...)
)
```

Then, under the hood, we swap out operators via the stack, the same as we do with the DAG construction.

Any real use case will need new GraphQL endpoints so that we can easily retrieve asset info per node. The number of steps here feels gratuitous (though only because the way we're retrieving information is a bit hacky).
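
For context, a minimal sketch of what that operator swap could look like. This is illustrative only: `create_migrating_dag`'s real signature isn't settled in this PR, and the operator body here is a placeholder, not the operator this PR adds.

```python
from typing import Mapping

from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator

def create_migrating_dag(dag: DAG, migrating_dict: Mapping[str, bool], dagster_url: str) -> DAG:
    # For each task marked as migrated, swap its operator for one that
    # remotely invokes Dagster instead of running the task locally.
    # Simplified: a real implementation must also preserve task dependencies
    # and avoid duplicate task-id registration on the DAG.
    for task_id, migrated in migrating_dict.items():
        if not (migrated and task_id in dag.task_dict):
            continue

        def _invoke(**_context):
            ...  # placeholder: POST to f"{dagster_url}/graphql" to launch the matching asset run

        dag.task_dict[task_id] = PythonOperator(task_id=task_id, python_callable=_invoke)
    return dag
```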

dpeng817 marked this pull request as ready for review August 2, 2024 19:50
dpeng817 requested a review from schrockn August 2, 2024 19:50
Comment on lines 67 to 89
ASSET_NODES_QUERY = """
query AssetNodeQuery {
    assetNodes {
        id
        assetKey {
            path
        }
        opName
        jobs {
            id
            name
            repository {
                id
                name
                location {
                    id
                    name
                }
            }
        }
    }
}
"""
Member:

All the GraphQL queries should go in their own file, IMO.

os.environ["NO_PROXY"] = "*"
dag_id = os.environ["AIRFLOW_CTX_DAG_ID"]
task_id = os.environ["AIRFLOW_CTX_TASK_ID"]
expected_op_name = f"{dag_id}__{task_id}"
Member:

we will want to make this configurable for people who want to name their ops/compute nodes
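
As a sketch of that suggestion: the hard-coded "{dag_id}__{task_id}" convention could become the default of an injectable naming function. The `op_name_fn` parameter and `default_op_name` helper below are hypothetical, not part of this PR.

```python
import os
from typing import Callable, Optional

def default_op_name(dag_id: str, task_id: str) -> str:
    # The convention the PR currently hard-codes.
    return f"{dag_id}__{task_id}"

def expected_op_name(op_name_fn: Optional[Callable[[str, str], str]] = None) -> str:
    # Read the Airflow context from the env vars the operator depends on,
    # then apply either the user-supplied or the default naming convention.
    dag_id = os.environ["AIRFLOW_CTX_DAG_ID"]
    task_id = os.environ["AIRFLOW_CTX_TASK_ID"]
    return (op_name_fn or default_op_name)(dag_id, task_id)
```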

Comment on lines 201 to 202
dag_id = os.environ["AIRFLOW_CTX_DAG_ID"]
task_id = os.environ["AIRFLOW_CTX_TASK_ID"]
Member:

Just for documentation purposes I think it would be worth having a dumb value object with all the airflow context env vars that we depend on. We can also use this for testing.
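
A minimal sketch of such a value object (the `AirflowTaskContext` name is invented here):

```python
import os
from dataclasses import dataclass

@dataclass(frozen=True)
class AirflowTaskContext:
    """All the Airflow context env vars this operator depends on."""

    dag_id: str
    task_id: str

    @staticmethod
    def from_env() -> "AirflowTaskContext":
        return AirflowTaskContext(
            dag_id=os.environ["AIRFLOW_CTX_DAG_ID"],
            task_id=os.environ["AIRFLOW_CTX_TASK_ID"],
        )

# In tests, construct directly instead of reading the environment:
# ctx = AirflowTaskContext(dag_id="my_dag", task_id="my_task")
```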

assets_to_trigger = {} # key is (repo_location, repo_name, job_name), value is list of asset keys
# create graphql client
dagster_url = os.environ["DAGSTER_URL"]
response = requests.post(f"{dagster_url}/graphql", json={"query": ASSET_NODES_QUERY}, timeout=3)
Member:

Are we going to be able to subset this?

dpeng817 (Contributor, Author):

what do you mean?

Member:

Query less than the total number of assets in the deployment (filter by op name)?
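
Until a server-side filter exists, that subsetting would have to happen client-side. A purely illustrative sketch, using only fields that ASSET_NODES_QUERY already returns:

```python
from typing import Any, Dict, List

def subset_asset_nodes(asset_nodes: List[Dict[str, Any]], expected_op_name: str) -> List[Dict[str, Any]]:
    # Keep only the asset nodes whose op name matches the current task's
    # expected op name, rather than processing every asset in the deployment.
    return [node for node in asset_nodes if node["opName"] == expected_op_name]

# e.g., on the response from ASSET_NODES_QUERY above:
# matching = subset_asset_nodes(response.json()["data"]["assetNodes"], expected_op_name)
```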

Comment on lines 218 to 266
print(f"Found assets to trigger: {assets_to_trigger}") # noqa: T201
triggered_runs = []
for (repo_location, repo_name, job_name), asset_keys in assets_to_trigger.items():
execution_params = {
"mode": "default",
"executionMetadata": {"tags": []},
"runConfigData": "{}",
"selector": {
"repositoryLocationName": repo_location,
"repositoryName": repo_name,
"pipelineName": job_name,
"assetSelection": [{"path": asset_key} for asset_key in asset_keys],
"assetCheckSelection": [],
},
}
print(f"Triggering run for {repo_location}/{repo_name}/{job_name} with assets {asset_keys}") # noqa: T201
response = requests.post(
f"{dagster_url}/graphql",
json={
"query": TRIGGER_ASSETS_MUTATION,
"variables": {"executionParams": execution_params},
},
timeout=3,
)
run_id = response.json()["data"]["launchPipelineExecution"]["run"]["id"]
print(f"Launched run {run_id}...") # noqa: T201
triggered_runs.append(run_id)
completed_runs = {} # key is run_id, value is status
while len(completed_runs) < len(triggered_runs):
for run_id in triggered_runs:
if run_id in completed_runs:
continue
response = requests.post(
f"{dagster_url}/graphql",
json={"query": RUNS_QUERY, "variables": {"runId": run_id}},
timeout=3,
)
run_status = response.json()["data"]["runOrError"]["status"]
if run_status in ["SUCCESS", "FAILURE", "CANCELED"]:
print(f"Run {run_id} completed with status {run_status}") # noqa: T201
completed_runs[run_id] = run_status
non_successful_runs = [
run_id for run_id, status in completed_runs.items() if status != "SUCCESS"
]
if non_successful_runs:
raise Exception(f"Runs {non_successful_runs} did not complete successfully.")
print("All runs completed successfully.") # noqa: T201
return None

Member:

I think this is OK for now, but we will want to restructure this substantially to bring it under test, etc.

Also willing to defer this until we build an OpenAPI-compliant REST API for this and use strongly typed APIs, as that will make this easier.
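
One way that restructuring could make the polling loop testable, sketched with an invented `GraphQLClient` protocol that a strongly typed REST client could later satisfy:

```python
from typing import Any, Dict, Mapping, Optional, Protocol

class GraphQLClient(Protocol):
    # Anything that can execute a GraphQL query: a requests-backed client
    # in production, a canned-response fake in unit tests.
    def execute(self, query: str, variables: Optional[Mapping[str, Any]] = None) -> Dict[str, Any]:
        ...

def get_run_status(client: GraphQLClient, runs_query: str, run_id: str) -> str:
    # The status lookup from the polling loop above, now injectable
    # and unit-testable without a live Dagster webserver.
    result = client.execute(runs_query, {"runId": run_id})
    return result["data"]["runOrError"]["status"]
```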

schrockn (Member) left a comment:

Good start. We will end up evolving this a lot so you don't have to take all my suggestions before we merge.

dpeng817 force-pushed the dpeng817/airflow_migrating and dpeng817/dagster_operator branches repeatedly between August 3 and August 11, 2024.

Base automatically changed from dpeng817/airflow_migrating to master August 11, 2024 19:06
dpeng817 merged commit dde6733 into master Aug 11, 2024
2 of 3 checks passed
dpeng817 deleted the dpeng817/dagster_operator branch August 11, 2024 19:06
PedramNavid pushed a commit that referenced this pull request Aug 14, 2024