Skip to content

Commit

Permalink
testing
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Aug 15, 2024
1 parent b927649 commit b0aa0c5
Show file tree
Hide file tree
Showing 2 changed files with 201 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,9 @@ def retry_partition_backfill(
)

if backfill.is_asset_backfill:
asset_backfill_data = backfill.get_asset_backfill_data()
if (
backfill.asset_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets
asset_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets
== 0
):
raise DagsterInvariantViolationError(
Expand All @@ -352,13 +353,13 @@ def retry_partition_backfill(
assert_permission_for_asset_graph(
graphene_info,
asset_graph,
backfill.asset_backfill_data.failed_and_downstream_subset.asset_keys,
list(asset_backfill_data.failed_and_downstream_subset.asset_keys),
Permissions.LAUNCH_PARTITION_BACKFILL,
)

new_backfill = PartitionBackfill.from_asset_graph_subset(
backfill_id=make_new_backfill_id(),
asset_graph_subset=backfill.asset_backfill_data.failed_and_downstream_subset,
asset_graph_subset=asset_backfill_data.failed_and_downstream_subset,
dynamic_partitions_store=graphene_info.context.instance,
tags={
**backfill.tags,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1615,19 +1615,26 @@ def test_retry_asset_backfill_twice(self, graphql_context):
assert retried_backfill.tags.get(PARENT_RUN_ID_TAG) == backfill_id
assert retried_backfill.tags.get(ROOT_RUN_ID_TAG) == backfill_id

_execute_asset_backfill_iteration_no_side_effects(
graphql_context, retried_backfill.backfill_id, asset_graph
)
# mark some partitions failed so we can retry again
_mock_asset_backfill_runs(
graphql_context,
AssetKey("upstream_daily_partitioned_asset"),
asset_graph,
backfill_id,
retried_backfill.backfill_id,
DagsterRunStatus.FAILURE,
"2023-01-09",
)
_execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph)
_execute_asset_backfill_iteration_no_side_effects(
graphql_context, retried_backfill.backfill_id, asset_graph
)

# refetch the backfill to get the updated statuses of all assets
retried_backfill = graphql_context.instance.get_backfill(retry_backfill_id)
graphql_context.instance.update_backfill(
retried_backfill.with_status(BulkActionStatus.FAILED)
retried_backfill.with_status(BulkActionStatus.COMPLETED)
)

result = execute_dagster_graphql(
Expand All @@ -1644,10 +1651,11 @@ def test_retry_asset_backfill_twice(self, graphql_context):
seconrd_retry_backfill_id = result.data["retryPartitionBackfill"]["backfillId"]

second_retried_backfill = graphql_context.instance.get_backfill(seconrd_retry_backfill_id)
retried_backfill = graphql_context.instance.get_backfill(retry_backfill_id)

assert (
second_retried_backfill.asset_backfill_data.failed_and_downstream_subset
== retried_backfill.asset_backfill_data.target_subset
retried_backfill.asset_backfill_data.failed_and_downstream_subset
== second_retried_backfill.asset_backfill_data.target_subset
)
assert second_retried_backfill.tags.get(PARENT_RUN_ID_TAG) == retry_backfill_id
assert second_retried_backfill.tags.get(ROOT_RUN_ID_TAG) == backfill_id
Expand Down Expand Up @@ -1685,6 +1693,9 @@ def test_retry_job_backfill(self, graphql_context):
assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess"
backfill_id = result.data["launchPartitionBackfill"]["backfillId"]

backfill = graphql_context.instance.get_backfill(backfill_id)
graphql_context.instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED))

# re-execute from failure (without the failure file)
result = execute_dagster_graphql(
graphql_context,
Expand Down Expand Up @@ -1713,3 +1724,184 @@ def test_retry_job_backfill(self, graphql_context):
assert result.data["partitionBackfillOrError"]["numCancelable"] == 2
assert len(result.data["partitionBackfillOrError"]["partitionNames"]) == 2
assert result.data["partitionBackfillOrError"]["fromFailure"]

def test_retry_in_progress_job_backfill(self, graphql_context):
# TestLaunchDaemonBackfillFromFailure::test_retry_in_progress_job_backfill[sqlite_with_default_run_launcher_managed_grpc_env]
repository_selector = infer_repository_selector(graphql_context)
partition_set_selector = {
"repositorySelector": repository_selector,
"partitionSetName": "chained_failure_job_partition_set",
}

# trigger failure in the conditionally_fail solid

output_file = os.path.join(
get_system_temp_directory(), "chained_failure_pipeline_conditionally_fail"
)
try:
with open(output_file, "w", encoding="utf8"):
result = execute_dagster_graphql_and_finish_runs(
graphql_context,
LAUNCH_PARTITION_BACKFILL_MUTATION,
variables={
"backfillParams": {
"selector": partition_set_selector,
"partitionNames": ["2", "3"],
}
},
)
finally:
os.remove(output_file)

assert not result.errors
assert result.data
assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess"
backfill_id = result.data["launchPartitionBackfill"]["backfillId"]

result = execute_dagster_graphql(
graphql_context,
RETRY_BACKFILL_MUTATION,
variables={"backfillId": backfill_id},
)

assert not result.errors
assert result.data
assert result.data["retryPartitionBackfill"]["__typename"] == "PythonError"
assert "still in progress" in result.data["retryPartitionBackfill"]["message"]

# mark backfill as complete and confirm that we can retry it
backfill = graphql_context.instance.get_backfill(backfill_id)
graphql_context.instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED))

# re-execute from failure (without the failure file)
result = execute_dagster_graphql(
graphql_context,
RETRY_BACKFILL_MUTATION,
variables={"backfillId": backfill_id},
)

assert not result.errors
assert result.data
assert result.data["retryPartitionBackfill"]["__typename"] == "RetryBackfillSuccess"
retried_backfill_id = result.data["retryPartitionBackfill"]["backfillId"]
retried_backfill = graphql_context.instance.get_backfill(retried_backfill_id)

assert retried_backfill.tags.get(PARENT_RUN_ID_TAG) == backfill_id
assert retried_backfill.tags.get(ROOT_RUN_ID_TAG) == backfill_id

def test_retry_job_backfill_twice(self, graphql_context):
# TestLaunchDaemonBackfillFromFailure::test_retry_in_progress_job_backfill[sqlite_with_default_run_launcher_managed_grpc_env]
repository_selector = infer_repository_selector(graphql_context)
partition_set_selector = {
"repositorySelector": repository_selector,
"partitionSetName": "chained_failure_job_partition_set",
}

# trigger failure in the conditionally_fail solid

output_file = os.path.join(
get_system_temp_directory(), "chained_failure_pipeline_conditionally_fail"
)
with open(output_file, "w", encoding="utf8"):
result = execute_dagster_graphql_and_finish_runs(
graphql_context,
LAUNCH_PARTITION_BACKFILL_MUTATION,
variables={
"backfillParams": {
"selector": partition_set_selector,
"partitionNames": ["2", "3"],
}
},
)

assert not result.errors
assert result.data
assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess"
backfill_id = result.data["launchPartitionBackfill"]["backfillId"]

# mark backfill as complete and confirm that we can retry it
backfill = graphql_context.instance.get_backfill(backfill_id)
graphql_context.instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED))
try:
with open(output_file, "w", encoding="utf8"):
# re-execute from failure (still with the failure file)
result = execute_dagster_graphql(
graphql_context,
RETRY_BACKFILL_MUTATION,
variables={"backfillId": backfill_id},
)
finally:
os.remove(output_file)

