Skip to content

Commit

Permalink
add partition filter to get_latest_storage_id_by_partition (#25510)
Browse files Browse the repository at this point in the history
## Summary & Motivation
We want to filter the number of partitions we scan for, so that we can
optimize the restricted partition case.

## How I Tested These Changes
BK
  • Loading branch information
prha authored Oct 25, 2024
1 parent 4377048 commit f1dfc2f
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 9 deletions.
9 changes: 7 additions & 2 deletions python_modules/dagster/dagster/_core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2265,13 +2265,18 @@ def get_materialized_partitions(

@traced
def get_latest_storage_id_by_partition(
self, asset_key: AssetKey, event_type: "DagsterEventType"
self,
asset_key: AssetKey,
event_type: "DagsterEventType",
partitions: Optional[Set[str]] = None,
) -> Mapping[str, int]:
"""Fetch the latest materialzation storage id for each partition for a given asset key.
Returns a mapping of partition to storage id.
"""
return self._event_storage.get_latest_storage_id_by_partition(asset_key, event_type)
return self._event_storage.get_latest_storage_id_by_partition(
asset_key, event_type, partitions
)

@traced
def get_latest_planned_materialization_info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,10 @@ def get_materialized_partitions(

@abstractmethod
def get_latest_storage_id_by_partition(
self, asset_key: AssetKey, event_type: DagsterEventType
self,
asset_key: AssetKey,
event_type: DagsterEventType,
partitions: Optional[Set[str]] = None,
) -> Mapping[str, int]:
pass

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1877,7 +1877,10 @@ def _latest_event_ids_by_partition_subquery(
)

def get_latest_storage_id_by_partition(
self, asset_key: AssetKey, event_type: DagsterEventType
self,
asset_key: AssetKey,
event_type: DagsterEventType,
partitions: Optional[Set[str]] = None,
) -> Mapping[str, int]:
"""Fetch the latest materialzation storage id for each partition for a given asset key.
Expand All @@ -1886,7 +1889,7 @@ def get_latest_storage_id_by_partition(
check.inst_param(asset_key, "asset_key", AssetKey)

latest_event_ids_by_partition_subquery = self._latest_event_ids_by_partition_subquery(
asset_key, [event_type]
asset_key, [event_type], asset_partitions=list(partitions) if partitions else None
)
latest_event_ids_by_partition = db_select(
[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,10 +553,13 @@ def get_materialized_partitions(
)

def get_latest_storage_id_by_partition(
self, asset_key: "AssetKey", event_type: "DagsterEventType"
self,
asset_key: "AssetKey",
event_type: "DagsterEventType",
partitions: Optional[Set[str]] = None,
) -> Mapping[str, int]:
return self._storage.event_log_storage.get_latest_storage_id_by_partition(
asset_key, event_type
asset_key, event_type, partitions
)

def get_latest_tags_by_partition(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2634,10 +2634,12 @@ def test_get_latest_storage_ids_by_partition(self, storage, instance):
b = AssetKey(["b"])
run_id = make_new_run_id()

def _assert_storage_matches(expected):
def _assert_storage_matches(expected, partition: Optional[str] = None):
assert (
storage.get_latest_storage_id_by_partition(
a, DagsterEventType.ASSET_MATERIALIZATION
a,
DagsterEventType.ASSET_MATERIALIZATION,
partitions={partition} if partition else None,
)
== expected
)
Expand Down Expand Up @@ -2679,6 +2681,10 @@ def _store_partition_event(asset_key, partition) -> int:
latest_storage_ids["p2"] = _store_partition_event(a, "p2")
_assert_storage_matches(latest_storage_ids)

# check that we can filter for specific partitions
_assert_storage_matches({"p1": latest_storage_ids["p1"]}, partition="p1")
_assert_storage_matches({"p2": latest_storage_ids["p2"]}, partition="p2")

# unrelated asset materialized
_store_partition_event(b, "p1")
_store_partition_event(b, "p2")
Expand Down

0 comments on commit f1dfc2f

Please sign in to comment.