Skip to content

Commit

Permalink
Fix per-location permissions checks for asset backfills using the "pa…
Browse files Browse the repository at this point in the history
…rtitionsByAssets" parameter (#22329)

Summary:
Asset backfills were incorrectly only letting you submit backfills with
the partitionsByAssets field set if you were a deployment-wide Launcher,
unlike the other codepaths which correctly look at the specific assets
being backfilled and checking per-location permissions.

Test Plan: New test case that was failing before
  • Loading branch information
gibsondan authored Jun 6, 2024
1 parent 521bf24 commit bfd22f9
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,16 @@ def create_and_launch_partition_backfill(
raise DagsterError("fromFailure is not supported for pure asset backfills")

asset_graph = graphene_info.context.asset_graph

partitions_by_assets = [
PartitionsByAssetSelector.from_graphql_input(partitions_by_asset_selector)
for partitions_by_asset_selector in partitions_by_assets
]

selected_assets = list({selector.asset_key for selector in partitions_by_assets})

assert_permission_for_asset_graph(
graphene_info, asset_graph, asset_selection, Permissions.LAUNCH_PARTITION_BACKFILL
graphene_info, asset_graph, selected_assets, Permissions.LAUNCH_PARTITION_BACKFILL
)
backfill = PartitionBackfill.from_partitions_by_assets(
backfill_id=backfill_id,
Expand All @@ -247,10 +255,7 @@ def create_and_launch_partition_backfill(
asset_graph,
utc_datetime_from_timestamp(backfill_timestamp),
),
partitions_by_assets=[
PartitionsByAssetSelector.from_graphql_input(partitions_by_asset_selector)
for partitions_by_asset_selector in partitions_by_assets
],
partitions_by_assets=partitions_by_assets,
title=backfill_params.get("title"),
description=backfill_params.get("description"),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,27 @@ def test_launch_asset_backfill_read_only_context():
== "UnauthorizedError"
)

launch_backfill_result = execute_dagster_graphql(
read_only_context,
LAUNCH_PARTITION_BACKFILL_MUTATION,
variables={
"backfillParams": {
"partitionsByAssets": [
{
"assetKey": key.to_graphql_input(),
"partitions": {"range": {"start": "a", "end": "b"}},
}
for key in all_asset_keys
]
}
},
)

assert (
launch_backfill_result.data["launchPartitionBackfill"]["__typename"]
== "UnauthorizedError"
)

location_name = main_repo_location_name()

# context with per-location permissions on the specific location succeeds
Expand Down Expand Up @@ -222,6 +243,30 @@ def test_launch_asset_backfill_read_only_context():
== "LaunchBackfillSuccess"
)

launch_backfill_result = execute_dagster_graphql(
read_only_context,
LAUNCH_PARTITION_BACKFILL_MUTATION,
variables={
"backfillParams": {
"partitionsByAssets": [
{
"assetKey": key.to_graphql_input(),
"partitions": {"range": {"start": "a", "end": "b"}},
}
for key in all_asset_keys
]
}
},
)

assert launch_backfill_result
assert launch_backfill_result.data

assert (
launch_backfill_result.data["launchPartitionBackfill"]["__typename"]
== "LaunchBackfillSuccess"
)

# assets that aren't in the asset graph at all fail permissions check
# because they can't be mapped to a particular code location
launch_backfill_result = execute_dagster_graphql(
Expand All @@ -242,6 +287,29 @@ def test_launch_asset_backfill_read_only_context():
== "UnauthorizedError"
)

launch_backfill_result = execute_dagster_graphql(
read_only_context,
LAUNCH_PARTITION_BACKFILL_MUTATION,
variables={
"backfillParams": {
"partitionsByAssets": [
{
"assetKey": {"path": ["doesnot", "exist"]},
"partitions": {"range": {"start": "a", "end": "b"}},
}
]
}
},
)

assert launch_backfill_result
assert launch_backfill_result.data

assert (
launch_backfill_result.data["launchPartitionBackfill"]["__typename"]
== "UnauthorizedError"
)


def test_launch_asset_backfill_all_partitions():
repo = get_repo()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ def to_graphql_input(self):
}

@staticmethod
def from_graphql_input(graphql_data):
def from_graphql_input(graphql_data) -> "PartitionsByAssetSelector":
asset_key = graphql_data["assetKey"]
partitions = graphql_data.get("partitions")
return PartitionsByAssetSelector(
Expand Down

0 comments on commit bfd22f9

Please sign in to comment.