[branching io manager][fix] Fix loading partitioned parent assets in BranchingIOManager #18491
I'm confused as to why |
We might not be loading an asset at all. The IOManager should also work for normal jobs, because it's often used as a drop-in replacement for an existing IOManager.
That's true, but the question I'm raising is about |
Sorry, I don't see any test logs or anything. Maybe that's because I'm on mobile, not sure. Anyway, keeping this use case in mind (working with non-assets) is all I wanted to point out, and it seems like we are on the same page about this :)
I'm not sure why it's set to be the partition for the asset that's currently being materialized. It is pretty confusing especially since the two properties would seemingly be the same thing - the input manager should really be agnostic to what's being produced in the actual compute step.
This is a good call-out - right now if there's no upstream asset involved the IO manager will fallback to |
My inclination is to consider this a bug and change it so that To summarize Alex, |
This touches on the complexity around how partitioning works and is modeled that I don't have a great grasp on.
I don't think it's OK to make a breaking change to the @public `partition_key` method until 2.0, given my understanding of our policy. My first reaction was that maybe we could deprecate it and replace it with `run_partition_key` to more clearly disambiguate, but I don't have confidence that that's the right move.
When does the upstream output partition key differ from the run partition key in the non-asset case? From what I understand you can only attach `PartitionMapping` to assets.
try:
    partition_key = context.asset_partition_key
except CheckError:
    pass
Is there not a better way to do this? I would prefer adding a `has_` over control-flow exceptions.
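The preference for a `has_` check over exception-driven control flow can be sketched with a stand-in class. `FakeInputContext` below is hypothetical and is not Dagster's `InputContext`; it only mimics a property that raises when no partition key is set, and `ValueError` stands in for whatever error the real property raises.

```python
from typing import Optional


class FakeInputContext:
    """Hypothetical stand-in for an input context; not Dagster's real class."""

    def __init__(self, partition_key: Optional[str] = None) -> None:
        self._partition_key = partition_key

    @property
    def has_partition_key(self) -> bool:
        return self._partition_key is not None

    @property
    def partition_key(self) -> str:
        if self._partition_key is None:
            raise ValueError("no partition key is set on this context")
        return self._partition_key


def key_via_exception(context: FakeInputContext) -> Optional[str]:
    # Exception-driven: the "no partition" case is discovered by catching an error.
    try:
        return context.partition_key
    except ValueError:
        return None


def key_via_has_check(context: FakeInputContext) -> Optional[str]:
    # Explicit check: the "no partition" case is an ordinary branch.
    return context.partition_key if context.has_partition_key else None
```

Both helpers return the same values; the second makes the unpartitioned case visible in the code rather than hiding it in an `except` clause that could also swallow unrelated errors.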
Yeah, updated to handle the multi-partition-key case and this logic is a bit cleaner now.
python_modules/dagster/dagster_tests/storage_tests/branching_io_manager_tests/utils.py
    else None
)
try:
    partition_key = context.asset_partition_key
What about other complex partition mapping schemes where `asset_partition_key` would raise due to there being more than one value? https://github.com/dagster-io/dagster/blob/master/python_modules/dagster/dagster/_core/execution/context/input.py#L347-L350
Updated logic & added test case, should be able to handle non-standard (e.g. many-to-one) partition mappings now.
@@ -69,10 +70,20 @@ def load_input(self, context: InputContext) -> Any:
            return self.branch_io_manager.load_input(context)
        else:
            # we are dealing with an asset input
            partition_key = (
                context.upstream_output.partition_key
                if context.upstream_output and context.upstream_output.has_partition_key
Pretty sure it doesn't. So it would only change in the asset case. I am supportive of introducing |
this version seems like a more principled fix to me, I assume the cases that danielgafni mentioned above are under test somewhere so this continues to work for them?
    )
    for partition_key in partition_keys
]
if all(
Careful here: `all([]) == True` if nothing comes back. You may want an explicit check for it.
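The concern is that Python's `all()` is vacuously true for an empty iterable. A minimal sketch of an explicit guard follows; `all_materialized` and its `is_materialized` callback are hypothetical names for illustration, not code from this PR.

```python
from typing import Callable, Optional, Sequence


def all_materialized(
    partition_keys: Sequence[Optional[str]],
    is_materialized: Callable[[Optional[str]], bool],
) -> bool:
    # all() over an empty iterable is True, so reject the empty case explicitly.
    if not partition_keys:
        return False
    return all(is_materialized(key) for key in partition_keys)
```

Without the guard, an empty list of partition keys would be treated as "everything is materialized", which is rarely the intended meaning.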
https://github.com/dagster-io/dagster/pull/18491/files/7bee3fd9cdf04c032e6955d5eadd05d5c17b7729#r1422995242 should ensure that the length is always >=1, will add a note though
)
if (
    event_log_entry
I think a little comment block explaining why we're doing this would be nice, to make [1] easier to grok.
partition_keys = context.asset_partition_keys

if len(partition_keys) == 0:
    partition_keys = [None]
[1]
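One reading of the snippet marked [1], consistent with the reply that the list length is always >= 1, is that an unpartitioned input is represented as a single `None` key so that partitioned and unpartitioned inputs share one code path. A standalone sketch of that normalization, with `normalize_partition_keys` as a hypothetical helper name:

```python
from typing import List, Optional, Sequence


def normalize_partition_keys(partition_keys: Sequence[str]) -> List[Optional[str]]:
    # An unpartitioned input has no keys; use a single None so callers can
    # iterate uniformly and never see an empty list.
    if len(partition_keys) == 0:
        return [None]
    return list(partition_keys)
```

Because the result always has at least one element, a later `all(...)` over it cannot succeed vacuously.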
Summary
Tweak of fix in #17303 which uses the upstream asset partition rather than the current asset partition when using the branching IO manager. Uses `context.asset_partition_key` rather than the upstream output's partition key, which can sometimes be `None` when the upstream output is not available.
Test Plan
Adds a new unit test with a non-standard partition mapping, which previously failed.
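To illustrate why a many-to-one partition mapping matters here, the sketch below models the branch-or-parent decision in plain Python. It assumes, based only on the diff fragments in this conversation, that the branch is used when every upstream partition has been materialized there and the parent is used otherwise. `choose_source`, `BranchRecords`, and the asset and partition names are all hypothetical; none of this is Dagster's API.

```python
from typing import Optional, Sequence, Set, Tuple

# Hypothetical record of (asset name, partition key) pairs materialized in the branch.
BranchRecords = Set[Tuple[str, Optional[str]]]


def choose_source(
    asset: str,
    upstream_partition_keys: Sequence[Optional[str]],
    branch_records: BranchRecords,
) -> str:
    """Return "branch" only if every upstream partition exists in the branch."""
    if not upstream_partition_keys:
        # Guard against the vacuous all([]) case.
        return "parent"
    if all((asset, key) in branch_records for key in upstream_partition_keys):
        return "branch"
    return "parent"


# A weekly downstream partition that depends on several daily upstream partitions.
daily_keys = ["2023-12-11", "2023-12-12", "2023-12-13"]
records: BranchRecords = {("daily_asset", "2023-12-11"), ("daily_asset", "2023-12-12")}
source_before = choose_source("daily_asset", daily_keys, records)

records.add(("daily_asset", "2023-12-13"))
source_after = choose_source("daily_asset", daily_keys, records)
```

With a single partition key the input side can only describe a one-to-one mapping; iterating over all upstream keys is what lets a many-to-one mapping be checked.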