Skip to content

Commit

Permalink
Expose a way to get nonexistant partition keys from an upstream parti…
Browse files Browse the repository at this point in the history
…tion mapping using AssetGraphView (#25995)

## Summary & Motivation
Adds a way to raise on invalid upstream partition definitions for
callsites using EntitySubset/AssetGraphView.

## How I Tested These Changes
New test case

## Changelog
NOCHANGELOG
  • Loading branch information
gibsondan authored Dec 20, 2024
1 parent 43e43c5 commit 4425985
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
Literal,
NamedTuple,
Optional,
Sequence,
Tuple,
Type,
TypeVar,
)
Expand All @@ -24,6 +26,7 @@
PartitionDimensionDefinition,
)
from dagster._core.definitions.partition import AllPartitionsSubset
from dagster._core.definitions.partition_mapping import UpstreamPartitionsResult
from dagster._core.definitions.time_window_partitions import (
TimeWindow,
TimeWindowPartitionsDefinition,
Expand Down Expand Up @@ -227,6 +230,30 @@ def get_asset_subset_from_asset_partitions(
)
return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value))

def compute_parent_subset_and_required_but_nonexistent_partition_keys(
self, parent_key, subset: EntitySubset[T_EntityKey]
) -> Tuple[EntitySubset[AssetKey], Sequence[str]]:
check.invariant(
parent_key in self.asset_graph.get(subset.key).parent_entity_keys,
)
to_key = parent_key
to_partitions_def = self.asset_graph.get(to_key).partitions_def

if subset.is_empty:
return self.get_empty_subset(key=parent_key), []
elif to_partitions_def is None:
return self.get_full_subset(key=to_key), []

upstream_partition_result = self._compute_upstream_partitions_result(to_key, subset)

parent_subset = EntitySubset(
self,
key=to_key,
value=_ValidatedEntitySubsetValue(upstream_partition_result.partitions_subset),
)

return parent_subset, upstream_partition_result.required_but_nonexistent_partition_keys

def compute_parent_subset(
self, parent_key: AssetKey, subset: EntitySubset[T_EntityKey]
) -> EntitySubset[AssetKey]:
Expand All @@ -243,6 +270,25 @@ def compute_child_subset(
)
return self.compute_mapped_subset(child_key, subset, direction="down")

def _compute_upstream_partitions_result(
self, to_key: T_EntityKey, from_subset: EntitySubset
) -> UpstreamPartitionsResult:
from_key = from_subset.key
parent_key = to_key
partition_mapping = self.asset_graph.get_partition_mapping(from_key, parent_key)
from_partitions_def = self.asset_graph.get(from_key).partitions_def
to_partitions_def = self.asset_graph.get(to_key).partitions_def

return partition_mapping.get_upstream_mapped_partitions_result_for_partitions(
downstream_partitions_subset=from_subset.get_internal_subset_value()
if from_partitions_def is not None
else None,
downstream_partitions_def=from_partitions_def,
upstream_partitions_def=check.not_none(to_partitions_def),
dynamic_partitions_store=self._queryer,
current_time=self.effective_dt,
)

def compute_mapped_subset(
self, to_key: T_EntityKey, from_subset: EntitySubset, direction: Literal["up", "down"]
) -> EntitySubset[T_EntityKey]:
Expand All @@ -258,9 +304,7 @@ def compute_mapped_subset(
else self.get_full_subset(key=to_key)
)

child_key = to_key
parent_key = from_key
partition_mapping = self.asset_graph.get_partition_mapping(child_key, parent_key)
partition_mapping = self.asset_graph.get_partition_mapping(to_key, from_key)

