Skip to content

Commit

Permalink
add mutation to delete concurrency limits (#19205)
Browse files Browse the repository at this point in the history
[INTERNAL_BRANCH=prha/delete_concurrency_limit]

## Summary & Motivation
Previously, we interpreted zero concurrency limits as slot deletion.

Now that we accept zero as an acceptable limit value, we need an
explicit delete concurrency limit storage method.

## How I Tested These Changes
BK
  • Loading branch information
prha authored and salazarm committed Jan 17, 2024
1 parent 5ceec59 commit 573d477
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 7 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ import {
RunsForConcurrencyKeyQuery,
RunsForConcurrencyKeyQueryVariables,
RunQueueConfigFragment,
DeleteConcurrencyLimitMutation,
DeleteConcurrencyLimitMutationVariables,
SetConcurrencyLimitMutation,
SetConcurrencyLimitMutationVariables,
} from './types/InstanceConcurrency.types';
Expand Down Expand Up @@ -634,16 +636,14 @@ const DeleteConcurrencyLimitDialog = ({
}) => {
const [isSubmitting, setIsSubmitting] = React.useState(false);

const [setConcurrencyLimit] = useMutation<
SetConcurrencyLimitMutation,
SetConcurrencyLimitMutationVariables
>(SET_CONCURRENCY_LIMIT_MUTATION);
const [deleteConcurrencyLimit] = useMutation<
DeleteConcurrencyLimitMutation,
DeleteConcurrencyLimitMutationVariables
>(DELETE_CONCURRENCY_LIMIT_MUTATION);

const save = async () => {
setIsSubmitting(true);
await setConcurrencyLimit({
variables: {concurrencyKey, limit: 0},
});
await deleteConcurrencyLimit({variables: {concurrencyKey}});
setIsSubmitting(false);
onComplete();
onClose();
Expand Down Expand Up @@ -1016,6 +1016,12 @@ const SET_CONCURRENCY_LIMIT_MUTATION = gql`
}
`;

const DELETE_CONCURRENCY_LIMIT_MUTATION = gql`
mutation DeleteConcurrencyLimit($concurrencyKey: String!) {
deleteConcurrencyLimit(concurrencyKey: $concurrencyKey)
}
`;

export const FREE_CONCURRENCY_SLOTS_MUTATION = gql`
mutation FreeConcurrencySlots($runId: String!, $stepKey: String) {
freeConcurrencySlots(runId: $runId, stepKey: $stepKey)
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,24 @@ def mutate(self, graphene_info, concurrencyKey: str, limit: int):
return True


class GrapheneDeleteConcurrencyLimitMutation(graphene.Mutation):
"""Sets the concurrency limit for a given concurrency key."""

Output = graphene.NonNull(graphene.Boolean)

class Meta:
name = "DeleteConcurrencyLimitMutation"

class Arguments:
concurrencyKey = graphene.Argument(graphene.NonNull(graphene.String))

@capture_error
@check_permission(Permissions.EDIT_CONCURRENCY_LIMIT)
def mutate(self, graphene_info, concurrencyKey: str):
graphene_info.context.instance.event_log_storage.delete_concurrency_limit(concurrencyKey)
return True


class GrapheneFreeConcurrencySlotsMutation(graphene.Mutation):
"""Frees concurrency slots."""

Expand Down Expand Up @@ -907,5 +925,6 @@ class Meta:
addDynamicPartition = GrapheneAddDynamicPartitionMutation.Field()
setAutoMaterializePaused = GrapheneSetAutoMaterializePausedMutation.Field()
setConcurrencyLimit = GrapheneSetConcurrencyLimitMutation.Field()
deleteConcurrencyLimit = GrapheneDeleteConcurrencyLimitMutation.Field()
freeConcurrencySlotsForRun = GrapheneFreeConcurrencySlotsForRunMutation.Field()
freeConcurrencySlots = GrapheneFreeConcurrencySlotsMutation.Field()
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,11 @@ def set_concurrency_slots(self, concurrency_key: str, num: int) -> None:
"""Allocate concurrency slots for the given concurrency key."""
raise NotImplementedError()

@abstractmethod
def delete_concurrency_limit(self, concurrency_key: str) -> None:
"""Delete concurrency limits and slots for the given concurrency key."""
raise NotImplementedError()

@abstractmethod
def get_concurrency_keys(self) -> Set[str]:
"""Get the set of concurrency limited keys."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2246,6 +2246,24 @@ def set_concurrency_slots(self, concurrency_key: str, num: int) -> None:
# they will be unutilized until free_concurrency_slots is called
self.assign_pending_steps(keys_to_assign)

def delete_concurrency_limit(self, concurrency_key: str) -> None:
"""Delete a concurrency limit and its associated slots.
Args:
concurrency_key (str): The key to delete.
"""
# ensure that we have concurrency limits set for all keys
self._reconcile_concurrency_limits_from_slots()

with self.index_transaction() as conn:
if self.has_table(ConcurrencyLimitsTable.name):
conn.execute(
ConcurrencyLimitsTable.delete().where(
ConcurrencyLimitsTable.c.concurrency_key == concurrency_key
)
)
self._allocate_concurrency_slots(conn, concurrency_key, 0)

def _allocate_concurrency_slots(self, conn, concurrency_key: str, num: int) -> List[str]:
keys_to_assign = []
count_row = conn.execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,9 @@ def initialize_concurrency_limit_to_default(self, concurrency_key: str) -> bool:
def set_concurrency_slots(self, concurrency_key: str, num: int) -> None:
return self._storage.event_log_storage.set_concurrency_slots(concurrency_key, num)

def delete_concurrency_limit(self, concurrency_key: str) -> None:
return self._storage.event_log_storage.delete_concurrency_limit(concurrency_key)

def get_concurrency_keys(self) -> Set[str]:
return self._storage.event_log_storage.get_concurrency_keys()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4467,6 +4467,10 @@ def test_zero_concurrency(self, storage: EventLogStorage):
assert info.pending_step_count == 1
assert info.assigned_step_count == 0

# delete the concurrency slot
storage.delete_concurrency_limit("foo")
assert storage.get_concurrency_keys() == set()

def test_default_concurrency(
self,
storage: EventLogStorage,
Expand Down

0 comments on commit 573d477

Please sign in to comment.