From bfd22f91476e47752c767fc30027d0e70358ea38 Mon Sep 17 00:00:00 2001 From: gibsondan Date: Thu, 6 Jun 2024 12:40:11 -0500 Subject: [PATCH] Fix per-location permissions checks for asset backfills using the "partitionsByAssets" parameter (#22329) Summary: Asset backfills were incorrectly only letting you submit backfills with the partitionsByAssets field set if you were a deployment-wide Launcher, unlike the other codepaths which correctly look at the specific assets being backfilled and checking per-location permissions. Test Plan: New test case that was failing before --- .../implementation/execution/backfill.py | 15 ++-- .../graphql/test_asset_backfill.py | 68 +++++++++++++++++++ .../dagster/_core/definitions/selector.py | 2 +- 3 files changed, 79 insertions(+), 6 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py index be2cb61e33460..344eee9ae2a37 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py @@ -234,8 +234,16 @@ def create_and_launch_partition_backfill( raise DagsterError("fromFailure is not supported for pure asset backfills") asset_graph = graphene_info.context.asset_graph + + partitions_by_assets = [ + PartitionsByAssetSelector.from_graphql_input(partitions_by_asset_selector) + for partitions_by_asset_selector in partitions_by_assets + ] + + selected_assets = list({selector.asset_key for selector in partitions_by_assets}) + assert_permission_for_asset_graph( - graphene_info, asset_graph, asset_selection, Permissions.LAUNCH_PARTITION_BACKFILL + graphene_info, asset_graph, selected_assets, Permissions.LAUNCH_PARTITION_BACKFILL ) backfill = PartitionBackfill.from_partitions_by_assets( backfill_id=backfill_id, @@ -247,10 +255,7 @@ def create_and_launch_partition_backfill( asset_graph, utc_datetime_from_timestamp(backfill_timestamp), ), - partitions_by_assets=[ - PartitionsByAssetSelector.from_graphql_input(partitions_by_asset_selector) - for partitions_by_asset_selector in partitions_by_assets - ], + partitions_by_assets=partitions_by_assets, title=backfill_params.get("title"), description=backfill_params.get("description"), ) diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py index cd8639e9ca25a..3259460cd1ac6 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py @@ -192,6 +192,27 @@ def test_launch_asset_backfill_read_only_context(): == "UnauthorizedError" ) + launch_backfill_result = execute_dagster_graphql( + read_only_context, + LAUNCH_PARTITION_BACKFILL_MUTATION, + variables={ + "backfillParams": { + "partitionsByAssets": [ + { + "assetKey": key.to_graphql_input(), + "partitions": {"range": {"start": "a", "end": "b"}}, + } + for key in all_asset_keys + ] + } + }, + ) + + assert ( + launch_backfill_result.data["launchPartitionBackfill"]["__typename"] + == "UnauthorizedError" + ) + location_name = main_repo_location_name() # context with per-location permissions on the specific location succeeds @@ -222,6 +243,30 @@ def test_launch_asset_backfill_read_only_context(): == "LaunchBackfillSuccess" ) + launch_backfill_result = execute_dagster_graphql( + read_only_context, + LAUNCH_PARTITION_BACKFILL_MUTATION, + variables={ + "backfillParams": { + "partitionsByAssets": [ + { + "assetKey": key.to_graphql_input(), + "partitions": {"range": {"start": "a", "end": "b"}}, + } + for key in all_asset_keys + ] + } + }, + ) + + assert launch_backfill_result + assert launch_backfill_result.data + + assert ( + launch_backfill_result.data["launchPartitionBackfill"]["__typename"] + == "LaunchBackfillSuccess" + ) + # assets that aren't in the asset graph at all fail permissions check # because they can't be mapped to a particular code location launch_backfill_result = execute_dagster_graphql( @@ -242,6 +287,29 @@ def test_launch_asset_backfill_read_only_context(): == "UnauthorizedError" ) + launch_backfill_result = execute_dagster_graphql( + read_only_context, + LAUNCH_PARTITION_BACKFILL_MUTATION, + variables={ + "backfillParams": { + "partitionsByAssets": [ + { + "assetKey": {"path": ["doesnot", "exist"]}, + "partitions": {"range": {"start": "a", "end": "b"}}, + } + ] + } + }, + ) + + assert launch_backfill_result + assert launch_backfill_result.data + + assert ( + launch_backfill_result.data["launchPartitionBackfill"]["__typename"] + == "UnauthorizedError" + ) + def test_launch_asset_backfill_all_partitions(): repo = get_repo() diff --git a/python_modules/dagster/dagster/_core/definitions/selector.py b/python_modules/dagster/dagster/_core/definitions/selector.py index 751c744e2635a..f3c77e6344468 100644 --- a/python_modules/dagster/dagster/_core/definitions/selector.py +++ b/python_modules/dagster/dagster/_core/definitions/selector.py @@ -403,7 +403,7 @@ def to_graphql_input(self): } @staticmethod - def from_graphql_input(graphql_data): + def from_graphql_input(graphql_data) -> "PartitionsByAssetSelector": asset_key = graphql_data["assetKey"] partitions = graphql_data.get("partitions") return PartitionsByAssetSelector(