How can I upsert partitions with an IO manager? #18066
Replies: 4 comments 1 reply
-
Some additional details about what partition information is available on the context are documented here. If you want to look at some internal implementation details, see https://github.com/dagster-io/dagster/blob/1.5.8/python_modules/dagster/dagster/_core/storage/db_io_manager.py, which powers our DB I/O managers such as Snowflake and BigQuery.
-
Thanks @sryza @tacastillo @alangenfeld, so I need to write a Postgres I/O manager that extends Dagster's DB I/O manager? Are there any plans on the cards for Dagster to provide an I/O manager for Postgres that handles partitioned assets? I've had a look at the code in your DB I/O manager, and I wouldn't even know where to begin if I wanted to extend it for Postgres partition upsert behaviour. Why does it feel like I'm trying to crack a nut with a sledgehammer? I've got a basic scenario: a statically partitioned software-defined asset that is materialized in Postgres, and I'd like the benefit of saving partitions independently of each other without having to introduce something like dbt.
-
We have this exact same use case, and the missing piece that would make this straightforward is an "upsert" approach in the pandas API. I used this gist as the basis for our implementation: https://gist.github.com/pedrovgp/b46773a1240165bf2b1448b3f70bed32
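pandas' `DataFrame.to_sql` accepts a `method` callable with the signature `(pd_table, conn, keys, data_iter)`, which is the usual hook for adding upsert behaviour. A minimal sketch for Postgres, built on SQLAlchemy's `insert(...).on_conflict_do_update(...)`; it is in the spirit of the linked gist rather than a copy of it, and `upsert_on` and `build_upsert_stmt` are hypothetical helper names.

```python
from typing import Any, Callable, Iterable, Sequence

import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import insert


def build_upsert_stmt(
    table: sa.Table,
    keys: Sequence[str],
    rows: Iterable[Sequence[Any]],
    conflict_cols: Sequence[str],
):
    """Build a Postgres INSERT ... ON CONFLICT statement for the given rows."""
    stmt = insert(table).values([dict(zip(keys, row)) for row in rows])
    # On a key collision, overwrite every non-key column with the incoming value.
    update_cols = {k: stmt.excluded[k] for k in keys if k not in conflict_cols}
    if not update_cols:
        # Nothing to update besides the key itself: just skip duplicates.
        return stmt.on_conflict_do_nothing(index_elements=list(conflict_cols))
    return stmt.on_conflict_do_update(index_elements=list(conflict_cols), set_=update_cols)


def upsert_on(conflict_cols: Sequence[str]) -> Callable:
    """Return a callable usable as DataFrame.to_sql(method=...)."""

    def method(pd_table, conn, keys, data_iter):
        # pandas calls this once per chunk; pd_table.table is the SQLAlchemy Table.
        rows = list(data_iter)
        if not rows:
            return 0
        stmt = build_upsert_stmt(pd_table.table, keys, rows, conflict_cols)
        return conn.execute(stmt).rowcount

    return method
```

Usage would look like `df.to_sql("sales", engine, if_exists="append", index=False, method=upsert_on(["state", "store_id"]))`, where the key columns are hypothetical. `ON CONFLICT` only works if the target table already has a primary key or unique index on those columns, which `to_sql` does not create, so the table has to be created up front. Note also that an upsert never removes rows that disappeared from a partition; deleting the partition's rows and re-inserting them does.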
-
@alangenfeld and @m-o-leary, thanks for the advice and guidance. I will give it a go when time permits and let you know how I went.
-
What is the best practice for I/O management of a statically partitioned table using Dagster (v1.5.8) and Postgres (v15)? In my scenario I have a table that I'm intending to partition by 'state', i.e. the states in a country. I'm unclear how I'm supposed to write the I/O manager to 'upsert' data only for each 'state'. Dagster's documentation says:
"I/O managers can be written to handle partitioned assets. For a partitioned asset, each invocation of handle_output will (over)write a single partition, and each invocation of load_input will load one or more partitions."
I'd like my I/O manager to write only the data related to a partition; however, the only options I see on DataFrame.to_sql are if_exists="fail", "replace", or "append".
So it either appends data or rebuilds the table (wiping it and starting again). Taking the states 'California', 'New York', and 'Texas' as an example, I'd like handle_output() to process the data per state, so that after I've added data for 'California', writing data for 'New York' doesn't lose 'California'.
On previous projects I've used dbt to manage incremental tables and Dagster for the pipeline partitioning. This is the first time I'm trying to do everything with Dagster alone, and I'm struggling with this simple scenario.
Partitioned Software Asset Code
IO Manager Code
The question was originally asked in Dagster Slack.