[dagster-airlift] create a custom operator for lakehouse example #23597
Conversation
left a clarification q, but code seems good to me
@@ -33,10 +26,20 @@ def dbt_project_path() -> Path:
 defs = build_defs_from_airflow_instance(
     airflow_instance=airflow_instance,
     orchestrated_defs=defs_from_factories(
-        PythonDefs(
+        CSVToDuckdbDefs(
The main purpose of the custom DefsFactory here is to produce an API / user code that matches the custom Airflow operator, is that right? E.g. there's no technical reason that there needs to be a 1:1 mapping of operator to defs factory; just making sure I understand this right.
It seems like a user could just use a PythonDefs here with a fn/lambda to define the inputs to load_csv_to_duckdb, but the downside is that there's a higher mental load in migrating that way, since the user code ends up looking less similar when moving Airflow -> Dagster?
import os
from pathlib import Path

from airflow.operators.python import PythonOperator

# Assumes a surrounding `dag` object and the example's shared
# `load_csv_to_duckdb` helper are defined elsewhere.
load_iris = PythonOperator(
    task_id="load_iris",
    dag=dag,
    python_callable=lambda: load_csv_to_duckdb(
        table_name="iris_lakehouse_table",
        csv_path=Path("iris.csv"),
        duckdb_path=Path(os.environ["AIRFLOW_HOME"]) / "jaffle_shop.duckdb",
        column_names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
        duckdb_schema="iris_dataset",
        duckdb_database_name="jaffle_shop",
    ),
)
You're correct that PythonDefs could be used here, in the same way that a PythonOperator could be used to easily achieve the same goal as the load_csv_etc operator we use. The goal is to make it explicitly clear that this semi-1:1 mapping exists for your custom operators, and to provide a clear path forward. The example is contrived, but the pattern isn't.
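To make that mapping concrete, here is a minimal sketch of what a factory like CSVToDuckdbDefs could look like. This is purely illustrative: it assumes the experimental dagster-airlift DefsFactory contract is roughly "constructor mirrors the custom operator's arguments, plus a hook that returns Definitions". The `build_defs` hook name and the helper's import path are assumptions, not confirmed API.

```python
from dataclasses import dataclass
from pathlib import Path
from typing import List

from dagster import Definitions, asset

# Hypothetical import path: wherever the example keeps its shared ETL helper.
from shared.load_csv import load_csv_to_duckdb


@dataclass
class CSVToDuckdbDefs:  # in the PR this would subclass the airlift DefsFactory base
    name: str
    table_name: str
    csv_path: Path
    duckdb_path: Path
    column_names: List[str]
    duckdb_schema: str
    duckdb_database_name: str

    def build_defs(self) -> Definitions:  # hook name is an assumption
        # The constructor deliberately mirrors the custom operator's arguments,
        # so the Dagster defs read almost identically to the Airflow task code.
        @asset(name=self.name)
        def _csv_to_duckdb_asset() -> None:
            load_csv_to_duckdb(
                table_name=self.table_name,
                csv_path=self.csv_path,
                duckdb_path=self.duckdb_path,
                column_names=self.column_names,
                duckdb_schema=self.duckdb_schema,
                duckdb_database_name=self.duckdb_database_name,
            )

        return Definitions(assets=[_csv_to_duckdb_asset])
```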
Thanks, that makes sense 👍
As discussed offline, this adds a custom operator for the lakehouse example. It should let us better demonstrate the step of "writing an operator to translate dagster code."
I think this leaves us with an unclear interstitial step for this type of asset. What does the "observe" phase look like here? Do we expect people to just provide an AssetSpec? We should sketch that out in a future PR.
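For reference, one possible shape for that interstitial "observe" step (purely a sketch, since the PR deliberately defers this to a future PR): declare the lakehouse table as a spec-only asset so Dagster tracks it in the asset graph before any compute migrates. AssetSpec and Definitions are real Dagster APIs; the asset key here is an assumption based on the example's schema and table names.

```python
from dagster import AssetSpec, Definitions

# Spec-only declaration: no compute, just an entry in the asset graph.
# The key is assumed from the example's iris_dataset.iris_lakehouse_table.
iris_table_spec = AssetSpec(key=["iris_dataset", "iris_lakehouse_table"])

defs = Definitions(assets=[iris_table_spec])
```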