Skip to content

Commit

Permalink
Support dynamic partitioning in AssetSlice
Browse files Browse the repository at this point in the history
  • Loading branch information
schrockn committed Mar 11, 2024
1 parent c8e57dd commit 2460929
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
MultiPartitionsDefinition,
PartitionDimensionDefinition,
)
from dagster._core.definitions.partition import StaticPartitionsDefinition
from dagster._core.definitions.partition import (
DynamicPartitionsDefinition,
StaticPartitionsDefinition,
)
from dagster._core.definitions.time_window_partitions import (
TimeWindow,
TimeWindowPartitionsDefinition,
Expand Down Expand Up @@ -222,6 +225,9 @@ def latest_time_window_slice(self) -> "AssetSlice":
)

# Dynamic partitioning is handled by returning the slice unchanged
if isinstance(self._partitions_def, DynamicPartitionsDefinition):
return self

check.failed(f"Unsupported partitions_def: {self._partitions_def}")

def _build_multi_partition_slice(
Expand All @@ -231,7 +237,9 @@ def _build_multi_partition_slice(
# in the underlying PartitionsSet. We could add a specialized PartitionsSubset
# subclass that itself composed two PartitionsSubset to avoid materializing the entire
# partitions range.
return self._asset_graph_view.get_asset_slice(self.asset_key).only_partition_keys(
return self._asset_graph_view.get_asset_slice(
self.asset_key
).compute_intersection_with_partition_keys(
{
MultiPartitionKey(
{
Expand All @@ -242,7 +250,10 @@ def _build_multi_partition_slice(
for tw_pk in multi_dim_info.tw_partition_def.get_partition_keys_in_time_window(
last_tw
)
for secondary_pk in multi_dim_info.secondary_partition_def.get_partition_keys()
for secondary_pk in multi_dim_info.secondary_partition_def.get_partition_keys(
current_time=self._asset_graph_view.effective_dt,
dynamic_partitions_store=self._asset_graph_view._queryer, # noqa: SLF001
)
}
)

Expand Down
8 changes: 6 additions & 2 deletions python_modules/dagster/dagster/_core/definitions/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,9 @@ def get_partition_keys(
check.failed(
"The instance is not available to load partitions. You may be seeing this error"
" when using dynamic partitions with a version of dagster-webserver or"
" dagster-cloud that is older than 1.1.18."
" dagster-cloud that is older than 1.1.18. The other possibility is that an"
" internal framework error where a dynamic partitions store was not properly"
" threaded down a call stack."
)

return dynamic_partitions_store.get_dynamic_partitions(
Expand All @@ -545,7 +547,9 @@ def has_partition_key(
check.failed(
"The instance is not available to load partitions. You may be seeing this error"
" when using dynamic partitions with a version of dagster-webserver or"
" dagster-cloud that is older than 1.1.18."
" dagster-cloud that is older than 1.1.18. The other possibility is that an"
" internal framework error where a dynamic partitions store was not properly"
" threaded down a call stack."
)

return dynamic_partitions_store.has_dynamic_partition(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
MultiPartitionKey,
MultiPartitionsDefinition,
)
from dagster._core.definitions.partition import StaticPartitionsDefinition
from dagster._core.definitions.partition import (
DynamicPartitionsDefinition,
StaticPartitionsDefinition,
)
from dagster._core.definitions.time_window_partitions import DailyPartitionsDefinition
from dagster._core.execution.context.compute import AssetExecutionContext
from dagster._core.instance import DagsterInstance
Expand Down Expand Up @@ -247,3 +250,63 @@ def multi_dimensional(context: AssetExecutionContext) -> None: ...
multi_dimensional.key
).latest_time_window_slice.is_empty
assert asset_graph_view.create_empty_slice(multi_dimensional.key).latest_time_window.is_empty


def test_dynamic_partitioning_latest_time_window() -> None:
    """Check partition keys and latest time window for dynamically partitioned assets.

    Covers two assets backed by the same ephemeral instance:

    * ``dynamic_asset`` -- partitioned only by the dynamic "letters" definition;
      its latest-time-window slice is expected to contain every dynamic key.
    * ``dynamic_multi_dimensional`` -- partitioned by a daily dimension
      (2020-01-01 up to 2020-01-03) crossed with the dynamic dimension; its
      latest-time-window slice is expected to contain only the 2020-01-02 keys.
    """
    letters_def = DynamicPartitionsDefinition(name="letters")
    instance = DagsterInstance.ephemeral()
    letter_keys = {"A", "B", "C"}
    instance.add_dynamic_partitions("letters", list(letter_keys))

    @asset(partitions_def=letters_def)
    def dynamic_asset() -> None: ...

    daily_def = DailyPartitionsDefinition(
        start_date=pendulum.datetime(2020, 1, 1), end_date=pendulum.datetime(2020, 1, 3)
    )
    multi_def = MultiPartitionsDefinition({"daily": daily_def, "dynamic": letters_def})

    @asset(partitions_def=multi_def)
    def dynamic_multi_dimensional() -> None: ...

    defs = Definitions([dynamic_asset, dynamic_multi_dimensional])
    asset_graph_view = AssetGraphView.for_test(defs, instance)

    def slice_of(assets_def):
        # Fetch a fresh slice on every call, mirroring one lookup per assertion.
        return asset_graph_view.get_asset_slice(assets_def.key)

    # Purely dynamic asset: the full slice and the latest-window slice agree.
    assert slice_of(dynamic_asset).compute_partition_keys() == letter_keys
    assert (
        slice_of(dynamic_asset).latest_time_window_slice.compute_partition_keys()
        == letter_keys
    )

    # Expected keys for the multi-dimensional asset: the full daily x dynamic
    # cross product, plus the subset that falls on 2020-01-02.
    expected_all_keys = set()
    expected_jan_2_keys = set()
    for daily_pk in daily_def.get_partition_keys(dynamic_partitions_store=instance):
        for dynamic_pk in letters_def.get_partition_keys(dynamic_partitions_store=instance):
            multi_key = MultiPartitionKey({"daily": daily_pk, "dynamic": dynamic_pk})
            expected_all_keys.add(multi_key)
            if daily_pk == "2020-01-02":
                expected_jan_2_keys.add(multi_key)

    assert slice_of(dynamic_multi_dimensional).compute_partition_keys() == expected_all_keys
    assert (
        slice_of(dynamic_multi_dimensional).latest_time_window_slice.compute_partition_keys()
        == expected_jan_2_keys
    )

    latest_start = slice_of(dynamic_multi_dimensional).latest_time_window.start
    assert latest_start == pendulum.datetime(2020, 1, 2)

    latest_end = slice_of(dynamic_multi_dimensional).latest_time_window.end
    assert latest_end == pendulum.datetime(2020, 1, 3)

0 comments on commit 2460929

Please sign in to comment.