From 90231c79b7aaa2f2a54094b5a12b9bcc0dce8b01 Mon Sep 17 00:00:00 2001 From: gibsondan Date: Mon, 6 Jan 2025 11:34:08 -0600 Subject: [PATCH] Remove slow validation code in get_partition_set_execution_param_data (#26810) Summary: The current implementation here requires fetching every partition key in the set, which can be very slow. We don't appear to use that for anything other than filtering out keys that aren't in the set, which seems like a reasonable thing to require the callsite to check instead. BK > Insert changelog entry or delete this section. ## Summary & Motivation ## How I Tested These Changes ## Changelog > Insert changelog entry or delete this section. --- python_modules/dagster/dagster/_grpc/impl.py | 52 +++++++------------ .../api_tests/test_api_snapshot_partition.py | 4 +- 2 files changed, 22 insertions(+), 34 deletions(-) diff --git a/python_modules/dagster/dagster/_grpc/impl.py b/python_modules/dagster/dagster/_grpc/impl.py index 3a838e684f5a9..b4a0041536c0a 100644 --- a/python_modules/dagster/dagster/_grpc/impl.py +++ b/python_modules/dagster/dagster/_grpc/impl.py @@ -535,42 +535,28 @@ def get_partition_set_execution_param_data( ) = _get_job_partitions_and_config_for_partition_set_name(repo_def, partition_set_name) try: - with _instance_from_ref_for_dynamic_partitions(instance_ref, partitions_def) as instance: - with user_code_error_boundary( - PartitionExecutionError, - lambda: ( - "Error occurred during the partition generation for partitioned config on job" - f" '{job_def.name}'" - ), - ): - all_partition_keys = partitions_def.get_partition_keys( - dynamic_partitions_store=instance - ) - partition_keys = [key for key in all_partition_keys if key in partition_names] - - partition_data = [] - for key in partition_keys: - - def _error_message_fn(partition_name: str): - return lambda: ( - "Error occurred during the partition config and tag generation for" - f" '{partition_name}' in partitioned config on job '{job_def.name}'" - ) - - with 
user_code_error_boundary(PartitionExecutionError, _error_message_fn(key)): - run_config = partitioned_config.get_run_config_for_partition_key(key) - tags = partitioned_config.get_tags_for_partition_key(key, job_name=job_def.name) - - partition_data.append( - PartitionExecutionParamSnap( - name=key, - tags=tags, - run_config=run_config, - ) + partition_data = [] + for key in partition_names: + + def _error_message_fn(partition_name: str): + return lambda: ( + "Error occurred during the partition config and tag generation for" + f" '{partition_name}' in partitioned config on job '{job_def.name}'" ) - return PartitionSetExecutionParamSnap(partition_data=partition_data) + with user_code_error_boundary(PartitionExecutionError, _error_message_fn(key)): + run_config = partitioned_config.get_run_config_for_partition_key(key) + tags = partitioned_config.get_tags_for_partition_key(key, job_name=job_def.name) + + partition_data.append( + PartitionExecutionParamSnap( + name=key, + tags=tags, + run_config=run_config, + ) + ) + return PartitionSetExecutionParamSnap(partition_data=partition_data) except Exception: return PartitionExecutionErrorSnap( error=serializable_error_info_from_exc_info(sys.exc_info()) diff --git a/python_modules/dagster/dagster_tests/api_tests/test_api_snapshot_partition.py b/python_modules/dagster/dagster_tests/api_tests/test_api_snapshot_partition.py index e040eed19148d..f9510eada44d2 100644 --- a/python_modules/dagster/dagster_tests/api_tests/test_api_snapshot_partition.py +++ b/python_modules/dagster/dagster_tests/api_tests/test_api_snapshot_partition.py @@ -331,7 +331,9 @@ def test_dynamic_partition_set_grpc(instance: DagsterInstance): instance=instance, ) assert isinstance(data, PartitionSetExecutionParamSnap) - assert data.partition_data == [] + + # nonexistent partitions can still return snapshots + assert len(data.partition_data) == 1 def test_external_partition_tags_grpc_backcompat_no_job_name(instance: DagsterInstance):