Skip to content

Commit

Permalink
Support dynamic partitioning in AssetSlice
Browse files Browse the repository at this point in the history
  • Loading branch information
schrockn committed Mar 12, 2024
1 parent 905ef8a commit 73e54c2
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from dagster._core.definitions.partition import (
AllPartitionsSubset,
DefaultPartitionsSubset,
DynamicPartitionsDefinition,
StaticPartitionsDefinition,
)
from dagster._core.definitions.time_window_partitions import (
Expand Down Expand Up @@ -207,8 +208,8 @@ def time_windows(self) -> Sequence[TimeWindow]:
check.failed(
f"Unsupported subset value in generated subset {self._compatible_subset.subset_value} created by keys {tw_partition_keys}"
)
else:
check.failed(f"Unsupported subset value: {self._compatible_subset.subset_value}")

check.failed(f"Unsupported subset value: {self._compatible_subset.subset_value}")

def _time_window_partitions_def_in_context(self) -> Optional[TimeWindowPartitionsDefinition]:
pd = self._partitions_def
Expand Down Expand Up @@ -403,10 +404,9 @@ def create_latest_time_window_slice(self, asset_key: AssetKey) -> AssetSlice:
TODO: add language for dynamic partitioning when we support it
"""
partitions_def = self._get_partitions_def(asset_key)
if partitions_def is None:
return self.get_asset_slice(asset_key)

if isinstance(partitions_def, StaticPartitionsDefinition):
if partitions_def is None or isinstance(
partitions_def, (DynamicPartitionsDefinition, StaticPartitionsDefinition)
):
return self.get_asset_slice(asset_key)

if isinstance(partitions_def, TimeWindowPartitionsDefinition):
Expand Down Expand Up @@ -482,7 +482,8 @@ def _build_multi_partition_slice(
last_tw
)
for secondary_pk in multi_dim_info.secondary_partition_def.get_partition_keys(
dynamic_partitions_store=self._queryer, current_time=self.effective_dt
current_time=self.effective_dt,
dynamic_partitions_store=self._queryer,
)
}
)
Expand Down
8 changes: 6 additions & 2 deletions python_modules/dagster/dagster/_core/definitions/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,9 @@ def get_partition_keys(
check.failed(
"The instance is not available to load partitions. You may be seeing this error"
" when using dynamic partitions with a version of dagster-webserver or"
" dagster-cloud that is older than 1.1.18."
" dagster-cloud that is older than 1.1.18. The other possibility is that an"
" internal framework error where a dynamic partitions store was not properly"
" threaded down a call stack."
)

return dynamic_partitions_store.get_dynamic_partitions(
Expand All @@ -545,7 +547,9 @@ def has_partition_key(
check.failed(
"The instance is not available to load partitions. You may be seeing this error"
" when using dynamic partitions with a version of dagster-webserver or"
" dagster-cloud that is older than 1.1.18."
" dagster-cloud that is older than 1.1.18. The other possibility is that an"
" internal framework error where a dynamic partitions store was not properly"
" threaded down a call stack."
)

return dynamic_partitions_store.has_dynamic_partition(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
MultiPartitionKey,
MultiPartitionsDefinition,
)
from dagster._core.definitions.partition import StaticPartitionsDefinition
from dagster._core.definitions.partition import (
DynamicPartitionsDefinition,
StaticPartitionsDefinition,
)
from dagster._core.definitions.time_window_partitions import DailyPartitionsDefinition, TimeWindow
from dagster._core.execution.context.compute import AssetExecutionContext
from dagster._core.instance import DagsterInstance
Expand Down Expand Up @@ -255,3 +258,61 @@ def multi_dimensional(context: AssetExecutionContext) -> None: ...
assert asset_graph_view.create_latest_time_window_slice(
multi_dimensional.key
).compute_partition_keys() == set(partition_keys)


def test_dynamic_partitioning_latest_time_window() -> None:
dynamic_partition_def = DynamicPartitionsDefinition(name="letters")
instance = DagsterInstance.ephemeral()
partition_keys = {"A", "B", "C"}
instance.add_dynamic_partitions("letters", list(partition_keys))

@asset(partitions_def=dynamic_partition_def)
def dynamic_asset() -> None: ...

daily_partitions_def = DailyPartitionsDefinition(
start_date=pendulum.datetime(2020, 1, 1), end_date=pendulum.datetime(2020, 1, 3)
)
multi_partitions_definition = MultiPartitionsDefinition(
{"daily": daily_partitions_def, "dynamic": dynamic_partition_def}
)

@asset(partitions_def=multi_partitions_definition)
def dynamic_multi_dimensional() -> None: ...

defs = Definitions([dynamic_asset, dynamic_multi_dimensional])

asset_graph_view = AssetGraphView.for_test(defs, instance)
assert (
asset_graph_view.get_asset_slice(dynamic_asset.key).compute_partition_keys()
== partition_keys
)
assert (
asset_graph_view.create_latest_time_window_slice(dynamic_asset.key).compute_partition_keys()
== partition_keys
)

partition_keys = []
jan_2_keys = []
for daily_pk in daily_partitions_def.get_partition_keys(dynamic_partitions_store=instance):
for dynamic_pk in dynamic_partition_def.get_partition_keys(
dynamic_partitions_store=instance
):
if daily_pk == "2020-01-02":
jan_2_keys.append(MultiPartitionKey({"daily": daily_pk, "dynamic": dynamic_pk}))

partition_keys.append(MultiPartitionKey({"daily": daily_pk, "dynamic": dynamic_pk}))

assert asset_graph_view.get_asset_slice(
dynamic_multi_dimensional.key
).compute_partition_keys() == set(partition_keys)
assert asset_graph_view.create_latest_time_window_slice(
dynamic_multi_dimensional.key
).compute_partition_keys() == set(jan_2_keys)

assert _tw(
asset_graph_view.create_latest_time_window_slice(dynamic_multi_dimensional.key)
).start == pendulum.datetime(2020, 1, 2)

assert _tw(
asset_graph_view.create_latest_time_window_slice(dynamic_multi_dimensional.key)
).end == pendulum.datetime(2020, 1, 3)

0 comments on commit 73e54c2

Please sign in to comment.