Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backfill retries #23679

Merged
merged 12 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
cancel_partition_backfill as cancel_partition_backfill,
create_and_launch_partition_backfill as create_and_launch_partition_backfill,
resume_partition_backfill as resume_partition_backfill,
retry_partition_backfill as retry_partition_backfill,
)
from dagster_graphql.implementation.utils import assert_permission, assert_permission_for_location

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,15 @@
)
from dagster._core.events import AssetKey
from dagster._core.execution.asset_backfill import create_asset_backfill_data_from_asset_partitions
from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
from dagster._core.execution.backfill import (
BULK_ACTION_TERMINAL_STATUSES,
BulkActionStatus,
PartitionBackfill,
)
from dagster._core.execution.job_backfill import submit_backfill_runs
from dagster._core.execution.plan.resume_retry import ReexecutionStrategy
from dagster._core.remote_representation.external_data import PartitionExecutionErrorSnap
from dagster._core.storage.tags import PARENT_BACKFILL_ID_TAG, ROOT_BACKFILL_ID_TAG
from dagster._core.utils import make_new_backfill_id
from dagster._core.workspace.permissions import Permissions
from dagster._time import datetime_from_timestamp, get_current_timestamp
Expand Down Expand Up @@ -340,3 +346,82 @@ def resume_partition_backfill(

graphene_info.context.instance.update_backfill(backfill.with_status(BulkActionStatus.REQUESTED))
return GrapheneResumeBackfillSuccess(backfill_id=backfill_id)


def retry_partition_backfill(
graphene_info: "ResolveInfo", backfill_id: str, strategy: str
) -> "GrapheneLaunchBackfillSuccess":
from dagster_graphql.schema.backfill import GrapheneLaunchBackfillSuccess

backfill = graphene_info.context.instance.get_backfill(backfill_id)
from_failure = ReexecutionStrategy(strategy) == ReexecutionStrategy.FROM_FAILURE
if not backfill:
check.failed(f"No backfill found for id: {backfill_id}")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might need to add a GrapheneBackfillNotFound output type. will look into

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If backfills can only be retried from the UI hitting this seems unlikely?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, the other backfill actions have this too, and i figured it doesn't hurt to have here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it'd be to make it more in line w the run re-execution types which have GrapheneRunNotFound as a potential return type. i dont think it's really that necessary here though


if backfill.status not in BULK_ACTION_TERMINAL_STATUSES:
raise DagsterInvariantViolationError(
f"Cannot re-execute backfill {backfill_id} because it is still in progress."
)

if backfill.is_asset_backfill:
asset_backfill_data = backfill.get_asset_backfill_data(graphene_info.context.asset_graph)
assets_to_request = asset_backfill_data.target_subset
jamiedemaria marked this conversation as resolved.
Show resolved Hide resolved
if from_failure:
# determine the subset that should be retried by removing the successfully materialized subset from
# the target subset. This ensures that if the backfill was canceled or marked failed that all
# non-materialized asset partitions will be retried. asset_backfill_data.failed_and_downstream_asset
# only contains asset partitions who's materialization runs failed and their downsteam assets, not
# asset partitions that never got materialization runs.
assets_to_request = assets_to_request - asset_backfill_data.materialized_subset
if assets_to_request.num_partitions_and_non_partitioned_assets == 0:
raise DagsterInvariantViolationError(
"Cannot re-execute from failure an asset backfill that has no missing materializations."
)
asset_graph = graphene_info.context.asset_graph
assert_permission_for_asset_graph(
graphene_info,
asset_graph,
list(assets_to_request.asset_keys),
Permissions.LAUNCH_PARTITION_BACKFILL,
)

new_backfill = PartitionBackfill.from_asset_graph_subset(
backfill_id=make_new_backfill_id(),
asset_graph_subset=assets_to_request,
dynamic_partitions_store=graphene_info.context.instance,
tags={
**backfill.tags,
PARENT_BACKFILL_ID_TAG: backfill.backfill_id,
ROOT_BACKFILL_ID_TAG: backfill.tags.get(ROOT_BACKFILL_ID_TAG, backfill.backfill_id),
},
backfill_timestamp=get_current_timestamp(),
title=f"Re-execution of {backfill.title}" if backfill.title else None,
description=backfill.description,
)
else: # job backfill
partition_set_origin = check.not_none(backfill.partition_set_origin)
location_name = partition_set_origin.selector.location_name
assert_permission_for_location(
graphene_info, Permissions.LAUNCH_PARTITION_BACKFILL, location_name
)

new_backfill = PartitionBackfill(
backfill_id=make_new_backfill_id(),
partition_set_origin=backfill.partition_set_origin,
status=BulkActionStatus.REQUESTED,
partition_names=backfill.partition_names,
from_failure=from_failure,
reexecution_steps=backfill.reexecution_steps,
jamiedemaria marked this conversation as resolved.
Show resolved Hide resolved
tags={
**backfill.tags,
PARENT_BACKFILL_ID_TAG: backfill.backfill_id,
ROOT_BACKFILL_ID_TAG: backfill.tags.get(ROOT_BACKFILL_ID_TAG, backfill.backfill_id),
},
backfill_timestamp=get_current_timestamp(),
asset_selection=backfill.asset_selection,
title=f"Re-execution of {backfill.title}" if backfill.title else None,
description=backfill.description,
)

graphene_info.context.instance.add_backfill(new_backfill)
return GrapheneLaunchBackfillSuccess(backfill_id=new_backfill.backfill_id)
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
cancel_partition_backfill,
create_and_launch_partition_backfill,
resume_partition_backfill,
retry_partition_backfill,
)
from dagster_graphql.implementation.execution.dynamic_partitions import (
add_dynamic_partition,
Expand Down Expand Up @@ -350,7 +351,7 @@ def mutate(self, graphene_info: ResolveInfo, backfillId: str):


class GrapheneResumeBackfillMutation(graphene.Mutation):
"""Retries a set of partition backfill runs."""
"""Resumes a set of partition backfill runs. Resuming a backfill will not retry any failed runs."""

Output = graphene.NonNull(GrapheneResumeBackfillResult)

Expand All @@ -366,6 +367,31 @@ def mutate(self, graphene_info: ResolveInfo, backfillId: str):
return resume_partition_backfill(graphene_info, backfillId)


class GrapheneReexecuteBackfillMutation(graphene.Mutation):
"""Retries a set of partition backfill runs. Retrying a backfill will create a new backfill to retry any failed partitions."""

Output = graphene.NonNull(GrapheneLaunchBackfillResult)

class Arguments:
reexecutionParams = graphene.Argument(GrapheneReexecutionParams)

class Meta:
name = "RetryBackfillMutation"
jamiedemaria marked this conversation as resolved.
Show resolved Hide resolved

@capture_error
@require_permission_check(Permissions.LAUNCH_PARTITION_BACKFILL)
def mutate(
self,
graphene_info: ResolveInfo,
reexecutionParams: GrapheneReexecutionParams,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think it could make sense to make a backfill-specific version of GrapheneReexecutionParams that would take parentBackfillId and strategy where strategy can be FROM_FAILURE or ALL. i will make that update

):
return retry_partition_backfill(
graphene_info,
backfill_id=reexecutionParams["parentRunId"],
strategy=reexecutionParams["strategy"],
)


class GrapheneAddDynamicPartitionMutation(graphene.Mutation):
"""Adds a partition to a dynamic partition set."""

Expand Down Expand Up @@ -981,6 +1007,7 @@ class Meta:
reportRunlessAssetEvents = GrapheneReportRunlessAssetEventsMutation.Field()
launchPartitionBackfill = GrapheneLaunchBackfillMutation.Field()
resumePartitionBackfill = GrapheneResumeBackfillMutation.Field()
reexecutePartitionBackfill = GrapheneReexecuteBackfillMutation.Field()
cancelPartitionBackfill = GrapheneCancelBackfillMutation.Field()
logTelemetry = GrapheneLogTelemetryMutation.Field()
setNuxSeen = GrapheneSetNuxSeenMutation.Field()
Expand Down
Loading