diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py index 590024f54494e..15349a66f05ba 100644 --- a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py +++ b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py @@ -21,6 +21,7 @@ from dagster._core.definitions.partition import ( AllPartitionsSubset, DefaultPartitionsSubset, + DynamicPartitionsDefinition, StaticPartitionsDefinition, ) from dagster._core.definitions.time_window_partitions import ( @@ -207,8 +208,8 @@ def time_windows(self) -> Sequence[TimeWindow]: check.failed( f"Unsupported subset value in generated subset {self._compatible_subset.subset_value} created by keys {tw_partition_keys}" ) - else: - check.failed(f"Unsupported subset value: {self._compatible_subset.subset_value}") + + check.failed(f"Unsupported subset value: {self._compatible_subset.subset_value}") def _time_window_partitions_def_in_context(self) -> Optional[TimeWindowPartitionsDefinition]: pd = self._partitions_def @@ -403,10 +404,9 @@ def create_latest_time_window_slice(self, asset_key: AssetKey) -> AssetSlice: TODO: add language for dynamic partitioning when we support it """ partitions_def = self._get_partitions_def(asset_key) - if partitions_def is None: - return self.get_asset_slice(asset_key) - - if isinstance(partitions_def, StaticPartitionsDefinition): + if partitions_def is None or isinstance( + partitions_def, (DynamicPartitionsDefinition, StaticPartitionsDefinition) + ): return self.get_asset_slice(asset_key) if isinstance(partitions_def, TimeWindowPartitionsDefinition): @@ -482,7 +482,8 @@ def _build_multi_partition_slice( last_tw ) for secondary_pk in multi_dim_info.secondary_partition_def.get_partition_keys( - dynamic_partitions_store=self._queryer, current_time=self.effective_dt + current_time=self.effective_dt, + dynamic_partitions_store=self._queryer, ) } ) diff --git a/python_modules/dagster/dagster/_core/definitions/partition.py b/python_modules/dagster/dagster/_core/definitions/partition.py index 7ea58388a5cdd..3f9e003aa7543 100644 --- a/python_modules/dagster/dagster/_core/definitions/partition.py +++ b/python_modules/dagster/dagster/_core/definitions/partition.py @@ -525,7 +525,9 @@ def get_partition_keys( check.failed( "The instance is not available to load partitions. You may be seeing this error" " when using dynamic partitions with a version of dagster-webserver or" - " dagster-cloud that is older than 1.1.18." + " dagster-cloud that is older than 1.1.18. The other possibility is that an" + " internal framework error where a dynamic partitions store was not properly" + " threaded down a call stack." ) return dynamic_partitions_store.get_dynamic_partitions( @@ -545,7 +547,9 @@ def has_partition_key( check.failed( "The instance is not available to load partitions. You may be seeing this error" " when using dynamic partitions with a version of dagster-webserver or" - " dagster-cloud that is older than 1.1.18." + " dagster-cloud that is older than 1.1.18. The other possibility is that an" + " internal framework error where a dynamic partitions store was not properly" + " threaded down a call stack." ) return dynamic_partitions_store.has_dynamic_partition( diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/asset_graph_view_tests/test_latest_time_window.py b/python_modules/dagster/dagster_tests/asset_defs_tests/asset_graph_view_tests/test_latest_time_window.py index 680d7e567f95e..5aa31a9d845ab 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/asset_graph_view_tests/test_latest_time_window.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/asset_graph_view_tests/test_latest_time_window.py @@ -11,7 +11,10 @@ MultiPartitionKey, MultiPartitionsDefinition, ) -from dagster._core.definitions.partition import StaticPartitionsDefinition +from dagster._core.definitions.partition import ( + DynamicPartitionsDefinition, + StaticPartitionsDefinition, +) from dagster._core.definitions.time_window_partitions import DailyPartitionsDefinition, TimeWindow from dagster._core.execution.context.compute import AssetExecutionContext from dagster._core.instance import DagsterInstance @@ -255,3 +258,61 @@ def multi_dimensional(context: AssetExecutionContext) -> None: ... assert asset_graph_view.create_latest_time_window_slice( multi_dimensional.key ).compute_partition_keys() == set(partition_keys) + + +def test_dynamic_partitioning_latest_time_window() -> None: + dynamic_partition_def = DynamicPartitionsDefinition(name="letters") + instance = DagsterInstance.ephemeral() + partition_keys = {"A", "B", "C"} + instance.add_dynamic_partitions("letters", list(partition_keys)) + + @asset(partitions_def=dynamic_partition_def) + def dynamic_asset() -> None: ... + + daily_partitions_def = DailyPartitionsDefinition( + start_date=pendulum.datetime(2020, 1, 1), end_date=pendulum.datetime(2020, 1, 3) + ) + multi_partitions_definition = MultiPartitionsDefinition( + {"daily": daily_partitions_def, "dynamic": dynamic_partition_def} + ) + + @asset(partitions_def=multi_partitions_definition) + def dynamic_multi_dimensional() -> None: ... + + defs = Definitions([dynamic_asset, dynamic_multi_dimensional]) + + asset_graph_view = AssetGraphView.for_test(defs, instance) + assert ( + asset_graph_view.get_asset_slice(dynamic_asset.key).compute_partition_keys() + == partition_keys + ) + assert ( + asset_graph_view.create_latest_time_window_slice(dynamic_asset.key).compute_partition_keys() + == partition_keys + ) + + partition_keys = [] + jan_2_keys = [] + for daily_pk in daily_partitions_def.get_partition_keys(dynamic_partitions_store=instance): + for dynamic_pk in dynamic_partition_def.get_partition_keys( + dynamic_partitions_store=instance + ): + if daily_pk == "2020-01-02": + jan_2_keys.append(MultiPartitionKey({"daily": daily_pk, "dynamic": dynamic_pk})) + + partition_keys.append(MultiPartitionKey({"daily": daily_pk, "dynamic": dynamic_pk})) + + assert asset_graph_view.get_asset_slice( + dynamic_multi_dimensional.key + ).compute_partition_keys() == set(partition_keys) + assert asset_graph_view.create_latest_time_window_slice( + dynamic_multi_dimensional.key + ).compute_partition_keys() == set(jan_2_keys) + + assert _tw( + asset_graph_view.create_latest_time_window_slice(dynamic_multi_dimensional.key) + ).start == pendulum.datetime(2020, 1, 2) + + assert _tw( + asset_graph_view.create_latest_time_window_slice(dynamic_multi_dimensional.key) + ).end == pendulum.datetime(2020, 1, 3)