Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AssetGraphView, TemporalContext, and AssetSlice #20312

Merged
merged 1 commit into from
Mar 11, 2024
Merged

Conversation

schrockn
Copy link
Member

@schrockn schrockn commented Mar 6, 2024

Summary & Motivation

With the additional of AMP, asset partitions, dynamic partitioning, and other related features, the complexity of our system has outstripped the ability of our abstractions to model it. A shallow indication of this our repeated threading of current time/evaluation time, storage_id/cursors, and dynamic_partitions_store up and down our stack. Another is that also have one off "caching" classes like CachingStaleStatusResolver, CachingInstanceQueryer, CachingDataTimeResolver and perhaps others I do not know about that present a wide range of capabilities inconsistently. Superficially it is annoying to have to thread time, storage_id, and dynamic_partitions_store around everywhere and hard to understand what class to use when interrogating the asset graph.

This belies a more profound problem: Some of our capabilities respect current time; some do not. Some of our capabilities respect storage_id; some do not. That means the engineers do not know what reads are volatile with respect to time and underlying storage. It is also difficult to know what is to safe to cache or not. This means also that as an engineer navigates the state of the asset graph, it is difficult to know what operations are cheap to compute, versus potentially extremely expensive to compute.

This PR introduces AssetGraphView, AssetSlice and TemporalContext to address this issue.

Temporal Context: TemporalContext represents an effective time, used for business logic, and last_event_id which is used to identify that state of the event log at some point in time. Put another way, the value of a TemporalContext represents a point in time and a snapshot of the event log.

Asset Graph View. It is a view of the asset graph and its state from the perspective of a specific temporal context. From the asset graph you can access asset slices, the main API for navigating the view of an asset graph.

Asset Slice: Represents a set of partitions attached to a particular asset. By having AssetSlice contain a reference to AssetGraphView this enables a more elegant traversal of an asset graph's partition space than before.

AssetSlice strives to be "partition-native". Very specifically, it deliberately does not have properties like is_partitioned. Instead they are just represented a slice with a single asset partition. Right now we do an inordinate (and unnecessary) special casing for unpartitioned assets. This will take some adjustment but will result in much cleaner code and and mental model.

e.g.

Before:

def parent_slice(
    context, 
    asset_graph: AssetGraph,
    child_asset_subset: AssetSubset, 
    parent_key: AssetKey) -> AssetSubset:
    return asset_graph.get_parent_asset_subset(
        dynamic_partitions_store=context.instance_queryer,
        parent_asset_key=parent_asset_key,
        child_asset_subset=child_asset_subset,
        current_time=context.evaluation_dt,
    )

After:

def parent_slice(child_slice: AssetSlice, parent_key: AssetKey) -> AssetSlice:
    return child_slice.compute_parent_slice(parent_key)

We also have naming conventions to indicate the performance characteristics of methods:

Naming conventions

  • Properties guaranteed to be fast.
  • Methods prefixed with get_ do some work in-memory but not hugely expensive
  • Methods prefixed with compute_ do potentially expensive work, like compute
  • partition mappings and query the instance.
  • Methods using "materialize" indicate that they fully materialize partition sets
    These can potentially be very expensive if the underlying partition set has
    an in-memory representation that involves large time windows. I.e. if
    the underlying PartitionsSubset in the ValidAssetSubset is a TimeWindowPartitionsSubset
    Usage of these methods should be avoided if possible if you are potentially
    dealing with slices with large time-based partition windows.

This PR also addes two type aliases:

PartitionKey: TypeAlias = Optional[str]
AssetPartition: TypeAlias = AssetKeyPartitionKey

I consider the rename of AssetKeyPartitionKey to AssetPartition merely an acknowledgement of the current ground truth in the code base, where nearly all local variables and method names refer to asset_partition because asset_key_partition_key is self evidently gross.

The PartitionKey alias is perhaps more controversial.

FAQ:

  • Why add AssetSlice and not reuse ValidAssetSubset?
    • AssetSlice is different as it contains a reference to the asset graph view. This makes it fundamentally different. It also allow for elegant traversal of the graph without having to thread a datetime and a reference to an instance or instance queryer everywhere, or having to convert between ValidAssetSubset and AssetSubset.
    • Second I think the term "Subset" is extremely confusing, given that the "Subset" can actually refer to a complete set of partitions to an asset.
    • Introducing a new name allows for disambiguation via a single word. slice and subset as local variables is very clear.
    • Slice also seeks to be "partition-native" and treat partitioned and unpartitioned assets uniformly. I consider properties like bool_value and subset_value on AssetSubset to be quite gross, so this abstractions seeks to encapsulate that.

The plan here is to introduce this abstraction then use it instead of direct use of various "resolver" and "queryer" classes throughout the code base. This will allow capabilities such as AMP, the backfill daemon, and state/outdated calculations int the web server to be more consistent and done with less code.

