storage endpoint for deleting backfills #25165

Closed · wants to merge 13 commits

11 changes: 11 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

Some generated files are not rendered by default.

@@ -43,6 +43,7 @@
    cancel_partition_backfill as cancel_partition_backfill,
    create_and_launch_partition_backfill as create_and_launch_partition_backfill,
    resume_partition_backfill as resume_partition_backfill,
    retry_partition_backfill as retry_partition_backfill,
)
from dagster_graphql.implementation.utils import assert_permission, assert_permission_for_location

@@ -10,9 +10,15 @@
)
from dagster._core.events import AssetKey
from dagster._core.execution.asset_backfill import create_asset_backfill_data_from_asset_partitions
from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
from dagster._core.execution.backfill import (
    BULK_ACTION_TERMINAL_STATUSES,
    BulkActionStatus,
    PartitionBackfill,
)
from dagster._core.execution.job_backfill import submit_backfill_runs
from dagster._core.execution.plan.resume_retry import ReexecutionStrategy
from dagster._core.remote_representation.external_data import PartitionExecutionErrorSnap
from dagster._core.storage.tags import PARENT_BACKFILL_ID_TAG, ROOT_BACKFILL_ID_TAG
from dagster._core.utils import make_new_backfill_id
from dagster._core.workspace.permissions import Permissions
from dagster._time import datetime_from_timestamp, get_current_timestamp
@@ -340,3 +346,82 @@ def resume_partition_backfill(

    graphene_info.context.instance.update_backfill(backfill.with_status(BulkActionStatus.REQUESTED))
    return GrapheneResumeBackfillSuccess(backfill_id=backfill_id)


def retry_partition_backfill(
    graphene_info: "ResolveInfo", backfill_id: str, strategy: str
) -> "GrapheneLaunchBackfillSuccess":
    from dagster_graphql.schema.backfill import GrapheneLaunchBackfillSuccess

    backfill = graphene_info.context.instance.get_backfill(backfill_id)
    from_failure = ReexecutionStrategy(strategy) == ReexecutionStrategy.FROM_FAILURE
    if not backfill:
        check.failed(f"No backfill found for id: {backfill_id}")

    if backfill.status not in BULK_ACTION_TERMINAL_STATUSES:
        raise DagsterInvariantViolationError(
            f"Cannot re-execute backfill {backfill_id} because it is still in progress."
        )

    if backfill.is_asset_backfill:
        asset_backfill_data = backfill.get_asset_backfill_data(graphene_info.context.asset_graph)
        assets_to_request = asset_backfill_data.target_subset
        if from_failure:
            # Determine the subset to retry by removing the successfully materialized subset from
            # the target subset. This ensures that if the backfill was canceled or marked failed, all
            # non-materialized asset partitions will be retried. asset_backfill_data.failed_and_downstream_subset
            # only contains asset partitions whose materialization runs failed (plus their downstream
            # assets), not asset partitions that never had materialization runs launched.
            assets_to_request = assets_to_request - asset_backfill_data.materialized_subset
            if assets_to_request.num_partitions_and_non_partitioned_assets == 0:
                raise DagsterInvariantViolationError(
                    "Cannot re-execute from failure an asset backfill that has no missing materializations."
                )
        asset_graph = graphene_info.context.asset_graph
        assert_permission_for_asset_graph(
            graphene_info,
            asset_graph,
            list(assets_to_request.asset_keys),
            Permissions.LAUNCH_PARTITION_BACKFILL,
        )

        new_backfill = PartitionBackfill.from_asset_graph_subset(
            backfill_id=make_new_backfill_id(),
            asset_graph_subset=assets_to_request,
            dynamic_partitions_store=graphene_info.context.instance,
            tags={
                **backfill.tags,
                PARENT_BACKFILL_ID_TAG: backfill.backfill_id,
                ROOT_BACKFILL_ID_TAG: backfill.tags.get(ROOT_BACKFILL_ID_TAG, backfill.backfill_id),
            },
            backfill_timestamp=get_current_timestamp(),
            title=f"Re-execution of {backfill.title}" if backfill.title else None,
            description=backfill.description,
        )
    else:  # job backfill
        partition_set_origin = check.not_none(backfill.partition_set_origin)
        location_name = partition_set_origin.selector.location_name
        assert_permission_for_location(
            graphene_info, Permissions.LAUNCH_PARTITION_BACKFILL, location_name
        )

        new_backfill = PartitionBackfill(
            backfill_id=make_new_backfill_id(),
            partition_set_origin=backfill.partition_set_origin,
            status=BulkActionStatus.REQUESTED,
            partition_names=backfill.partition_names,
            from_failure=from_failure,
            reexecution_steps=backfill.reexecution_steps,
            tags={
                **backfill.tags,
                PARENT_BACKFILL_ID_TAG: backfill.backfill_id,
                ROOT_BACKFILL_ID_TAG: backfill.tags.get(ROOT_BACKFILL_ID_TAG, backfill.backfill_id),
            },
            backfill_timestamp=get_current_timestamp(),
            asset_selection=backfill.asset_selection,
            title=f"Re-execution of {backfill.title}" if backfill.title else None,
            description=backfill.description,
        )

    graphene_info.context.instance.add_backfill(new_backfill)
    return GrapheneLaunchBackfillSuccess(backfill_id=new_backfill.backfill_id)
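A minimal standalone sketch of the parent/root tag lineage applied in the tags= dictionaries above, assuming the tag constants imported in this diff are available; the lineage_tags helper is illustrative only and not part of the PR. Each retry records its immediate parent, while the root tag keeps pointing at the original backfill even when a retry is itself retried.

from dagster._core.storage.tags import PARENT_BACKFILL_ID_TAG, ROOT_BACKFILL_ID_TAG


def lineage_tags(parent_backfill_id: str, parent_tags: dict) -> dict:
    # Mirror the tags= expression above: keep the parent's tags, record the parent id,
    # and only fall back to the parent id for the root when no root tag exists yet.
    return {
        **parent_tags,
        PARENT_BACKFILL_ID_TAG: parent_backfill_id,
        ROOT_BACKFILL_ID_TAG: parent_tags.get(ROOT_BACKFILL_ID_TAG, parent_backfill_id),
    }


first_retry_tags = lineage_tags("backfill_a", {})
second_retry_tags = lineage_tags("backfill_b", first_retry_tags)
assert second_retry_tags[PARENT_BACKFILL_ID_TAG] == "backfill_b"
assert second_retry_tags[ROOT_BACKFILL_ID_TAG] == "backfill_a"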
@@ -20,6 +20,7 @@
    cancel_partition_backfill,
    create_and_launch_partition_backfill,
    resume_partition_backfill,
    retry_partition_backfill,
)
from dagster_graphql.implementation.execution.dynamic_partitions import (
    add_dynamic_partition,
@@ -350,7 +351,7 @@ def mutate(self, graphene_info: ResolveInfo, backfillId: str):


class GrapheneResumeBackfillMutation(graphene.Mutation):
    """Retries a set of partition backfill runs."""
    """Resumes a set of partition backfill runs. Resuming a backfill will not retry any failed runs."""

    Output = graphene.NonNull(GrapheneResumeBackfillResult)

@@ -366,6 +367,31 @@ def mutate(self, graphene_info: ResolveInfo, backfillId: str):
        return resume_partition_backfill(graphene_info, backfillId)


class GrapheneReexecuteBackfillMutation(graphene.Mutation):
    """Retries a set of partition backfill runs. Retrying a backfill will create a new backfill to retry any failed partitions."""

    Output = graphene.NonNull(GrapheneLaunchBackfillResult)

    class Arguments:
        reexecutionParams = graphene.Argument(GrapheneReexecutionParams)

    class Meta:
        name = "RetryBackfillMutation"

    @capture_error
    @require_permission_check(Permissions.LAUNCH_PARTITION_BACKFILL)
    def mutate(
        self,
        graphene_info: ResolveInfo,
        reexecutionParams: GrapheneReexecutionParams,
    ):
        return retry_partition_backfill(
            graphene_info,
            backfill_id=reexecutionParams["parentRunId"],
            strategy=reexecutionParams["strategy"],
        )


class GrapheneAddDynamicPartitionMutation(graphene.Mutation):
    """Adds a partition to a dynamic partition set."""

@@ -981,6 +1007,7 @@ class Meta:
    reportRunlessAssetEvents = GrapheneReportRunlessAssetEventsMutation.Field()
    launchPartitionBackfill = GrapheneLaunchBackfillMutation.Field()
    resumePartitionBackfill = GrapheneResumeBackfillMutation.Field()
    reexecutePartitionBackfill = GrapheneReexecuteBackfillMutation.Field()
    cancelPartitionBackfill = GrapheneCancelBackfillMutation.Field()
    logTelemetry = GrapheneLogTelemetryMutation.Field()
    setNuxSeen = GrapheneSetNuxSeenMutation.Field()
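For context, a minimal sketch of how a client might call the new reexecutePartitionBackfill field added to the mutation root above. The field name, the reexecutionParams argument, and its parentRunId/strategy keys come from this diff; the ReexecutionParams, LaunchBackfillSuccess, and PythonError GraphQL type names are assumed from the corresponding Graphene classes, and the webserver URL and use of the requests library are illustrative, not part of the PR.

import requests

# Assumed local Dagster webserver address; adjust for your deployment.
GRAPHQL_URL = "http://localhost:3000/graphql"

RETRY_BACKFILL_MUTATION = """
mutation RetryBackfill($reexecutionParams: ReexecutionParams) {
  reexecutePartitionBackfill(reexecutionParams: $reexecutionParams) {
    ... on LaunchBackfillSuccess {
      backfillId
    }
    ... on PythonError {
      message
    }
  }
}
"""

# "parentRunId" carries the id of the backfill to retry; "strategy" maps onto
# ReexecutionStrategy: "FROM_FAILURE" retries only non-materialized partitions,
# while "ALL_STEPS" re-targets the full subset of the original backfill.
variables = {
    "reexecutionParams": {"parentRunId": "<backfill_id>", "strategy": "FROM_FAILURE"}
}

response = requests.post(
    GRAPHQL_URL, json={"query": RETRY_BACKFILL_MUTATION, "variables": variables}
)
print(response.json())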