
Feat: dagster-pyiceberg #25

Open · JasperHG90 opened this issue Nov 14, 2024 · 8 comments

JasperHG90 commented Nov 14, 2024

Hi 👋 ,

I have two additions that I'd like to contribute to this repository:

Dagster-pyiceberg

dagster-pyiceberg is an IO manager to read from and write to Iceberg tables using PyIceberg. I'm working on a couple of end-to-end examples. The example with a postgresql catalog backend can be found here. Next I'll be working on an example using a polaris catalog backend.
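To give a feel for the intended usage, here is a minimal sketch. The class name, import path, and configuration fields are illustrative placeholders, not the package's confirmed API:

import pyarrow as pa
from dagster import Definitions, asset

# Hypothetical import path and class name; the real API may differ.
from dagster_pyiceberg import IcebergPyarrowIOManager

@asset(io_manager_key="iceberg")
def events() -> pa.Table:
    # Returned tables are written to an Iceberg table by the IO manager;
    # downstream assets get them loaded back the same way.
    return pa.table({"user_id": [1, 2, 3], "amount": [10.0, 20.0, 5.0]})

defs = Definitions(
    assets=[events],
    resources={
        # Catalog and namespace settings are illustrative placeholders.
        "iceberg": IcebergPyarrowIOManager(name="default", namespace="analytics"),
    },
)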

This library has alpha status and currently depends on a prerelease version of pyiceberg.

Most PyIceberg features are supported. Users can define partitions on assets, which are mapped to a PyIceberg partition spec, and can update this partition spec by updating the Dagster partition mapping. Schema evolution is also supported (albeit somewhat crudely implemented). Some features that are not yet supported in PyIceberg (but will be in future versions), such as commit retries, are also implemented.
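As an illustration of the partition mapping, a daily-partitioned Dagster asset would map onto a day-partitioned Iceberg spec. This sketch uses the standard Dagster partitions API; the partition_expr metadata key is borrowed from the dagster-deltalake convention and is hypothetical here:

import pyarrow as pa
from dagster import DailyPartitionsDefinition, asset

@asset(
    io_manager_key="iceberg",
    partitions_def=DailyPartitionsDefinition(start_date="2024-01-01"),
    # Hypothetical metadata key telling the IO manager which column
    # the Dagster partition maps to in the Iceberg partition spec.
    metadata={"partition_expr": "created_at"},
)
def daily_events(context) -> pa.Table:
    day = context.partition_key  # e.g. "2024-11-14"
    # Each run writes a single day's slice of the table.
    return pa.table({"created_at": [day], "amount": [1.0]})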

I need to figure out the best way to port the code into this repository. I use uv with a workspace package layout. The setup is similar to dagster-deltalake, with pandas and polars support added as separate libraries that import the dagster-pyiceberg library.

Dagster-pipes-gcp

This library essentially ports the dagster-pipes AWS Lambda external-execution functionality to GCP Cloud Functions. I'd like to expand it to support Cloud Run Jobs and GCP Batch as execution environments.
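For reference, the external-process side inside a Cloud Function might look roughly like this. The assumption that the pipes bootstrap payload arrives in the HTTP request body mirrors the AWS Lambda integration and is not confirmed behavior of this package:

import functions_framework
from dagster_pipes import PipesMappingParamsLoader, open_dagster_pipes

@functions_framework.http
def handler(request):
    # Assumes the orchestration side injects the pipes bootstrap
    # payload into the request body, as the Lambda integration does.
    event = request.get_json()
    with open_dagster_pipes(params_loader=PipesMappingParamsLoader(event)) as pipes:
        pipes.log.info("running inside a Cloud Function")
        pipes.report_asset_materialization(metadata={"rows": 42})
    return "ok"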

This code still needs tests (none are written yet).

Could you let me know if this is of interest to you?

Thanks!

danielgafni (Contributor) commented Nov 14, 2024

Hey!

Sounds really great, I think both additions are reasonable and welcome.

The only slight problem we might have with dagster-pyiceberg-polars is that we already have dagster-polars, and it has more features around loading lazy frames and partitions of lazy frames.

I know we also have dagster-deltalake-polars. I think it was a mistake to create that separate package. Unfortunately, I didn't know it was being created at the time. We ended up with two implementations of DeltaLake IOManagers for Polars, which is probably confusing to our users.

I would like to avoid this with Iceberg.

Do you think it would be possible to add the Polars IOManager to dagster-polars instead? I've wanted to do it myself for a while now but never had the spare time for it.

In general, I think fewer packages are better than more packages; it will make the ecosystem easier to navigate.

Perhaps the same could be done with dagster-pyiceberg-pandas (but I don't maintain that package, so I'm not sure).

JasperHG90 (Author) commented:

Thanks for your response.

I understand your point about the organization of this code. I only looked at the dagster-deltalake library and thought that was the best way of organizing it. I agree with the "fewer packages is better" statement.

Adding the Polars IOManager to dagster-polars raises some questions for me, though:

Ultimately, the Polars IOManager simply inherits from the Arrow IOManager. All the helper functions (and there are quite a few) live in the dagster-pyiceberg library. You would need to add this as an (optional) dependency to dagster-polars, and I'm not sure that's what you want. I would like to avoid replicating code if possible.

If you do add it as a dependency, wouldn't it be strange for dagster-polars to depend on a community-maintained library?

danielgafni (Contributor) commented:

  1. I don't think we can have dependencies on community-maintained libraries. @cmpadden, am I wrong?
  2. I think inheritance from the Arrow IOManager actually hurts here. Please correct me if I'm wrong; I haven't spent much time looking into your code, only briefly. But I had the impression that there was no way to load lazy dataframes without materializing them first? That's a big no-no: it's very important to be able to load lazy frames with all the optimizations like predicate pushdown and row-group filtering, which speed things up and reduce the memory footprint (see the sketch below). Let me know if my understanding is correct and whether you have a way to address this issue.
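(A generic Polars sketch of what's at stake, not code from either package: with a lazy scan, filters and projections are pushed into the reader instead of materializing everything first.)

import polars as pl

lf = pl.scan_parquet("data/*.parquet")  # lazy; nothing is read yet
result = (
    lf.filter(pl.col("date") == "2024-11-15")  # predicate pushdown
    .select("user_id", "amount")               # projection pushdown
    .collect()                                 # data is only read here
)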

JasperHG90 (Author) commented Nov 15, 2024

Thanks for your reply.

The PyIceberg Table object has a scan method that facilitates predicate and projection pushdown (used here). After scanning a table, one must load it using the to_arrow() or to_arrow_batch_reader() methods to materialize it. The other conversion methods PyIceberg implements (e.g. to pandas or Ray) actually call to_arrow() under the hood. The exception is the interoperability with the Daft engine, which supports fully lazy query optimization.
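A short sketch of that scan API (the catalog and table names are placeholders):

from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import EqualTo

catalog = load_catalog("default")        # placeholder catalog name
table = catalog.load_table("db.events")  # placeholder table name

scan = table.scan(
    row_filter=EqualTo("date", "2024-11-15"),  # predicate pushdown
    selected_fields=("user_id", "amount"),     # projection pushdown
)
arrow_table = scan.to_arrow()  # materializes the scan as a pyarrow.Table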

Of course, Polars has the scan_iceberg() function that returns a LazyFrame, so you'd be able to just call:

import polars as pl

lf = pl.scan_iceberg(table)  # `table` is a pyiceberg Table; returns a LazyFrame

For writing, I don't think it matters much which library is used. I don't think Polars supports writes, and Daft calls collect() on the input dataframe, which returns a PyArrow table.

TBH I'm not married to the current implementation, so I'm happy to refactor or donate the code to another library if it fits there. I think the "general" pieces of code (i.e. the mapping from Dagster partitions to e.g. Iceberg partition specs) can be reused by multiple libraries, reading tables can be handled by the specific libraries (polars, dagster, duckdb, daft, etc.), and writes probably only have the PyArrow implementation at the moment; a sketch of that engine-agnostic piece follows.
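To make that concrete, the shared piece could boil down to something like this hypothetical helper, reusing table and arrow_table from the scan sketch above (the overwrite_filter parameter exists in recent pyiceberg releases; the helper itself is illustrative):

from pyiceberg.expressions import And, GreaterThanOrEqual, LessThan

def dagster_window_to_iceberg_filter(column: str, start: str, end: str):
    # Hypothetical helper: translate a Dagster time-window partition
    # into a row filter usable for an Iceberg static overwrite.
    return And(GreaterThanOrEqual(column, start), LessThan(column, end))

# Any engine that produces a pyarrow.Table can then share the write path:
table.overwrite(
    arrow_table,
    overwrite_filter=dagster_window_to_iceberg_filter(
        "created_at", "2024-11-14", "2024-11-15"
    ),
)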

JasperHG90 (Author) commented:

FYI, I've refactored so that Polars and Daft are now supported as IO managers without depending on PyArrow, and both support lazy DataFrames.

danielgafni (Contributor) commented Nov 22, 2024

Hey @JasperHG90, that sounds great.

I think it's going to be easier to proceed if we have a short call first.

Do you mind reaching out to me on Dagster's community slack?

cmpadden (Collaborator) commented Dec 5, 2024

> Hey @JasperHG90, that sounds great.
>
> I think it's going to be easier to proceed if we have a short call first.
>
> Do you mind reaching out to me on Dagster's community slack?

Hey @danielgafni and @JasperHG90 - did this meeting take place? Would love to hear an update on this!

danielgafni (Contributor) commented Dec 5, 2024

Hey!

Yes, we did have a call.

  1. We decided to start with the work on GCP Pipes. I gave @JasperHG90 some directions on how to proceed (mainly about refactoring, but also about using the PipesThreadedMessageReader).
  2. It's not clear what to do with pyiceberg right now, mostly because it's not mature enough yet: it requires lots of boilerplate code to map Dagster partitions to Iceberg partitions, plus weird things like manually setting up retries when writing partitions in parallel. We decided to wait a bit and meanwhile try to refactor some common dataframe-engine-agnostic code into a separate package (like dagster-pyiceberg-utils).
