From 695dac30ba40759e5a3fdac135cc030e4053a772 Mon Sep 17 00:00:00 2001 From: Sean Mackesey Date: Wed, 12 Jun 2024 11:15:41 -0400 Subject: [PATCH] Set `RunRequest.partition_key` when backfilling with policy with `max_partitions_per_run=1` --- .../_core/definitions/asset_daemon_context.py | 3 +- ...t_asset_backfill_with_backfill_policies.py | 37 +++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py index 3c79e3629f849..08f86dd115ad8 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py @@ -421,7 +421,8 @@ def _build_run_request_for_partition_key_range( ASSET_PARTITION_RANGE_START_TAG: partition_range_start, ASSET_PARTITION_RANGE_END_TAG: partition_range_end, } - return RunRequest(asset_selection=asset_keys, tags=tags) + partition_key = partition_range_start if partition_range_start == partition_range_end else None + return RunRequest(asset_selection=asset_keys, partition_key=partition_key, tags=tags) def get_auto_observe_run_requests( diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill_with_backfill_policies.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill_with_backfill_policies.py index b54b26e54b270..c38656c21e587 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill_with_backfill_policies.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill_with_backfill_policies.py @@ -1072,6 +1072,43 @@ def foo_child(): ] +def test_max_partitions_per_range_1_sets_run_request_partition_key(): + @asset( + partitions_def=DailyPartitionsDefinition("2023-10-01"), + backfill_policy=BackfillPolicy.multi_run(1), + ) + def foo(): + pass + + asset_graph = get_asset_graph({"repo1": [foo]}) + + asset_backfill_data = AssetBackfillData.from_asset_partitions( + asset_graph=asset_graph, + partition_names=[ + "2023-10-05", + "2023-10-06", + ], + asset_selection=[foo.key], + dynamic_partitions_store=MagicMock(), + all_partitions=False, + backfill_start_timestamp=create_datetime(2023, 10, 7, 0, 0, 0).timestamp(), + ) + + result = execute_asset_backfill_iteration_consume_generator( + "apple", asset_backfill_data, asset_graph, DagsterInstance.ephemeral() + ) + + assert [run_request.partition_key for run_request in result.run_requests] == [ + "2023-10-05", + "2023-10-06", + ] + + assert [run_request.partition_key_range for run_request in result.run_requests] == [ + PartitionKeyRange("2023-10-05", "2023-10-05"), + PartitionKeyRange("2023-10-06", "2023-10-06"), + ] + + # 0 turns off batching # 2 will require multiple batches to fulfill the backfill # 10 will require a single to fulfill the backfill