Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support dynamic partitioning in AssetSlice #20355

Merged
merged 2 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from dagster._core.definitions.partition import (
AllPartitionsSubset,
DefaultPartitionsSubset,
DynamicPartitionsDefinition,
StaticPartitionsDefinition,
)
from dagster._core.definitions.time_window_partitions import (
Expand Down Expand Up @@ -210,8 +211,8 @@ def time_windows(self) -> Sequence[TimeWindow]:
check.failed(
f"Unsupported subset value in generated subset {self._compatible_subset.subset_value} created by keys {tw_partition_keys}"
)
else:
check.failed(f"Unsupported subset value: {self._compatible_subset.subset_value}")

check.failed(f"Unsupported subset value: {self._compatible_subset.subset_value}")

def _time_window_partitions_def_in_context(self) -> Optional[TimeWindowPartitionsDefinition]:
pd = self._partitions_def
Expand Down Expand Up @@ -406,10 +407,9 @@ def create_latest_time_window_slice(self, asset_key: AssetKey) -> AssetSlice:
TODO: add language for dynamic partitioning when we support it
"""
partitions_def = self._get_partitions_def(asset_key)
if partitions_def is None:
return self.get_asset_slice(asset_key)

if isinstance(partitions_def, StaticPartitionsDefinition):
if partitions_def is None or isinstance(
partitions_def, (DynamicPartitionsDefinition, StaticPartitionsDefinition)
):
return self.get_asset_slice(asset_key)

if isinstance(partitions_def, TimeWindowPartitionsDefinition):
Expand Down Expand Up @@ -485,7 +485,8 @@ def _build_multi_partition_slice(
last_tw
)
for secondary_pk in multi_dim_info.secondary_partition_def.get_partition_keys(
dynamic_partitions_store=self._queryer, current_time=self.effective_dt
current_time=self.effective_dt,
dynamic_partitions_store=self._queryer,
)
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,10 +445,10 @@ def has_time_window_dimension(self) -> bool:
@property
def time_window_partitions_def(self) -> TimeWindowPartitionsDefinition:
check.invariant(self.has_time_window_dimension, "Must have time window dimension")
assert isinstance(
self.primary_dimension.partitions_def, TimeWindowPartitionsDefinition
) # appease pyright
return check.inst(self.primary_dimension.partitions_def, TimeWindowPartitionsDefinition)
return cast(
TimeWindowPartitionsDefinition,
check.inst(self.primary_dimension.partitions_def, TimeWindowPartitionsDefinition),
)

def time_window_for_partition_key(self, partition_key: str) -> TimeWindow:
if not isinstance(partition_key, MultiPartitionKey):
Expand Down
8 changes: 6 additions & 2 deletions python_modules/dagster/dagster/_core/definitions/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,9 @@ def get_partition_keys(
check.failed(
"The instance is not available to load partitions. You may be seeing this error"
" when using dynamic partitions with a version of dagster-webserver or"
" dagster-cloud that is older than 1.1.18."
" dagster-cloud that is older than 1.1.18. The other possibility is that an"
" internal framework error where a dynamic partitions store was not properly"
" threaded down a call stack."
schrockn marked this conversation as resolved.
Show resolved Hide resolved
)

return dynamic_partitions_store.get_dynamic_partitions(
Expand All @@ -545,7 +547,9 @@ def has_partition_key(
check.failed(
"The instance is not available to load partitions. You may be seeing this error"
" when using dynamic partitions with a version of dagster-webserver or"
" dagster-cloud that is older than 1.1.18."
" dagster-cloud that is older than 1.1.18. The other possibility is that an"
" internal framework error where a dynamic partitions store was not properly"
" threaded down a call stack."
schrockn marked this conversation as resolved.
Show resolved Hide resolved
)

return dynamic_partitions_store.has_dynamic_partition(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
MultiPartitionKey,
MultiPartitionsDefinition,
)
from dagster._core.definitions.partition import StaticPartitionsDefinition
from dagster._core.definitions.partition import (
DynamicPartitionsDefinition,
StaticPartitionsDefinition,
)
from dagster._core.definitions.time_window_partitions import DailyPartitionsDefinition, TimeWindow
from dagster._core.execution.context.compute import AssetExecutionContext
from dagster._core.instance import DagsterInstance
Expand Down Expand Up @@ -255,3 +258,61 @@ def multi_dimensional(context: AssetExecutionContext) -> None: ...
assert asset_graph_view.create_latest_time_window_slice(
multi_dimensional.key
).compute_partition_keys() == set(partition_keys)


def test_dynamic_partitioning_latest_time_window() -> None:
    """Exercise asset slices over dynamically partitioned assets.

    Two assets are defined: one partitioned only by the dynamic "letters"
    definition, and one multi-partitioned by a daily dimension crossed with
    that same dynamic definition. For each asset the test compares the keys
    of the plain asset slice and of the latest-time-window slice against
    keys derived directly from the partitions definitions.
    """
    letters_def = DynamicPartitionsDefinition(name="letters")
    instance = DagsterInstance.ephemeral()
    letter_keys = {"A", "B", "C"}
    instance.add_dynamic_partitions("letters", list(letter_keys))

    @asset(partitions_def=letters_def)
    def dynamic_asset() -> None: ...

    daily_def = DailyPartitionsDefinition(
        start_date=pendulum.datetime(2020, 1, 1), end_date=pendulum.datetime(2020, 1, 3)
    )
    multi_def = MultiPartitionsDefinition({"daily": daily_def, "dynamic": letters_def})

    @asset(partitions_def=multi_def)
    def dynamic_multi_dimensional() -> None: ...

    defs = Definitions([dynamic_asset, dynamic_multi_dimensional])
    view = AssetGraphView.for_test(defs, instance)

    # Purely dynamic asset: both slices are expected to hold every letter.
    full_dynamic_slice = view.get_asset_slice(dynamic_asset.key)
    assert full_dynamic_slice.compute_partition_keys() == letter_keys

    latest_dynamic_slice = view.create_latest_time_window_slice(dynamic_asset.key)
    assert latest_dynamic_slice.compute_partition_keys() == letter_keys

    # Cross product of the daily keys with the dynamic keys, as plain string pairs.
    key_pairs = [
        (daily_pk, dynamic_pk)
        for daily_pk in daily_def.get_partition_keys(dynamic_partitions_store=instance)
        for dynamic_pk in letters_def.get_partition_keys(dynamic_partitions_store=instance)
    ]
    all_multi_keys = {
        MultiPartitionKey({"daily": daily_pk, "dynamic": dynamic_pk})
        for daily_pk, dynamic_pk in key_pairs
    }
    jan_2_multi_keys = {
        MultiPartitionKey({"daily": daily_pk, "dynamic": dynamic_pk})
        for daily_pk, dynamic_pk in key_pairs
        if daily_pk == "2020-01-02"
    }

    # Multi-partitioned asset: the plain slice is expected to hold every combination.
    full_multi_slice = view.get_asset_slice(dynamic_multi_dimensional.key)
    assert full_multi_slice.compute_partition_keys() == all_multi_keys

    # The latest-time-window slice is expected to hold only the 2020-01-02 combinations.
    latest_multi_slice = view.create_latest_time_window_slice(dynamic_multi_dimensional.key)
    assert latest_multi_slice.compute_partition_keys() == jan_2_multi_keys

    latest_window = _tw(latest_multi_slice)
    assert latest_window.start == pendulum.datetime(2020, 1, 2)
    assert latest_window.end == pendulum.datetime(2020, 1, 3)