Skip to content

Commit

Permalink
filter by multiple statuses in BulkActionsFilter (#23772)
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria authored Aug 22, 2024
1 parent ea48f9a commit 7b1f2c3
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 25 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 2 additions & 5 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions python_modules/dagster-graphql/dagster_graphql/schema/inputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,9 @@ class Meta:


class GrapheneBulkActionsFilter(graphene.InputObjectType):
status = graphene.InputField("dagster_graphql.schema.backfill.GrapheneBulkActionStatus")
statuses = graphene.List(
graphene.NonNull("dagster_graphql.schema.backfill.GrapheneBulkActionStatus")
)
createdBefore = graphene.InputField(graphene.Float)
createdAfter = graphene.InputField(graphene.Float)

Expand All @@ -388,12 +390,14 @@ class Meta:
name = "BulkActionsFilter"

def to_selector(self):
status = BulkActionStatus[self.status.value] if self.status else None
statuses = (
[BulkActionStatus[status.value] for status in self.statuses] if self.statuses else None
)
created_before = datetime_from_timestamp(self.createdBefore) if self.createdBefore else None
created_after = datetime_from_timestamp(self.createdAfter) if self.createdAfter else None

return BulkActionsFilter(
status=status,
statuses=statuses,
created_before=created_before,
created_after=created_after,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1203,5 +1203,5 @@ def test_get_backfills_with_filters():
get_backfills_result = execute_dagster_graphql(
context,
BACKFILLS_WITH_FILTERS_QUERY,
variables={"filters": {"status": "REQUESTED"}},
variables={"filters": {"statuses": ["REQUESTED"]}},
)
4 changes: 2 additions & 2 deletions python_modules/dagster/dagster/_core/execution/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ class BulkActionsFilter:
all values will be permitted for that field.
Args:
status (Optional[BulkActionStatus]): A status to filter by.
statuses (Optional[Sequence[BulkActionStatus]]): A list of statuses to filter by.
created_before (Optional[DateTime]): Filter by bulk actions that were created before this datetime. Note that the
create_time for each bulk action is stored in UTC.
created_after (Optional[DateTime]): Filter by bulk actions that were created after this datetime. Note that the
create_time for each bulk action is stored in UTC.
"""

status: Optional[BulkActionStatus] = None
statuses: Optional[Sequence[BulkActionStatus]] = None
created_before: Optional[datetime] = None
created_after: Optional[datetime] = None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -841,14 +841,16 @@ def get_backfills(
) -> Sequence[PartitionBackfill]:
check.opt_inst_param(status, "status", BulkActionStatus)
query = db_select([BulkActionsTable.c.body, BulkActionsTable.c.timestamp])
if status or (filters and filters.status):
if status and filters and filters.status and status != filters.status:
raise DagsterInvariantViolationError(
"Conflicting status filters provided to get_backfills. Choose one of status or BulkActionsFilter.status."
)
status = status or (filters.status if filters else None)
assert status
query = query.where(BulkActionsTable.c.status == status.value)
if status and filters:
raise DagsterInvariantViolationError(
"Cannot provide status and filters to get_backfills. Please use filters rather than status."
)
if status or (filters and filters.statuses):
statuses = [status] if status else (filters.statuses if filters else None)
assert statuses
query = query.where(
BulkActionsTable.c.status.in_([status.value for status in statuses])
)
if cursor:
cursor_query = db_select([BulkActionsTable.c.id]).where(
BulkActionsTable.c.key == cursor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1353,28 +1353,75 @@ def test_backfill_status_filtering(self, storage: RunStorage):
)
storage.add_backfill(one)
assert (
len(storage.get_backfills(filters=BulkActionsFilter(status=BulkActionStatus.REQUESTED)))
len(
storage.get_backfills(
filters=BulkActionsFilter(statuses=[BulkActionStatus.REQUESTED])
)
)
== 1
)
assert (
len(storage.get_backfills(filters=BulkActionsFilter(status=BulkActionStatus.COMPLETED)))
len(
storage.get_backfills(
filters=BulkActionsFilter(statuses=[BulkActionStatus.COMPLETED])
)
)
== 0
)
assert (
len(
storage.get_backfills(
filters=BulkActionsFilter(
statuses=[BulkActionStatus.COMPLETED, BulkActionStatus.REQUESTED]
)
)
)
== 1
)
backfills = storage.get_backfills(
filters=BulkActionsFilter(status=BulkActionStatus.REQUESTED)
filters=BulkActionsFilter(statuses=[BulkActionStatus.REQUESTED])
)
assert backfills[0] == one

storage.update_backfill(one.with_status(status=BulkActionStatus.COMPLETED))
assert (
len(storage.get_backfills(filters=BulkActionsFilter(status=BulkActionStatus.REQUESTED)))
len(
storage.get_backfills(
filters=BulkActionsFilter(statuses=[BulkActionStatus.REQUESTED])
)
)
== 0
)
assert (
len(storage.get_backfills(filters=BulkActionsFilter(status=BulkActionStatus.COMPLETED)))
len(
storage.get_backfills(
filters=BulkActionsFilter(statuses=[BulkActionStatus.COMPLETED])
)
)
== 1
)

two = PartitionBackfill(
"two",
partition_set_origin=origin,
status=BulkActionStatus.REQUESTED,
partition_names=["a", "b", "c"],
from_failure=False,
tags={},
backfill_timestamp=time.time(),
)
storage.add_backfill(two)
assert (
len(
storage.get_backfills(
filters=BulkActionsFilter(
statuses=[BulkActionStatus.COMPLETED, BulkActionStatus.REQUESTED]
)
)
)
== 2
)

def test_backfill_created_time_filtering(self, storage: RunStorage):
origin = self.fake_partition_set_origin("fake_partition_set")
backfills = storage.get_backfills()
Expand Down

1 comment on commit 7b1f2c3

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagit-core-storybook ready!

✅ Preview
https://dagit-core-storybook-6kwb62ogv-elementl.vercel.app

Built with commit 7b1f2c3.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.