Two concrete items this will help with immediately:

  • Consistent treatment of last_storage_id in AMP
  • Canonicalization of "outdated" logic in AMP and "stale" logic in dagster-webserver under a new single concept, likely named "unsynced" (but subject to discussion).

Another objective is to allow user-defined AMP rules against a higher level API that is either this or something directly backed by this.

How I Tested These Changes

Included unit tests

@schrockn
Copy link
Member Author

schrockn commented Mar 6, 2024

This stack of pull requests is managed by Graphite. Learn more about stacking.

Join @schrockn and the rest of your teammates on Graphite Graphite

@schrockn schrockn force-pushed the asset-graph-view-1 branch from d83cda2 to 7574878 Compare March 7, 2024 13:43
@schrockn schrockn marked this pull request as ready for review March 7, 2024 14:08
Copy link
Contributor

@OwenKephart OwenKephart left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

broadly in support of this pattern, had some comments on naming / structure

@schrockn schrockn force-pushed the asset-graph-view-1 branch from 7574878 to f976741 Compare March 7, 2024 22:45
@schrockn
Copy link
Member Author

schrockn commented Mar 7, 2024

Thanks for feedback @OwenKephart updated

Copy link
Contributor

@OwenKephart OwenKephart left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like it! Tiny remaining nits but I'm feeling good about this structure.

self._partitions_def,
{AssetPartition(self.asset_key, partition_key) for partition_key in partition_keys},
)
& self._compatible_subset,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this should be self._compatible_subset & AssetSubset.from_partitions_set(...) for perf. This method will often be called on an AssetSlice which contains an AllPartitionsSubset, which has a "free" implementation of __and__ (AllPartitionsSubset & X = X), but if we do X & AllPartitionsSubset, the default __and__ implementation will force us to construct the keys in the AllPartitionsSubset.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's specialize the base class operators instead so we don't have to think about it. #20354

"""

effective_dt: datetime
last_event_id: Optional[int]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Naming opinion: Since TemporalContext represents a point in time/event history, IMO these should just be datetime and event_id-- the qualifiers "effective" and "last" are already implied by the fact that these are on a TemporalContext.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah the only issue here is that there values will appear on their own without a TemporalContext object in lots of places.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I understand e.g. "effective datetime" will became a new standard phrase, but I think my reasoning still holds. Like if I had a cursor class that represented an x, y upper bound:

cursor = MaxCursor(x=..., y=...)  # not max_x, max_y

# then later if I need to extract one of these values

max_x = cursor.x

I think the situation is similar here. Just kinda a matter of taste though, NBD.



PartitionKey: TypeAlias = Optional[str]
AssetPartition: TypeAlias = AssetKeyPartitionKey
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Naming opinion: Having one of these qualified with Key and the other not is IMO a bit confusing, since they are both actually keys. If you want to lose AssetKeyPartitionKey, IMO should be either:

  • Partition
  • AssetPartition

or

  • PartitionKey
  • AssetPartitionKey

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a great point. The only thing is that people have been quite happy renaming variables asset_partitions. How do we feel about asset_partition_keys instead everywhere? cc: @OwenKephart on this

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like asset_partition_key and it also matches what we do with asset_check_key/AssetCheckKey

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

K i'm sold. Would like consensus from @clairelin135 @OwenKephart @sryza before we do a mass renaming.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like AssetKeyPartitionKey -> AssetPartitionKey.

A couple concerns I have a about PartitionKey:

  • Main concern: if it's just a type alias, it's not enforced by the type system, which means we're likely to end up with
    a mix of str and PartitionKey all over the codebase. And policing this in reviews feels like a potentially large time sync.
  • If we do go this route, I think it should be str, not Optional[str]. There are some situations in which a partition key is required, and we'd need a way to express that.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@OwenKephart I think that's a great call. I'll take that approach and respin this 👍🏻

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm actually I think pyright is too permissive on this:

from typing import NewType

PartitionKey = NewType('PartitionKey', str)


def _takes_str(string: str):
    ...

def foo() -> None: 
    _takes_str(PartitionKey('foo'))

This works. We might need a custom class?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@schrockn

I was thinking the following is the scenario we'd be more worried about (which pyright does catch):

from typing import NewType

PartitionKey = NewType("PartitionKey", str)


def _takes_pk(pk: PartitionKey) -> None: ...


def foo() -> None:
    _takes_pk("foo")

imagining _takes_pk as something like:

def get_children_partitions(
        self,
        dynamic_partitions_store: DynamicPartitionsStore,
        current_time: datetime,
        asset_key: AssetKey,
        partition_key: PartitionKey,
    ) -> AbstractSet[AssetKeyPartitionKey]:

One issue there is that there are plenty of user-facing methods like execute_in_process which take a partition_key argument so we wouldn't be able to change those.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to quick do a version that makes a completely new type to see how it feels.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One issue there is that there are plenty of user-facing methods like execute_in_process which take a partition_key argument so we wouldn't be able to change those.

Yes I think the normal, public API should be in raw strings regardless. However tradeoffs are different there than places where a user will be writing custom AMP rules that require deep understanding of the partitioning system.


@property
def _partitions_def(self) -> Optional["PartitionsDefinition"]:
return self._asset_graph_view.asset_graph.get_partitions_def(self.asset_key)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My second AssetGraph callsites PR removes AssetGraph.get_partitions_def. We should use new API asset_graph.get(self.asset_key).partitions_def.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great I just approved. Merge and I'll restack.

@schrockn schrockn force-pushed the asset-graph-view-1 branch from 6210591 to 99cd2dc Compare March 8, 2024 14:22
schrockn added a commit that referenced this pull request Mar 8, 2024
## Summary & Motivation

Owen made the sensible suggestion (#20312 (comment)) to take advantage of native partition set features to avoid materializing partition keys with the AllPartitionsSubset is involved. This required changing operation order.

Instead, this PR proposes to also special case the base class operators when "other" is AllPartitionsSubset so we don't have to think about order.


## How I Tested These Changes

BK
@schrockn schrockn force-pushed the asset-graph-view-1 branch 2 times, most recently from b92518d to 6bb9be9 Compare March 9, 2024 12:27
Comment on lines 46 to 60
"""
PartitionKey is a string that represents a partition key. It is a new type so
that we can track the use of strs as partition keys.

