Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backfill re-execution #25137

Closed
wants to merge 11 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@
from dagster_graphql.implementation.execution.backfill import (
cancel_partition_backfill as cancel_partition_backfill,
create_and_launch_partition_backfill as create_and_launch_partition_backfill,
reexecute_partition_backfill as reexecute_partition_backfill,
resume_partition_backfill as resume_partition_backfill,
retry_partition_backfill as retry_partition_backfill,
)
from dagster_graphql.implementation.utils import assert_permission, assert_permission_for_location

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,14 @@
)
from dagster._core.events import AssetKey
from dagster._core.execution.asset_backfill import create_asset_backfill_data_from_asset_partitions
from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
from dagster._core.execution.backfill import (
BULK_ACTION_TERMINAL_STATUSES,
BulkActionStatus,
PartitionBackfill,
)
from dagster._core.execution.job_backfill import submit_backfill_runs
from dagster._core.remote_representation.external_data import PartitionExecutionErrorSnap
from dagster._core.storage.tags import PARENT_BACKFILL_ID_TAG, ROOT_BACKFILL_ID_TAG
from dagster._core.utils import make_new_backfill_id
from dagster._core.workspace.permissions import Permissions
from dagster._time import datetime_from_timestamp, get_current_timestamp
Expand All @@ -34,6 +39,7 @@
GrapheneCancelBackfillSuccess,
GrapheneLaunchBackfillSuccess,
GrapheneResumeBackfillSuccess,
GrapheneRetryBackfillSuccess,
)
from dagster_graphql.schema.errors import GraphenePartitionSetNotFoundError
from dagster_graphql.schema.util import ResolveInfo
Expand Down Expand Up @@ -322,3 +328,142 @@ def resume_partition_backfill(

graphene_info.context.instance.update_backfill(backfill.with_status(BulkActionStatus.REQUESTED))
return GrapheneResumeBackfillSuccess(backfill_id=backfill_id)


def retry_partition_backfill(
    graphene_info: "ResolveInfo", backfill_id: str
) -> "GrapheneRetryBackfillSuccess":
    """Create a new backfill that retries the unfinished work of an existing backfill.

    The stored backfill identified by ``backfill_id`` is looked up on the instance and must
    be in one of ``BULK_ACTION_TERMINAL_STATUSES``. A new ``PartitionBackfill`` is then
    registered with a fresh id; it copies the original tags and additionally records the
    parent and root backfill ids.

    Args:
        graphene_info: Resolver info; its ``context`` supplies the instance and asset graph.
        backfill_id: Id of the backfill to retry.

    Returns:
        A ``GrapheneRetryBackfillSuccess`` carrying the id of the newly added backfill.

    Raises:
        DagsterInvariantViolationError: If the backfill is not in a terminal status, or if
            it is an asset backfill whose target subset is already fully materialized.
    """
    from dagster_graphql.schema.backfill import GrapheneRetryBackfillSuccess

    instance = graphene_info.context.instance
    backfill = instance.get_backfill(backfill_id)
    if not backfill:
        check.failed(f"No backfill found for id: {backfill_id}")

    if backfill.status not in BULK_ACTION_TERMINAL_STATUSES:
        raise DagsterInvariantViolationError(
            f"Cannot retry backfill {backfill_id} because it is still in progress."
        )

    # Lineage tags: the parent is always the backfill being retried; the root is inherited
    # from the parent when it has one, otherwise the parent itself is the root.
    retry_tags = {
        **backfill.tags,
        PARENT_BACKFILL_ID_TAG: backfill.backfill_id,
        ROOT_BACKFILL_ID_TAG: backfill.tags.get(ROOT_BACKFILL_ID_TAG, backfill.backfill_id),
    }
    retry_title = None
    if backfill.title:
        retry_title = f"Retry of {backfill.title}"

    if backfill.is_asset_backfill:
        asset_graph = graphene_info.context.asset_graph
        asset_backfill_data = backfill.get_asset_backfill_data(asset_graph)
        # Retry everything that was targeted but not successfully materialized. Subtracting
        # the materialized subset from the target subset (rather than using
        # asset_backfill_data.failed_and_downstream_asset) also picks up asset partitions
        # that never got a materialization run, e.g. because the backfill was canceled or
        # marked failed; the failed-and-downstream subset only contains asset partitions
        # whose materialization runs failed, plus their downstream assets.
        remaining_subset = asset_backfill_data.target_subset - asset_backfill_data.materialized_subset
        if remaining_subset.num_partitions_and_non_partitioned_assets == 0:
            raise DagsterInvariantViolationError(
                "Cannot retry an asset backfill that has no missing materializations."
            )
        assert_permission_for_asset_graph(
            graphene_info,
            asset_graph,
            list(remaining_subset.asset_keys),
            Permissions.LAUNCH_PARTITION_BACKFILL,
        )

        new_backfill = PartitionBackfill.from_asset_graph_subset(
            backfill_id=make_new_backfill_id(),
            asset_graph_subset=remaining_subset,
            dynamic_partitions_store=instance,
            tags=retry_tags,
            backfill_timestamp=get_current_timestamp(),
            title=retry_title,
            description=backfill.description,
        )
    else:
        # Job backfill: permission is checked against the code location that owns the
        # partition set.
        origin = check.not_none(backfill.partition_set_origin)
        assert_permission_for_location(
            graphene_info, Permissions.LAUNCH_PARTITION_BACKFILL, origin.selector.location_name
        )

        new_backfill = PartitionBackfill(
            backfill_id=make_new_backfill_id(),
            partition_set_origin=origin,
            status=BulkActionStatus.REQUESTED,
            partition_names=backfill.partition_names,
            # NOTE(review): presumably limits the new runs to what failed — confirm
            # against PartitionBackfill's handling of from_failure.
            from_failure=True,
            reexecution_steps=backfill.reexecution_steps,
            tags=retry_tags,
            backfill_timestamp=get_current_timestamp(),
            asset_selection=backfill.asset_selection,
            title=retry_title,
            description=backfill.description,
        )

    instance.add_backfill(new_backfill)
    return GrapheneRetryBackfillSuccess(backfill_id=new_backfill.backfill_id)


def reexecute_partition_backfill(
    graphene_info: "ResolveInfo", backfill_id: str
) -> "GrapheneRetryBackfillSuccess":
    """Create a new backfill that targets the same partitions as an existing backfill.

    The stored backfill identified by ``backfill_id`` is looked up on the instance and a new
    ``PartitionBackfill`` with a fresh id is registered. The new backfill copies the original
    tags and additionally records the parent and root backfill ids.

    Args:
        graphene_info: Resolver info; its ``context`` supplies the instance and asset graph.
        backfill_id: Id of the backfill to re-execute.

    Returns:
        A ``GrapheneRetryBackfillSuccess`` carrying the id of the newly added backfill.

    Raises:
        DagsterInvariantViolationError: If the backfill is not in one of
            ``BULK_ACTION_TERMINAL_STATUSES``.
    """
    from dagster_graphql.schema.backfill import GrapheneRetryBackfillSuccess

    instance = graphene_info.context.instance
    backfill = instance.get_backfill(backfill_id)
    if not backfill:
        check.failed(f"No backfill found for id: {backfill_id}")

    # Same guard as retry_partition_backfill: launching a second backfill over identical
    # partitions while the first one is still running would duplicate work.
    if backfill.status not in BULK_ACTION_TERMINAL_STATUSES:
        raise DagsterInvariantViolationError(
            f"Cannot re-execute backfill {backfill_id} because it is still in progress."
        )

    # Lineage tags: the parent is always the backfill being re-executed; the root is
    # inherited from the parent when it has one, otherwise the parent itself is the root.
    lineage_tags = {
        **backfill.tags,
        PARENT_BACKFILL_ID_TAG: backfill.backfill_id,
        ROOT_BACKFILL_ID_TAG: backfill.tags.get(ROOT_BACKFILL_ID_TAG, backfill.backfill_id),
    }
    new_title = f"Re-execution of {backfill.title}" if backfill.title else None

    if backfill.is_asset_backfill:
        asset_graph = graphene_info.context.asset_graph
        asset_backfill_data = backfill.get_asset_backfill_data(asset_graph)
        # The whole original target subset is re-run, regardless of what was materialized.
        target_subset = asset_backfill_data.target_subset
        assert_permission_for_asset_graph(
            graphene_info,
            asset_graph,
            list(target_subset.asset_keys),
            Permissions.LAUNCH_PARTITION_BACKFILL,
        )

        new_backfill = PartitionBackfill.from_asset_graph_subset(
            backfill_id=make_new_backfill_id(),
            asset_graph_subset=target_subset,
            dynamic_partitions_store=instance,
            tags=lineage_tags,
            backfill_timestamp=get_current_timestamp(),
            title=new_title,
            description=backfill.description,
        )
    else:
        # Job backfill: permission is checked against the code location that owns the
        # partition set.
        partition_set_origin = check.not_none(backfill.partition_set_origin)
        assert_permission_for_location(
            graphene_info,
            Permissions.LAUNCH_PARTITION_BACKFILL,
            partition_set_origin.selector.location_name,
        )

        new_backfill = PartitionBackfill(
            backfill_id=make_new_backfill_id(),
            partition_set_origin=partition_set_origin,
            status=BulkActionStatus.REQUESTED,
            partition_names=backfill.partition_names,
            # Unlike a retry, a re-execution is not started "from failure".
            from_failure=False,
            reexecution_steps=backfill.reexecution_steps,
            tags=lineage_tags,
            backfill_timestamp=get_current_timestamp(),
            asset_selection=backfill.asset_selection,
            title=new_title,
            description=backfill.description,
        )

    instance.add_backfill(new_backfill)
    return GrapheneRetryBackfillSuccess(backfill_id=new_backfill.backfill_id)
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
from dagster_graphql.implementation.execution.backfill import (
cancel_partition_backfill,
create_and_launch_partition_backfill,
reexecute_partition_backfill,
resume_partition_backfill,
retry_partition_backfill,
)
from dagster_graphql.implementation.execution.dynamic_partitions import (
add_dynamic_partition,
Expand Down Expand Up @@ -350,7 +352,7 @@ def mutate(self, graphene_info: ResolveInfo, backfillId: str):


class GrapheneResumeBackfillMutation(graphene.Mutation):
"""Retries a set of partition backfill runs."""
"""Resumes a set of partition backfill runs. Resuming a backfill will not retry any failed runs."""

Output = graphene.NonNull(GrapheneResumeBackfillResult)

Expand All @@ -366,6 +368,40 @@ def mutate(self, graphene_info: ResolveInfo, backfillId: str):
return resume_partition_backfill(graphene_info, backfillId)


class GrapheneRetryBackfillMutation(graphene.Mutation):
    """Retries a set of partition backfill runs. Retrying a backfill will create a new backfill to retry any failed partitions."""

    # GraphQL type name exposed in the schema.
    class Meta:
        name = "RetryBackfillMutation"

    # Single required argument: the id of the backfill to retry.
    class Arguments:
        backfillId = graphene.NonNull(graphene.String)

    Output = graphene.NonNull(GrapheneLaunchBackfillResult)

    @capture_error
    @require_permission_check(Permissions.LAUNCH_PARTITION_BACKFILL)
    def mutate(self, graphene_info: ResolveInfo, backfillId: str):
        # All of the work (lookup, validation, creating the new backfill) lives in the
        # implementation module; this resolver only forwards the arguments.
        return retry_partition_backfill(graphene_info=graphene_info, backfill_id=backfillId)


class GrapheneReexecuteBackfillMutation(graphene.Mutation):
    """Re-executes a backfill. Re-executing a backfill will create a new backfill that targets the same partitions as the existing backfill."""

    Output = graphene.NonNull(GrapheneLaunchBackfillResult)

    # Single required argument: the id of the backfill to re-execute.
    class Arguments:
        backfillId = graphene.NonNull(graphene.String)

    class Meta:
        # Must be distinct from GrapheneRetryBackfillMutation's "RetryBackfillMutation";
        # two GraphQL types cannot share a name within one schema.
        name = "ReexecuteBackfillMutation"

    # NOTE(review): no `reexecutePartitionBackfill` field is visible on the mutation root
    # in this change — confirm this mutation is actually registered in the schema.
    @capture_error
    @require_permission_check(Permissions.LAUNCH_PARTITION_BACKFILL)
    def mutate(self, graphene_info: ResolveInfo, backfillId: str):
        # Delegates to the implementation module, which validates the backfill and
        # registers the new one.
        return reexecute_partition_backfill(graphene_info, backfillId)


class GrapheneAddDynamicPartitionMutation(graphene.Mutation):
"""Adds a partition to a dynamic partition set."""

Expand Down Expand Up @@ -981,6 +1017,7 @@ class Meta:
reportRunlessAssetEvents = GrapheneReportRunlessAssetEventsMutation.Field()
launchPartitionBackfill = GrapheneLaunchBackfillMutation.Field()
resumePartitionBackfill = GrapheneResumeBackfillMutation.Field()
retryPartitionBackfill = GrapheneRetryBackfillMutation.Field()
cancelPartitionBackfill = GrapheneCancelBackfillMutation.Field()
logTelemetry = GrapheneLogTelemetryMutation.Field()
setNuxSeen = GrapheneSetNuxSeenMutation.Field()
Expand Down
Loading