This repository has been archived by the owner on Mar 11, 2024. It is now read-only.

Feat/add polars delta merge support #47

Open
edgBR wants to merge 9 commits into master

Conversation

@edgBR commented Dec 18, 2023

No description provided.

@edgBR (Author) commented Dec 18, 2023

Hi @danielgafni,

Placeholder for now: I'm new to Dagster and not good at writing tests, but I think the PR gives you the idea of what I'm trying to achieve. Now in Polars you can do:

[image: screenshot omitted]
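The screenshot presumably showed the merge API introduced in Polars 0.20; a minimal sketch of it, assuming an existing Delta table at a hypothetical path:

```python
import polars as pl

df = pl.DataFrame({"id": [1, 2], "value": ["a", "b"]})

# write_delta with mode="merge" returns a deltalake TableMerger,
# which is then configured and executed explicitly
(
    df.write_delta(
        "path/to/delta_table",  # hypothetical target table
        mode="merge",
        delta_merge_options={
            "predicate": "s.id = t.id",
            "source_alias": "s",
            "target_alias": "t",
        },
    )
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .execute()
)
```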

Also, I added a pre-commit hook that upgrades Polars code to a target version.

@danielgafni (Owner) commented Dec 19, 2023

Hey @edgBR!

Thanks for the PR; it sounds like a neat feature to have. Let's work on getting this merged. We'll have to change a few things for that.

@danielgafni (Owner) left a comment

So the main thing here is keeping the lower version constraints to ensure backwards compatibility with existing code bases.

@@ -37,6 +37,7 @@ jobs:
 - "0.17.0"
 - "0.18.0"
 - "0.19.0"
+- "0.20.1"
@danielgafni (Owner):

nit: 0.20.0 would be more in line with the others

@edgBR (Author):

Hi, I understand, but Polars merge support was added in 0.20. Is it okay to change this to 0.20.0?

@danielgafni (Owner) Dec 19, 2023:

0.20.0 in this context means ">=0.20.0, <0.21.0", or "latest available before 0.21.0". That's how the CI is set up. So yes, it's okay to use 0.20.0 here.
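For illustration only, a hedged sketch of how such a matrix entry can be resolved to a bounded constraint at install time (the actual workflow step may differ):

```yaml
# hypothetical CI step: matrix entry "0.20.0" installs the latest
# polars release in the 0.20.x series
- name: Install polars
  run: pip install "polars>=0.20.0,<0.21.0"
```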

@@ -81,6 +82,7 @@ jobs:
 - "0.17.0"
 - "0.18.0"
 - "0.19.0"
+- "0.20.1"
@danielgafni (Owner):

Same as above.

.pre-commit-config.yaml: two outdated review threads (resolved)
@@ -28,11 +29,11 @@ license = "Apache-2.0"
 [tool.poetry.dependencies]
 python = "^3.8"
 dagster = "^1.4.0"
-polars = ">=0.17.0"
+polars = ">=0.20.1"
@danielgafni (Owner):

Let's not change the lower polars constraint. We don't want to force an update for users as it can break their code.

@danielgafni (Owner) Dec 19, 2023:

To be clear, we do want to update the dev Polars version pinned in poetry.lock. This can be done via the `poetry update polars` command.
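For reference, run from the repository root:

```shell
# refreshes the polars entry in poetry.lock within the pyproject.toml constraints
poetry update polars
```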

 pyarrow = ">=8.0.0"
 typing-extensions = "^4.7.1"

-deltalake = { version = ">=0.10.0", optional = true }
+deltalake = { version = ">=0.14.0", optional = true }
@danielgafni (Owner):

Same here; I don't think we need to change this.

@@ -101,7 +101,16 @@ def append_asset() -> pl.DataFrame:

     pl_testing.assert_frame_equal(pl.concat([df, df]), pl.read_delta(saved_path))


+def test_polars_delta_io_manager_merge(polars_delta_io_manager: PolarsDeltaIOManager):
@danielgafni (Owner):

Please add a test for the new merge functionality. You can take a look at the other tests for inspiration. Let me know if you need any help with this.
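In case it helps, a rough sketch of what such a test could look like, modeled on the append test above; the merge-mode metadata keys and the way the saved path is recovered are assumptions, not the final API:

```python
import polars as pl
import polars.testing as pl_testing
from dagster import asset, materialize
from dagster_polars import PolarsDeltaIOManager


def test_polars_delta_io_manager_merge(polars_delta_io_manager: PolarsDeltaIOManager):
    df = pl.DataFrame({"id": [1, 2], "value": ["a", "b"]})

    @asset(
        io_manager_def=polars_delta_io_manager,
        metadata={
            "mode": "merge",  # assumed: the mode introduced by this PR
            "delta_merge_options": {
                "predicate": "s.id = t.id",
                "source_alias": "s",
                "target_alias": "t",
            },
        },
    )
    def merge_asset() -> pl.DataFrame:
        return df

    result = materialize([merge_asset])
    saved_path = result.asset_materializations_for_node("merge_asset")[0].metadata["path"].value

    # merging the same keys a second time should upsert, not duplicate rows
    materialize([merge_asset])
    pl_testing.assert_frame_equal(df, pl.read_delta(saved_path), check_row_order=False)
```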

Removing polars upgrade pre-commit hooks to ensure backward compatibility of the code
@edgBR (Author) commented Dec 22, 2023

Hi @danielgafni,

We have been busy with a PR to scikit-learn. I will update this next week.

Thanks for the support.

BR
E

@danielgafni (Owner) commented

Hey @edgBR!

It has been decided to merge this repo into the main Dagster project.
I'm going to start on it soon.
Do you want me to step in here and complete the rest of this PR so we can merge it faster?
I would like to see this merged before we move this code to Dagster.

@edgBR (Author) commented Dec 29, 2023

Hi Daniel,

What is the timeline?

I'm on vacation right now. If you need to complete this before the 8th of January, then yes, you can go ahead.

If not, I will do it as soon as I'm back.

Our scikit-learn PR is done (yet to be merged, but their CI pipeline was broken for a day and a half due to an upstream bug in conda); it took most of my last few days.

BR
E

@danielgafni (Owner) commented

Hey @edgBR!

I'm hoping to finish this in around two weeks. The Dagster PR is already created.

storage_options=storage_options,
delta_merge_options=delta_merge_options,
)
.when_matched_update_all()
Contributor:

This needs to be configurable in some way. Basically this is a default upsert, but a MERGE can be a complex set of different update, delete, and insert operations.

I commonly use deduplicate-on-insert.
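For example, both strategies expressed directly against deltalake's TableMerger (a sketch; the table path and keys are hypothetical):

```python
import pyarrow as pa
from deltalake import DeltaTable

source = pa.table({"id": [1, 2], "value": ["a", "b"]})
dt = DeltaTable("path/to/delta_table")  # hypothetical existing table

# default upsert: update matched rows, insert unmatched ones
(
    dt.merge(source, predicate="s.id = t.id", source_alias="s", target_alias="t")
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .execute()
)

# alternative, deduplicate on insert: only insert rows whose key is not already present
(
    dt.merge(source, predicate="s.id = t.id", source_alias="s", target_alias="t")
    .when_not_matched_insert_all()
    .execute()
)
```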

@edgBR (Author):

Hi Ion,

You are right, and if you look at my example you will find a deduplication strategy using a rank function over a primary key and then selecting the first row. However, for that the input dataset needs to have a "cdc" column (like a load timestamp).

Shouldn't this be the responsibility of the user?
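For concreteness, a hedged sketch of that deduplication (column names like load_dts are hypothetical):

```python
from datetime import datetime

import polars as pl

df = pl.DataFrame({
    "id": [1, 1, 2],
    "value": ["old", "new", "x"],
    "load_dts": [datetime(2023, 1, 1), datetime(2023, 1, 2), datetime(2023, 1, 1)],
})

# rank rows within each primary key by the CDC column and keep the latest one
deduped = (
    df.with_columns(
        pl.col("load_dts").rank(method="ordinal", descending=True).over("id").alias("rn")
    )
    .filter(pl.col("rn") == 1)
    .drop("rn")
)
```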

An alternative could be to modify:

    def get_metadata(self, context: OutputContext, obj: pl.DataFrame) -> Dict[str, MetadataValue]:
        assert context.metadata is not None

        metadata = super().get_metadata(context, obj)

        if context.has_asset_partitions:
            partition_by = context.metadata.get("partition_by")
            if partition_by is not None:
                metadata["partition_by"] = partition_by

        if context.metadata.get("mode") == "append":
            # modify the metadata to reflect the fact that we are appending to the table

            if context.has_asset_partitions:
                # paths = self._get_paths_for_partitions(context)
                # assert len(paths) == 1
                # path = list(paths.values())[0]

                # FIXME: what to do about row_count metadata if we are appending to a partitioned table?
                # we should not be using the full table length,
                # but it's unclear how to get the length of the partition we are appending to
                pass
            else:
                metadata["append_row_count"] = metadata["row_count"]

                path = self._get_path(context)
                # we need to get row_count from the full table
                metadata["row_count"] = MetadataValue.int(
                    DeltaTable(str(path), storage_options=self.get_storage_options(path))
                    .to_pyarrow_dataset()
                    .count_rows()
                )

        return metadata

To maybe do something like this:

        if context.metadata.get("mode") == "append":
            # modify the metadata to reflect the fact that we are appending to the table

            if context.has_asset_partitions:
                # FIXME: what to do about row_count metadata if we are appending to a partitioned table?
                # we should not be using the full table length,
                # but it's unclear how to get the length of the partition we are appending to
                pass
            else:
                metadata["append_row_count"] = metadata["row_count"]

        if context.metadata.get("mode") == "merge":
            # modify the metadata to reflect the fact that we are merging into the table
            metadata["primary_key"] = "something here that refers to this key"
            metadata["cdc_column"] = "something here that refers to this key"

            path = self._get_path(context)
            # we need to get row_count from the full table
            metadata["row_count"] = MetadataValue.int(
                DeltaTable(str(path), storage_options=self.get_storage_options(path))
                .to_pyarrow_dataset()
                .count_rows()
            )

        return metadata

Contributor:

Hey :)

Yeah, I need to go through the current implementation of dagster-polars a bit more. I've already pushed a PR for dagster-deltalake-polars as a first step.

Contributor:

@edgBR, for my own work I am planning to use dagster-deltalake-polars and then only the Parquet IO manager in dagster-polars.

So sometime next week, after my first PR gets merged in dagster-deltalake-polars, I will expand it there to cover a couple of common MERGE strategies.

@danielgafni (Owner) commented


Hey @edgBR! The DeltaLake IOManager is being reworked in #52. You might want to get back to this after that PR is merged.
