Early check for invalid partition keys on launching a partition backfill (#25107)

## Summary & Motivation

This adds an early check on the partition keys supplied in a backfill
mutation call. If an invalid partition key is found, a
`PartitionKeysNotFoundError` is returned and the backfill is not
scheduled for execution.

Contributes to #23801
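
The core of the check is a set difference between the requested partition keys and the keys known to the partitions definition, evaluated before the backfill is launched. A minimal standalone sketch of that logic (illustrative only; the real checks in `assert_valid_job_partition_backfill` / `assert_valid_asset_partition_backfill` below resolve the valid keys from the `PartitionsDefinition` and raise a `UserFacingGraphQLError` wrapping `GraphenePartitionKeysNotFoundError`):

```python
from typing import Iterable, List, Sequence


def find_invalid_partition_keys(
    requested_keys: Iterable[str], valid_keys: Sequence[str]
) -> List[str]:
    """Return the requested keys that the partitions definition does not know about."""
    # Sorted so the resulting error message is deterministic.
    return sorted(set(requested_keys) - set(valid_keys))


# Example: "a" exists, the other two keys do not, so the backfill would be rejected.
assert find_invalid_partition_keys(
    ["a", "nonexistent1", "nonexistent2"], ["a", "b", "c"]
) == ["nonexistent1", "nonexistent2"]
```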

## How I Tested These Changes

Unit tests

## Changelog

If a `LaunchPartitionBackfill` mutation is submitted via GraphQL with invalid partition keys, it will return an early `PartitionKeysNotFoundError` instead of launching the backfill (see the example response below).

- [x] `NEW` _(added new feature or capability)_
- [ ] `BUGFIX` _(fixed a bug)_
- [ ] `DOCS` _(added or updated documentation)_
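
For reference, the early rejection looks roughly like this from the caller's side. This is a sketch assembled from the unit tests added further down in this diff; only `__typename` and `message` are selected, matching the fragments added to the UI query:

```python
# Illustrative response data for a LaunchPartitionBackfill mutation that
# includes partition keys the partitions definition does not know about.
# The typename and message text mirror the unit tests added in this PR.
expected = {
    "launchPartitionBackfill": {
        "__typename": "PartitionKeysNotFoundError",
        "message": "Partition keys `['nonexistent1', 'nonexistent2']` could not be found.",
    }
}
```

In this case the mutation returns the error and the backfill is never scheduled for execution.
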
marijncv authored Oct 8, 2024
1 parent d8b2cb3 commit 71b739d
Showing 13 changed files with 197 additions and 18 deletions.
2 changes: 1 addition & 1 deletion js_modules/dagster-ui/packages/ui-core/client.json

Some generated files are not rendered by default.

Large diffs are not rendered by default.

Some generated files are not rendered by default.

21 changes: 21 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

Some generated files are not rendered by default.

@@ -26,6 +26,9 @@ export const LAUNCH_PARTITION_BACKFILL_MUTATION = gql`
... on PartitionSetNotFoundError {
message
}
... on PartitionKeysNotFoundError {
message
}
... on InvalidStepError {
invalidStepKey
}
Some generated files are not rendered by default.

@@ -410,6 +410,9 @@
... on PartitionSetNotFoundError {
message
}
... on PartitionKeysNotFoundError {
message
}
... on LaunchBackfillSuccess {
backfillId
launchedRunIds
@@ -23,6 +23,8 @@
BackfillParams,
assert_permission_for_asset_graph,
assert_permission_for_location,
assert_valid_asset_partition_backfill,
assert_valid_job_partition_backfill,
)

BACKFILL_CHUNK_SIZE = 25
@@ -85,6 +87,14 @@ def create_and_launch_partition_backfill(
from dagster_graphql.schema.errors import GraphenePartitionSetNotFoundError

backfill_id = make_new_backfill_id()
backfill_timestamp = get_current_timestamp()
backfill_datetime = datetime_from_timestamp(backfill_timestamp)
dynamic_partitions_store = CachingInstanceQueryer(
instance=graphene_info.context.instance,
asset_graph=graphene_info.context.asset_graph,
loading_context=graphene_info.context,
evaluation_time=backfill_datetime,
)

asset_selection = (
[
@@ -109,8 +119,6 @@

tags = {**tags, **graphene_info.context.get_viewer_tags()}

backfill_timestamp = get_current_timestamp()

if backfill_params.get("selector") is not None: # job backfill
partition_set_selector = backfill_params["selector"]
partition_set_name = partition_set_selector.get("partitionSetName")
@@ -172,6 +180,13 @@
title=backfill_params.get("title"),
description=backfill_params.get("description"),
)
assert_valid_job_partition_backfill(
graphene_info,
backfill,
external_partition_set.get_partitions_definition(),
dynamic_partitions_store,
backfill_datetime,
)

if backfill_params.get("forceSynchronousSubmission"):
# should only be used in a test situation
@@ -216,16 +231,17 @@ def create_and_launch_partition_backfill(
backfill_timestamp=backfill_timestamp,
asset_selection=asset_selection,
partition_names=backfill_params.get("partitionNames"),
dynamic_partitions_store=CachingInstanceQueryer(
instance=graphene_info.context.instance,
asset_graph=asset_graph,
loading_context=graphene_info.context,
evaluation_time=datetime_from_timestamp(backfill_timestamp),
),
dynamic_partitions_store=dynamic_partitions_store,
all_partitions=backfill_params.get("allPartitions", False),
title=backfill_params.get("title"),
description=backfill_params.get("description"),
)
assert_valid_asset_partition_backfill(
graphene_info,
backfill,
dynamic_partitions_store,
backfill_datetime,
)
elif partitions_by_assets is not None:
if backfill_params.get("forceSynchronousSubmission"):
raise DagsterError(
@@ -252,16 +268,18 @@
asset_graph=asset_graph,
backfill_timestamp=backfill_timestamp,
tags=tags,
dynamic_partitions_store=CachingInstanceQueryer(
instance=graphene_info.context.instance,
asset_graph=asset_graph,
loading_context=graphene_info.context,
evaluation_time=datetime_from_timestamp(backfill_timestamp),
),
dynamic_partitions_store=dynamic_partitions_store,
partitions_by_assets=partitions_by_assets,
title=backfill_params.get("title"),
description=backfill_params.get("description"),
)
assert_valid_asset_partition_backfill(
graphene_info,
backfill,
dynamic_partitions_store,
backfill_datetime,
)

else:
raise DagsterError(
"Backfill requested without specifying partition set selector or asset selection"
@@ -2,6 +2,7 @@
import sys
from contextlib import contextmanager
from contextvars import ContextVar
from datetime import datetime
from types import TracebackType
from typing import (
TYPE_CHECKING,
@@ -25,9 +26,12 @@
import dagster._check as check
from dagster._core.definitions.asset_check_spec import AssetCheckKey
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.partition import PartitionsDefinition
from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph
from dagster._core.definitions.selector import GraphSelector, JobSubsetSelector
from dagster._core.execution.backfill import PartitionBackfill
from dagster._core.workspace.context import BaseWorkspaceRequestContext
from dagster._utils.caching_instance_queryer import CachingInstanceQueryer
from dagster._utils.error import serializable_error_info_from_exc_info
from typing_extensions import ParamSpec, TypeAlias

@@ -131,6 +135,60 @@ def assert_permission_for_asset_graph(
raise UserFacingGraphQLError(GrapheneUnauthorizedError())


def assert_valid_job_partition_backfill(
graphene_info: "ResolveInfo",
backfill: PartitionBackfill,
partitions_def: PartitionsDefinition,
dynamic_partitions_store: CachingInstanceQueryer,
backfill_datetime: datetime,
) -> None:
from dagster_graphql.schema.errors import GraphenePartitionKeysNotFoundError

partition_names = backfill.get_partition_names(graphene_info.context)

if not partition_names:
return

invalid_keys = set(partition_names) - set(
partitions_def.get_partition_keys(backfill_datetime, dynamic_partitions_store)
)

if invalid_keys:
raise UserFacingGraphQLError(GraphenePartitionKeysNotFoundError(invalid_keys))


def assert_valid_asset_partition_backfill(
graphene_info: "ResolveInfo",
backfill: PartitionBackfill,
dynamic_partitions_store: CachingInstanceQueryer,
backfill_datetime: datetime,
) -> None:
from dagster_graphql.schema.errors import GraphenePartitionKeysNotFoundError

asset_graph = graphene_info.context.asset_graph
asset_backfill_data = backfill.asset_backfill_data

if not asset_backfill_data:
return

partition_subset_by_asset_key = (
asset_backfill_data.target_subset.partitions_subsets_by_asset_key
)

for asset_key, partition_subset in partition_subset_by_asset_key.items():
partitions_def = asset_graph.get(asset_key).partitions_def

if not partitions_def:
continue

invalid_keys = set(partition_subset.get_partition_keys()) - set(
partitions_def.get_partition_keys(backfill_datetime, dynamic_partitions_store)
)

if invalid_keys:
raise UserFacingGraphQLError(GraphenePartitionKeysNotFoundError(invalid_keys))


def _noop(_) -> None:
pass

@@ -39,6 +39,7 @@
GrapheneInvalidOutputError,
GrapheneInvalidStepError,
GrapheneInvalidSubsetError,
GraphenePartitionKeysNotFoundError,
GraphenePartitionSetNotFoundError,
GraphenePipelineNotFoundError,
GraphenePythonError,
@@ -83,6 +84,7 @@ class Meta:
types = (
GrapheneLaunchBackfillSuccess,
GraphenePartitionSetNotFoundError,
GraphenePartitionKeysNotFoundError,
) + pipeline_execution_error_types
name = "LaunchBackfillResult"

17 changes: 16 additions & 1 deletion python_modules/dagster-graphql/dagster_graphql/schema/errors.py
@@ -1,4 +1,4 @@
from typing import List, Optional
from typing import List, Optional, Set

import dagster._check as check
import graphene
@@ -451,6 +451,21 @@ def __init__(self, partition_set_name):
self.message = f"Partition set {self.partition_set_name} could not be found."


class GraphenePartitionKeysNotFoundError(graphene.ObjectType):
class Meta:
interfaces = (GrapheneError,)
name = "PartitionKeysNotFoundError"

partition_keys = non_null_list(graphene.String)

def __init__(self, partition_keys: Set[str]):
super().__init__()
self.partition_keys = check.list_param(
sorted(partition_keys), "partition_keys", of_type=str
)
self.message = f"Partition keys `{self.partition_keys}` could not be found."


class GrapheneRepositoryNotFoundError(graphene.ObjectType):
class Meta:
interfaces = (GrapheneError,)
@@ -576,6 +576,33 @@ def test_launch_asset_backfill():
)


def test_launch_asset_backfill_with_nonexistent_partition_key():
repo = get_repo()
all_asset_keys = repo.asset_graph.materializable_asset_keys

with instance_for_test() as instance:
with define_out_of_process_context(__file__, "get_repo", instance) as context:
# launchPartitionBackfill
launch_backfill_result = execute_dagster_graphql(
context,
LAUNCH_PARTITION_BACKFILL_MUTATION,
variables={
"backfillParams": {
"partitionNames": ["a", "nonexistent1", "nonexistent2"],
"assetSelection": [key.to_graphql_input() for key in all_asset_keys],
}
},
)
assert (
launch_backfill_result.data["launchPartitionBackfill"]["__typename"]
== "PartitionKeysNotFoundError"
)
assert (
"Partition keys `['nonexistent1', 'nonexistent2']` could not be found"
in launch_backfill_result.data["launchPartitionBackfill"]["message"]
)


def test_remove_partitions_defs_after_backfill_backcompat():
repo = get_repo()
all_asset_keys = repo.asset_graph.materializable_asset_keys
@@ -1254,6 +1254,31 @@ def test_set_title_and_description_for_backfill_invalid_title(self, graphql_cont
in result.data["launchPartitionBackfill"]["message"]
)

def test_asset_job_backfill_with_nonexistent_partition_key(self, graphql_context):
repository_selector = infer_repository_selector(graphql_context)
# launch a backfill for this partition set
launch_result = execute_dagster_graphql(
graphql_context,
LAUNCH_PARTITION_BACKFILL_MUTATION,
variables={
"backfillParams": {
"selector": {
"repositorySelector": repository_selector,
"partitionSetName": "integers_partition_set",
},
"partitionNames": ["1", "nonexistent1", "nonexistent2"],
}
},
)
assert (
launch_result.data["launchPartitionBackfill"]["__typename"]
== "PartitionKeysNotFoundError"
)
assert (
"Partition keys `['nonexistent1', 'nonexistent2']` could not be found"
in launch_result.data["launchPartitionBackfill"]["message"]
)


class TestLaunchDaemonBackfillFromFailure(ExecutingGraphQLContextTestMatrix):
def test_launch_from_failure(self, graphql_context):

1 comment on commit 71b739d

@github-actions

Deploy preview for dagit-core-storybook ready!

✅ Preview
https://dagit-core-storybook-ixpdmob0q-elementl.vercel.app

Built with commit 71b739d.
This pull request is being automatically deployed with vercel-action
