Skip to content

Commit

Permalink
Support dynamic partitioning in AssetSlice
Browse files Browse the repository at this point in the history
  • Loading branch information
schrockn committed Mar 8, 2024
1 parent 02f35a4 commit d779006
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
MultiPartitionsDefinition,
PartitionDimensionDefinition,
)
from dagster._core.definitions.partition import StaticPartitionsDefinition
from dagster._core.definitions.partition import (
DynamicPartitionsDefinition,
StaticPartitionsDefinition,
)
from dagster._core.definitions.time_window_partitions import (
TimeWindow,
TimeWindowPartitionsDefinition,
Expand Down Expand Up @@ -227,6 +230,9 @@ def latest_time_window_slice(self) -> "AssetSlice":
)

# Need to handle dynamic partitioning
if isinstance(self._partitions_def, DynamicPartitionsDefinition):
return self

check.failed(f"Unsupported partitions_def: {self._partitions_def}")

def _build_multi_partition_slice(
Expand All @@ -247,7 +253,10 @@ def _build_multi_partition_slice(
for tw_pk in multi_dim_info.tw_partition_def.get_partition_keys_in_time_window(
last_tw
)
for secondary_pk in multi_dim_info.secondary_partition_def.get_partition_keys()
for secondary_pk in multi_dim_info.secondary_partition_def.get_partition_keys(
current_time=self._asset_graph_view.effective_dt,
dynamic_partitions_store=self._asset_graph_view._queryer, # noqa: SLF001
)
}
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
MultiPartitionKey,
MultiPartitionsDefinition,
)
from dagster._core.definitions.partition import StaticPartitionsDefinition
from dagster._core.definitions.partition import (
DynamicPartitionsDefinition,
StaticPartitionsDefinition,
)
from dagster._core.definitions.time_window_partitions import DailyPartitionsDefinition
from dagster._core.execution.context.compute import AssetExecutionContext
from dagster._core.instance import DagsterInstance
Expand Down Expand Up @@ -247,3 +250,55 @@ def multi_dimensional(context: AssetExecutionContext) -> None: ...
multi_dimensional.key
).latest_time_window_slice.is_empty
assert asset_graph_view.create_empty_slice(multi_dimensional.key).latest_time_window.is_empty


def test_dynamic_partitioning_latest_time_window() -> None:
dynamic_partition_def = DynamicPartitionsDefinition(name="letters")
instance = DagsterInstance.ephemeral()
partition_keys = {"A", "B", "C"}
instance.add_dynamic_partitions("letters", list(partition_keys))

@asset(partitions_def=dynamic_partition_def)
def dynamic_asset() -> None: ...

daily_partitions_def = DailyPartitionsDefinition(
start_date=pendulum.datetime(2020, 1, 1), end_date=pendulum.datetime(2020, 1, 3)
)
multi_partitions_definition = MultiPartitionsDefinition(
{"daily": daily_partitions_def, "dynamic": dynamic_partition_def}
)

@asset(partitions_def=multi_partitions_definition)
def dynamic_multi_dimensional() -> None: ...

defs = Definitions([dynamic_asset, dynamic_multi_dimensional])

asset_graph_view = AssetGraphView.for_test(defs, instance)
assert (
asset_graph_view.get_asset_slice(dynamic_asset.key).compute_partition_keys()
== partition_keys
)
assert (
asset_graph_view.get_asset_slice(
dynamic_asset.key
).latest_time_window_slice.compute_partition_keys()
== partition_keys
)

partition_keys = []
jan_2_keys = []
for daily_pk in daily_partitions_def.get_partition_keys(dynamic_partitions_store=instance):
for dynamic_pk in dynamic_partition_def.get_partition_keys(
dynamic_partitions_store=instance
):
if daily_pk == "2020-01-02":
jan_2_keys.append(MultiPartitionKey({"daily": daily_pk, "dynamic": dynamic_pk}))

partition_keys.append(MultiPartitionKey({"daily": daily_pk, "dynamic": dynamic_pk}))

assert asset_graph_view.get_asset_slice(
dynamic_multi_dimensional.key
).compute_partition_keys() == set(partition_keys)
assert asset_graph_view.get_asset_slice(
dynamic_multi_dimensional.key
).latest_time_window_slice.compute_partition_keys() == set(jan_2_keys)

0 comments on commit d779006

Please sign in to comment.