From 573d477b6ed73cc58076ef5c83abccc94fb6129c Mon Sep 17 00:00:00 2001 From: prha <1040172+prha@users.noreply.github.com> Date: Wed, 17 Jan 2024 09:31:21 -0800 Subject: [PATCH] add mutation to delete concurrency limits (#19205) [INTERNAL_BRANCH=prha/delete_concurrency_limit] ## Summary & Motivation Previously, we interpreted zero concurrency limits as slot deletion. Now that we accept zero as an acceptable limit value, we need an explicit delete concurrency limit storage method. ## How I Tested These Changes BK --- .../ui-core/src/graphql/schema.graphql | 1 + .../packages/ui-core/src/graphql/types.ts | 9 +++++++++ .../src/instance/InstanceConcurrency.tsx | 20 ++++++++++++------- .../types/InstanceConcurrency.types.ts | 9 +++++++++ .../dagster_graphql/schema/roots/mutation.py | 19 ++++++++++++++++++ .../dagster/_core/storage/event_log/base.py | 5 +++++ .../_core/storage/event_log/sql_event_log.py | 18 +++++++++++++++++ .../dagster/_core/storage/legacy_storage.py | 3 +++ .../storage_tests/utils/event_log_storage.py | 4 ++++ 9 files changed, 81 insertions(+), 7 deletions(-) diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql index ef31ca56fbaef..c972ea64f7f13 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql @@ -3448,6 +3448,7 @@ type Mutation { ): AddDynamicPartitionResult! setAutoMaterializePaused(paused: Boolean!): Boolean! setConcurrencyLimit(concurrencyKey: String!, limit: Int!): Boolean! + deleteConcurrencyLimit(concurrencyKey: String!): Boolean! freeConcurrencySlotsForRun(runId: String!): Boolean! freeConcurrencySlots(runId: String!, stepKey: String): Boolean! } diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts index 3e6591b45e749..e1d33043bf9a1 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts @@ -2081,6 +2081,7 @@ export type Mutation = { __typename: 'Mutation'; addDynamicPartition: AddDynamicPartitionResult; cancelPartitionBackfill: CancelBackfillResult; + deleteConcurrencyLimit: Scalars['Boolean']; deletePipelineRun: DeletePipelineRunResult; deleteRun: DeletePipelineRunResult; freeConcurrencySlots: Scalars['Boolean']; @@ -2122,6 +2123,10 @@ export type MutationCancelPartitionBackfillArgs = { backfillId: Scalars['String']; }; +export type MutationDeleteConcurrencyLimitArgs = { + concurrencyKey: Scalars['String']; +}; + export type MutationDeletePipelineRunArgs = { runId: Scalars['String']; }; @@ -8535,6 +8540,10 @@ export const buildMutation = ( : relationshipsToOmit.has('CancelBackfillSuccess') ? ({} as CancelBackfillSuccess) : buildCancelBackfillSuccess({}, relationshipsToOmit), + deleteConcurrencyLimit: + overrides && overrides.hasOwnProperty('deleteConcurrencyLimit') + ? overrides.deleteConcurrencyLimit! + : false, deletePipelineRun: overrides && overrides.hasOwnProperty('deletePipelineRun') ? overrides.deletePipelineRun! diff --git a/js_modules/dagster-ui/packages/ui-core/src/instance/InstanceConcurrency.tsx b/js_modules/dagster-ui/packages/ui-core/src/instance/InstanceConcurrency.tsx index 0db7307a96c17..0a7875b201741 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/instance/InstanceConcurrency.tsx +++ b/js_modules/dagster-ui/packages/ui-core/src/instance/InstanceConcurrency.tsx @@ -55,6 +55,8 @@ import { RunsForConcurrencyKeyQuery, RunsForConcurrencyKeyQueryVariables, RunQueueConfigFragment, + DeleteConcurrencyLimitMutation, + DeleteConcurrencyLimitMutationVariables, SetConcurrencyLimitMutation, SetConcurrencyLimitMutationVariables, } from './types/InstanceConcurrency.types'; @@ -634,16 +636,14 @@ const DeleteConcurrencyLimitDialog = ({ }) => { const [isSubmitting, setIsSubmitting] = React.useState(false); - const [setConcurrencyLimit] = useMutation< - SetConcurrencyLimitMutation, - SetConcurrencyLimitMutationVariables - >(SET_CONCURRENCY_LIMIT_MUTATION); + const [deleteConcurrencyLimit] = useMutation< + DeleteConcurrencyLimitMutation, + DeleteConcurrencyLimitMutationVariables + >(DELETE_CONCURRENCY_LIMIT_MUTATION); const save = async () => { setIsSubmitting(true); - await setConcurrencyLimit({ - variables: {concurrencyKey, limit: 0}, - }); + await deleteConcurrencyLimit({variables: {concurrencyKey}}); setIsSubmitting(false); onComplete(); onClose(); @@ -1016,6 +1016,12 @@ const SET_CONCURRENCY_LIMIT_MUTATION = gql` } `; +const DELETE_CONCURRENCY_LIMIT_MUTATION = gql` + mutation DeleteConcurrencyLimit($concurrencyKey: String!) { + deleteConcurrencyLimit(concurrencyKey: $concurrencyKey) + } +`; + export const FREE_CONCURRENCY_SLOTS_MUTATION = gql` mutation FreeConcurrencySlots($runId: String!, $stepKey: String) { freeConcurrencySlots(runId: $runId, stepKey: $stepKey) diff --git a/js_modules/dagster-ui/packages/ui-core/src/instance/types/InstanceConcurrency.types.ts b/js_modules/dagster-ui/packages/ui-core/src/instance/types/InstanceConcurrency.types.ts index a876b94dc6528..da205a5c6b371 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/instance/types/InstanceConcurrency.types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/instance/types/InstanceConcurrency.types.ts @@ -73,6 +73,15 @@ export type SetConcurrencyLimitMutationVariables = Types.Exact<{ export type SetConcurrencyLimitMutation = {__typename: 'Mutation'; setConcurrencyLimit: boolean}; +export type DeleteConcurrencyLimitMutationVariables = Types.Exact<{ + concurrencyKey: Types.Scalars['String']; +}>; + +export type DeleteConcurrencyLimitMutation = { + __typename: 'Mutation'; + deleteConcurrencyLimit: boolean; +}; + export type FreeConcurrencySlotsMutationVariables = Types.Exact<{ runId: Types.Scalars['String']; stepKey?: Types.InputMaybe; diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/roots/mutation.py b/python_modules/dagster-graphql/dagster_graphql/schema/roots/mutation.py index 681df009cf04a..c3544723c54b8 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/roots/mutation.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/roots/mutation.py @@ -831,6 +831,24 @@ def mutate(self, graphene_info, concurrencyKey: str, limit: int): return True +class GrapheneDeleteConcurrencyLimitMutation(graphene.Mutation): + """Sets the concurrency limit for a given concurrency key.""" + + Output = graphene.NonNull(graphene.Boolean) + + class Meta: + name = "DeleteConcurrencyLimitMutation" + + class Arguments: + concurrencyKey = graphene.Argument(graphene.NonNull(graphene.String)) + + @capture_error + @check_permission(Permissions.EDIT_CONCURRENCY_LIMIT) + def mutate(self, graphene_info, concurrencyKey: str): + graphene_info.context.instance.event_log_storage.delete_concurrency_limit(concurrencyKey) + return True + + class GrapheneFreeConcurrencySlotsMutation(graphene.Mutation): """Frees concurrency slots.""" @@ -907,5 +925,6 @@ class Meta: addDynamicPartition = GrapheneAddDynamicPartitionMutation.Field() setAutoMaterializePaused = GrapheneSetAutoMaterializePausedMutation.Field() setConcurrencyLimit = GrapheneSetConcurrencyLimitMutation.Field() + deleteConcurrencyLimit = GrapheneDeleteConcurrencyLimitMutation.Field() freeConcurrencySlotsForRun = GrapheneFreeConcurrencySlotsForRunMutation.Field() freeConcurrencySlots = GrapheneFreeConcurrencySlotsMutation.Field() diff --git a/python_modules/dagster/dagster/_core/storage/event_log/base.py b/python_modules/dagster/dagster/_core/storage/event_log/base.py index ac28795732399..f02c2f93c053d 100644 --- a/python_modules/dagster/dagster/_core/storage/event_log/base.py +++ b/python_modules/dagster/dagster/_core/storage/event_log/base.py @@ -413,6 +413,11 @@ def set_concurrency_slots(self, concurrency_key: str, num: int) -> None: """Allocate concurrency slots for the given concurrency key.""" raise NotImplementedError() + @abstractmethod + def delete_concurrency_limit(self, concurrency_key: str) -> None: + """Delete concurrency limits and slots for the given concurrency key.""" + raise NotImplementedError() + @abstractmethod def get_concurrency_keys(self) -> Set[str]: """Get the set of concurrency limited keys.""" diff --git a/python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py b/python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py index 604d9773ec808..0ba13ceb99a0d 100644 --- a/python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py +++ b/python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py @@ -2246,6 +2246,24 @@ def set_concurrency_slots(self, concurrency_key: str, num: int) -> None: # they will be unutilized until free_concurrency_slots is called self.assign_pending_steps(keys_to_assign) + def delete_concurrency_limit(self, concurrency_key: str) -> None: + """Delete a concurrency limit and its associated slots. + + Args: + concurrency_key (str): The key to delete. + """ + # ensure that we have concurrency limits set for all keys + self._reconcile_concurrency_limits_from_slots() + + with self.index_transaction() as conn: + if self.has_table(ConcurrencyLimitsTable.name): + conn.execute( + ConcurrencyLimitsTable.delete().where( + ConcurrencyLimitsTable.c.concurrency_key == concurrency_key + ) + ) + self._allocate_concurrency_slots(conn, concurrency_key, 0) + def _allocate_concurrency_slots(self, conn, concurrency_key: str, num: int) -> List[str]: keys_to_assign = [] count_row = conn.execute( diff --git a/python_modules/dagster/dagster/_core/storage/legacy_storage.py b/python_modules/dagster/dagster/_core/storage/legacy_storage.py index 49fa208f9f250..15cde482a4073 100644 --- a/python_modules/dagster/dagster/_core/storage/legacy_storage.py +++ b/python_modules/dagster/dagster/_core/storage/legacy_storage.py @@ -628,6 +628,9 @@ def initialize_concurrency_limit_to_default(self, concurrency_key: str) -> bool: def set_concurrency_slots(self, concurrency_key: str, num: int) -> None: return self._storage.event_log_storage.set_concurrency_slots(concurrency_key, num) + def delete_concurrency_limit(self, concurrency_key: str) -> None: + return self._storage.event_log_storage.delete_concurrency_limit(concurrency_key) + def get_concurrency_keys(self) -> Set[str]: return self._storage.event_log_storage.get_concurrency_keys() diff --git a/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py b/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py index e17b33c3529e4..8938901371327 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py +++ b/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py @@ -4467,6 +4467,10 @@ def test_zero_concurrency(self, storage: EventLogStorage): assert info.pending_step_count == 1 assert info.assigned_step_count == 0 + # delete the concurrency slot + storage.delete_concurrency_limit("foo") + assert storage.get_concurrency_keys() == set() + def test_default_concurrency( self, storage: EventLogStorage,