to_partitions_subset = partition_mapping.get_downstream_partitions_for_partitions(
upstream_partitions_subset=from_subset.get_internal_subset_value(),
Expand All @@ -277,21 +321,9 @@ def compute_mapped_subset(
else self.get_full_subset(key=to_key)
)

child_key = from_key
parent_key = to_key
partition_mapping = self.asset_graph.get_partition_mapping(child_key, parent_key)

to_partitions_subset = (
partition_mapping.get_upstream_mapped_partitions_result_for_partitions(
downstream_partitions_subset=from_subset.get_internal_subset_value()
if from_partitions_def is not None
else None,
downstream_partitions_def=from_partitions_def,
upstream_partitions_def=to_partitions_def,
dynamic_partitions_store=self._queryer,
current_time=self.effective_dt,
).partitions_subset
)
to_partitions_subset = self._compute_upstream_partitions_result(
to_key, from_subset
).partitions_subset

return EntitySubset(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ class AllPartitionMapping(PartitionMapping, NamedTuple("_AllPartitionMapping", [
"""Maps every partition in the downstream asset to every partition in the upstream asset.
Commonly used in the case when the downstream asset is not partitioned, in which the entire
downstream asset depends on all partitions of the usptream asset.
downstream asset depends on all partitions of the upstream asset.
"""

def get_upstream_mapped_partitions_result_for_partitions(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from dagster import AssetDep, Definitions, asset
from dagster import AssetDep, Definitions, IdentityPartitionMapping, asset
from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView
from dagster._core.definitions.asset_check_spec import AssetCheckSpec
from dagster._core.definitions.events import AssetKeyPartitionKey
Expand Down Expand Up @@ -27,6 +27,46 @@ def an_asset() -> None: ...
assert asset_graph_view_t0.asset_graph.get_all_asset_keys() == {an_asset.key}


def test_upstream_non_existent_partitions():
xy = StaticPartitionsDefinition(["x", "y"])
zx = StaticPartitionsDefinition(["z", "x"])

@asset(partitions_def=xy)
def up_asset() -> None: ...

@asset(
deps=[AssetDep(up_asset, partition_mapping=IdentityPartitionMapping())],
partitions_def=zx,
)
def down_asset(): ...

defs = Definitions([up_asset, down_asset])

with DagsterInstance.ephemeral() as instance:
asset_graph_view = AssetGraphView.for_test(defs, instance)
down_subset = asset_graph_view.get_full_subset(key=down_asset.key)
assert down_subset.expensively_compute_partition_keys() == {"x", "z"}

parent_subset, required_but_nonexistent_partition_keys = (
asset_graph_view.compute_parent_subset_and_required_but_nonexistent_partition_keys(
parent_key=up_asset.key, subset=down_subset
)
)

assert parent_subset.expensively_compute_partition_keys() == {"x"}
assert required_but_nonexistent_partition_keys == ["z"]

# Mapping onto an empty subset is empty
empty_down_subset = asset_graph_view.get_empty_subset(key=down_asset.key)
parent_subset, required_but_nonexistent_partition_keys = (
asset_graph_view.compute_parent_subset_and_required_but_nonexistent_partition_keys(
parent_key=up_asset.key, subset=empty_down_subset
)
)
assert parent_subset.is_empty
assert required_but_nonexistent_partition_keys == []


def test_subset_traversal_static_partitions() -> None:
number_keys = {"1", "2", "3"}
letter_keys = {"a", "b", "c"}
Expand Down Expand Up @@ -133,6 +173,43 @@ def up_numbers() -> None: ...
)


def test_downstream_of_unpartitioned_partition_mapping() -> None:
@asset
def unpartitioned() -> None: ...

@asset(
partitions_def=StaticPartitionsDefinition(["a", "b", "c"]),
deps=[AssetDep(unpartitioned, partition_mapping=LastPartitionMapping())],
)
def downstream() -> None: ...

defs = Definitions([downstream, unpartitioned])
instance = DagsterInstance.ephemeral()
asset_graph_view = AssetGraphView.for_test(defs, instance)

unpartitioned_full = asset_graph_view.get_full_subset(key=unpartitioned.key)
unpartitioned_empty = asset_graph_view.get_empty_subset(key=unpartitioned.key)

downstream_full = asset_graph_view.get_full_subset(key=downstream.key)
downstream_empty = asset_graph_view.get_empty_subset(key=downstream.key)

assert unpartitioned_full.compute_child_subset(child_key=downstream.key) == downstream_full
assert unpartitioned_empty.compute_child_subset(child_key=downstream.key) == downstream_empty

assert downstream_full.compute_parent_subset(parent_key=unpartitioned.key) == unpartitioned_full
assert (
downstream_empty.compute_parent_subset(parent_key=unpartitioned.key) == unpartitioned_empty
)

assert asset_graph_view.compute_parent_subset_and_required_but_nonexistent_partition_keys(
parent_key=unpartitioned.key, subset=downstream_full
) == (unpartitioned_full, [])

assert asset_graph_view.compute_parent_subset_and_required_but_nonexistent_partition_keys(
parent_key=unpartitioned.key, subset=downstream_empty
) == (unpartitioned_empty, [])


def test_upstream_of_unpartitioned_partition_mapping() -> None:
@asset(partitions_def=StaticPartitionsDefinition(["a", "b", "c"]))
def upstream() -> None: ...
Expand Down

0 comments on commit 4425985

Please sign in to comment.