assert not result.errors
assert result.data
assert result.data["retryPartitionBackfill"]["__typename"] == "RetryBackfillSuccess"
retried_backfill_id = result.data["retryPartitionBackfill"]["backfillId"]
retried_backfill = graphql_context.instance.get_backfill(retried_backfill_id)

assert retried_backfill.tags.get(PARENT_RUN_ID_TAG) == backfill_id
assert retried_backfill.tags.get(ROOT_RUN_ID_TAG) == backfill_id

graphql_context.instance.update_backfill(
retried_backfill.with_status(BulkActionStatus.COMPLETED)
)

# re-execute from failure (without the failure file)
result = execute_dagster_graphql(
graphql_context,
RETRY_BACKFILL_MUTATION,
variables={"backfillId": retried_backfill.backfill_id},
)

assert not result.errors
assert result.data
assert result.data["retryPartitionBackfill"]["__typename"] == "RetryBackfillSuccess"
second_retried_backfill_id = result.data["retryPartitionBackfill"]["backfillId"]
second_retried_backfill = graphql_context.instance.get_backfill(second_retried_backfill_id)

assert second_retried_backfill.tags.get(PARENT_RUN_ID_TAG) == retried_backfill_id
assert second_retried_backfill.tags.get(ROOT_RUN_ID_TAG) == backfill_id

def test_retry_successful_job_backfill(self, graphql_context):
# TestLaunchDaemonBackfillFromFailure::test_retry_successful_job_backfill[sqlite_with_default_run_launcher_managed_grpc_env]
repository_selector = infer_repository_selector(graphql_context)
partition_set_selector = {
"repositorySelector": repository_selector,
"partitionSetName": "chained_failure_job_partition_set",
}

result = execute_dagster_graphql_and_finish_runs(
graphql_context,
LAUNCH_PARTITION_BACKFILL_MUTATION,
variables={
"backfillParams": {
"selector": partition_set_selector,
"partitionNames": ["2", "3"],
}
},
)

assert not result.errors
assert result.data
assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess"
backfill_id = result.data["launchPartitionBackfill"]["backfillId"]

# mark backfill as complete
backfill = graphql_context.instance.get_backfill(backfill_id)
graphql_context.instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED))

result = execute_dagster_graphql(
graphql_context,
RETRY_BACKFILL_MUTATION,
variables={"backfillId": backfill_id},
)
# Unlike asset backfills, we don't currently have a way to see if a job backfill has runs that
# failed without querying the DB. So we always allow retries
assert not result.errors
assert result.data
assert result.data["retryPartitionBackfill"]["__typename"] == "RetryBackfillSuccess"
retried_backfill_id = result.data["retryPartitionBackfill"]["backfillId"]
retried_backfill = graphql_context.instance.get_backfill(retried_backfill_id)

assert retried_backfill.tags.get(PARENT_RUN_ID_TAG) == backfill_id
assert retried_backfill.tags.get(ROOT_RUN_ID_TAG) == backfill_id

0 comments on commit b0aa0c5

Please sign in to comment.