PartitionKey should only be created via the static factory methods on AssetPartitionKey.
We want to consolidate all partition key creation so we have a leverage point to change
how they. Eventually we want to create a new singleton string partition to represent
the "all" partition key for unpartitioned assets.
"""


class AssetPartitionKey(NamedTuple):
"""Represents a a partition in a particular asset. Prefer usage to AssetKeyPartitionKey.
If you need to convert AssetKeyPartitionKeys to AssetPartitionKeys, use the static
factory methods.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@OwenKephart and @smackesey would like your sign-off here. I want to make sure that we are aligned that this executes on the plan we discussed in the Slack huddle.

Copy link
Member Author

@schrockn schrockn Mar 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(The docblock on AssetPartitionKey and PartitionKey)

@schrockn schrockn mentioned this pull request Mar 10, 2024
@schrockn
Copy link
Member Author

schrockn commented Mar 10, 2024

Heads up @OwenKephart @smackesey and @sryza I decided not to introduce PartitionKey and AssetPartitionKey here as I was dissatisfied with the consequences of it when I applied the change up the stack.

This PR is now written in terms of str for partition keys and AssetKeyPartitionKey. My plan is introduce a PartitionKey and AssetPartitionKey class after we have fleshed this out more, so that we truly understand the consequences of this.

@schrockn schrockn force-pushed the asset-graph-view-1 branch 2 times, most recently from b2a2725 to 25abc9f Compare March 11, 2024 09:36
@smackesey
Copy link
Collaborator

This PR is now written in terms of str for partition keys and AssetKeyPartitionKey. My plan is introduce a PartitionKey and AssetPartitionKey class after we have fleshed this out more, so that we truly understand the consequences of this.

Seems sensible to me. Also feel like it would be good to fully replace AssetKeyPartitionKey with AssetPartitionKey in one PR so as to avoid a confusing state where both of these exist.

partition windows.
"""

def __init__(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we use a dataclass/namedtuple/basemodel here?

Copy link
Member Author

@schrockn schrockn Mar 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They do not play nicely with @cached_method and @cached_property.

I actually experimented with declaring __slots__ for the cached method instances, but that seemed like premature optimization.

@schrockn schrockn force-pushed the asset-graph-view-1 branch from 25abc9f to 1a3f697 Compare March 11, 2024 17:27
@schrockn
Copy link
Member Author

schrockn commented Mar 11, 2024

Merge activity

  • Mar 11, 2:02 PM EDT: @schrockn started a stack merge that includes this pull request via Graphite.
  • Mar 11, 2:03 PM EDT: @schrockn merged this pull request with Graphite.

@schrockn schrockn merged commit 2a9e0f9 into master Mar 11, 2024
1 of 2 checks passed
@schrockn schrockn deleted the asset-graph-view-1 branch March 11, 2024 18:03
@schrockn schrockn mentioned this pull request Mar 25, 2024
PedramNavid pushed a commit that referenced this pull request Mar 28, 2024
## Summary & Motivation

Owen made the sensible suggestion (#20312 (comment)) to take advantage of native partition set features to avoid materializing partition keys with the AllPartitionsSubset is involved. This required changing operation order.

Instead, this PR proposes to also special case the base class operators when "other" is AllPartitionsSubset so we don't have to think about order.


## How I Tested These Changes

BK
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants