Skip to content

Commit

Permalink
Remove slow validation code in get_partition_set_execution_param_data (#26810)
Browse files Browse the repository at this point in the history

Summary:
The current implementation here requires fetching every partition key in
the set, which can be very slow. The full key list doesn't appear to be
used for anything other than filtering out requested keys that aren't in
the set, which seems like a reasonable thing to require the call site to
check instead.

BK

> Insert changelog entry or delete this section.

## Summary & Motivation

## How I Tested These Changes

## Changelog

> Insert changelog entry or delete this section.
  • Loading branch information
gibsondan authored Jan 6, 2025
1 parent 35ca60c commit 90231c7
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 34 deletions.
52 changes: 19 additions & 33 deletions python_modules/dagster/dagster/_grpc/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,42 +535,28 @@ def get_partition_set_execution_param_data(
) = _get_job_partitions_and_config_for_partition_set_name(repo_def, partition_set_name)

try:
with _instance_from_ref_for_dynamic_partitions(instance_ref, partitions_def) as instance:
with user_code_error_boundary(
PartitionExecutionError,
lambda: (
"Error occurred during the partition generation for partitioned config on job"
f" '{job_def.name}'"
),
):
all_partition_keys = partitions_def.get_partition_keys(
dynamic_partitions_store=instance
)
partition_keys = [key for key in all_partition_keys if key in partition_names]

partition_data = []
for key in partition_keys:

def _error_message_fn(partition_name: str):
return lambda: (
"Error occurred during the partition config and tag generation for"
f" '{partition_name}' in partitioned config on job '{job_def.name}'"
)

with user_code_error_boundary(PartitionExecutionError, _error_message_fn(key)):
run_config = partitioned_config.get_run_config_for_partition_key(key)
tags = partitioned_config.get_tags_for_partition_key(key, job_name=job_def.name)

partition_data.append(
PartitionExecutionParamSnap(
name=key,
tags=tags,
run_config=run_config,
)
partition_data = []
for key in partition_names:

def _error_message_fn(partition_name: str):
return lambda: (
"Error occurred during the partition config and tag generation for"
f" '{partition_name}' in partitioned config on job '{job_def.name}'"
)

return PartitionSetExecutionParamSnap(partition_data=partition_data)
with user_code_error_boundary(PartitionExecutionError, _error_message_fn(key)):
run_config = partitioned_config.get_run_config_for_partition_key(key)
tags = partitioned_config.get_tags_for_partition_key(key, job_name=job_def.name)

partition_data.append(
PartitionExecutionParamSnap(
name=key,
tags=tags,
run_config=run_config,
)
)

return PartitionSetExecutionParamSnap(partition_data=partition_data)
except Exception:
return PartitionExecutionErrorSnap(
error=serializable_error_info_from_exc_info(sys.exc_info())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,9 @@ def test_dynamic_partition_set_grpc(instance: DagsterInstance):
instance=instance,
)
assert isinstance(data, PartitionSetExecutionParamSnap)
assert data.partition_data == []

# nonexistent partitions can still return snapshots
assert len(data.partition_data) == 1


def test_external_partition_tags_grpc_backcompat_no_job_name(instance: DagsterInstance):
Expand Down

0 comments on commit 90231c7

Please sign in to comment.