Data versions in partitioned assets unnecessarily hash upstream data versions even if only one partition was used #26563

Open
easontm opened this issue Dec 18, 2024 · 0 comments
Labels
area: partitions Related to Partitions type: bug Something isn't working

easontm commented Dec 18, 2024

What's the issue?

When fetching upstream data versions (for composing the output data version), the data versions of all consumed upstream partitions are hashed together. This makes sense when downstream asset Bar depends on more than one partition of upstream asset Foo, because the group of partitions has to be represented by a single data version.

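However, a pretty standard use case is for Foo and Bar to share the same partitions definition with 1:1 partition dependencies. In that case Bar's input data version for Foo is the hash of a single upstream data version rather than the version itself, so the two values never match. Tracing lineage would be much easier if they did.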
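To make the mismatch concrete, here is a minimal standalone sketch of the combining step applied to a single upstream version. The helper name `hash_versions` and the version string `"abc123"` are made up for illustration; only the join-then-SHA-256 logic mirrors the snippet in this issue.

```python
from hashlib import sha256


def hash_versions(data_versions):
    """Mimic the combining step: join the version strings and SHA-256 them."""
    digest = sha256()
    digest.update(bytearray("".join(data_versions), "utf8"))
    return digest.hexdigest()


# Hypothetical data version recorded on Foo's materialization of one partition.
foo_version = "abc123"

# What Bar ends up recording as its input data version for Foo.
bar_input_version = hash_versions([foo_version])

print(foo_version)                       # the version shown on Foo
print(bar_input_version)                 # a 64-character hex digest
print(foo_version == bar_input_version)  # False
```

Because the digest is one-way, the value shown on Bar cannot be mapped back to Foo's version without re-hashing every candidate.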

You can get this easy-to-trace behavior by adding the two lines marked below:

# data_version_cache.py
    def _get_partitions_data_version_from_keys(
        self, key: AssetKey, partition_keys: Sequence[str]
    ) -> "DataVersion":
        from dagster._core.definitions.data_version import DataVersion
        from dagster._core.events import DagsterEventType

        event_type = DagsterEventType.ASSET_MATERIALIZATION
        tags_by_partition = self._context.instance._event_storage.get_latest_tags_by_partition(  # noqa: SLF001
            key, event_type, [DATA_VERSION_TAG], asset_partitions=list(partition_keys)
        )
        partition_data_versions = [
            pair[1][DATA_VERSION_TAG]
            for pair in sorted(tags_by_partition.items(), key=lambda x: x[0])
        ]
        # --- proposed addition: pass a lone upstream data version through unhashed ---
        if len(partition_data_versions) == 1:
            return DataVersion(partition_data_versions[0])
        # --- end of proposed addition ---
        hash_sig = sha256()
        hash_sig.update(bytearray("".join(partition_data_versions), "utf8"))
        return DataVersion(hash_sig.hexdigest())
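For experimenting outside of Dagster, the same logic can be sketched as a pure function. This is an illustration under stated assumptions, not Dagster's API: `combine_partition_data_versions` is a hypothetical name, the input dict stands in for the result of `get_latest_tags_by_partition`, the value of `DATA_VERSION_TAG` is assumed, and plain strings replace `DataVersion`.

```python
from hashlib import sha256
from typing import Mapping

# Assumed stand-in for the DATA_VERSION_TAG constant used in the snippet above.
DATA_VERSION_TAG = "dagster/data_version"


def combine_partition_data_versions(
    tags_by_partition: Mapping[str, Mapping[str, str]],
) -> str:
    """Combine per-partition data versions into a single version string."""
    # Sort by partition key so the result does not depend on dict ordering.
    versions = [
        tags[DATA_VERSION_TAG]
        for _, tags in sorted(tags_by_partition.items(), key=lambda item: item[0])
    ]
    # Proposed short-circuit: a single upstream partition keeps its own version.
    if len(versions) == 1:
        return versions[0]
    # Otherwise hash the concatenated versions, as the existing code does.
    digest = sha256()
    digest.update("".join(versions).encode("utf8"))
    return digest.hexdigest()


single = {"2024-12-17": {DATA_VERSION_TAG: "v1"}}
multi = {
    "2024-12-18": {DATA_VERSION_TAG: "v2"},
    "2024-12-17": {DATA_VERSION_TAG: "v1"},
}

print(combine_partition_data_versions(single))  # v1
print(combine_partition_data_versions(multi) == sha256(b"v1v2").hexdigest())  # True
```

Keeping the combining step free of instance and storage access like this would also make it straightforward to unit test, although where such a test belongs in the Dagster repository is a question for the maintainers.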

What did you expect to happen?

When looking at input_data_version/foo in Bar's materialization information, the version should be the same as the data version listed on Foo for the consumed partition.

How to reproduce?

No response

Dagster version

1.9.2

Deployment type

Local

Deployment details

No response

Additional information

I checked the original PR #14265 by @smackesey and didn't see any specific reason why single partitions also must be hashed.

I thought about submitting this as a PR, but this is a change to a private function and I couldn't find the best place to write the tests. I'd be happy to give it a go if you tell me the best place to put them.

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.
By submitting this issue, you agree to follow Dagster's Code of Conduct.

@easontm easontm added the type: bug Something isn't working label Dec 18, 2024
@garethbrickman garethbrickman added the area: partitions Related to Partitions label Dec 18, 2024