Skip to content

Commit

Permalink
basic create filtering for bulk actions table
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Aug 12, 2024
1 parent 1b3f915 commit f36fa63
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@
)
from dagster._core.definitions.selector import JobSubsetSelector
from dagster._core.errors import DagsterInvariantViolationError, DagsterRunNotFoundError
from dagster._core.execution.backfill import PartitionBackfill
from dagster._core.instance import DagsterInstance
from dagster._core.storage.dagster_run import DagsterRunStatus, RunRecord, RunsFilter
from dagster._core.storage.event_log.base import AssetRecord
from dagster._core.storage.tags import BACKFILL_ID_TAG, TagType, get_tag_type
from dagster._record import record
from dagster._time import datetime_from_timestamp, get_current_timestamp
from dagster._time import datetime_from_timestamp

from .external import ensure_valid_config, get_external_job_or_raise

Expand Down Expand Up @@ -458,38 +457,6 @@ def _fetch_runs_not_in_backfill(
return runs[:limit]


def _fetch_backfills_created_before_timestamp(
instance: DagsterInstance,
cursor: Optional[str],
limit: int,
created_before: Optional[float] = None,
) -> Sequence[PartitionBackfill]:
"""Fetches limit PartitionBackfills that were created before a given timestamp.
Note: This is a reasonable filter to add to the get_backfills instance method. However, we should have a
more generalized way of adding filters than adding new parameters to get_backfills. So for now, doing this
in a separate function.
"""
created_before = created_before if created_before else get_current_timestamp()
backfills = []
while len(backfills) < limit:
# fetch backfills in a loop discarding backfills that were created after created_before until
# we have limit backfills to return or have reached the end of the backfills table
new_backfills = instance.get_backfills(cursor=cursor, limit=limit)
if len(new_backfills) == 0:
return backfills
cursor = new_backfills[-1].backfill_id
backfills.extend(
[
backfill
for backfill in new_backfills
if backfill.backfill_timestamp <= created_before
]
)

return backfills[:limit]


def get_runs_feed_entries(
graphene_info: "ResolveInfo",
limit: int,
Expand Down Expand Up @@ -520,11 +487,8 @@ def get_runs_feed_entries(
# for case when theis is necessary
backfills = [
GraphenePartitionBackfill(backfill)
for backfill in _fetch_backfills_created_before_timestamp(
instance,
cursor=runs_feed_cursor.backfill_cursor,
limit=fetch_limit,
created_before=runs_feed_cursor.timestamp,
for backfill in instance.get_backfills(
cursor=cursor, limit=limit, created_before=runs_feed_cursor.timestamp
)
]
runs = [
Expand Down
11 changes: 10 additions & 1 deletion python_modules/dagster/dagster/_core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import weakref
from abc import abstractmethod
from collections import defaultdict
from datetime import datetime
from enum import Enum
from tempfile import TemporaryDirectory
from types import TracebackType
Expand Down Expand Up @@ -3076,8 +3077,16 @@ def get_backfills(
status: Optional["BulkActionStatus"] = None,
cursor: Optional[str] = None,
limit: Optional[int] = None,
created_before: Optional[datetime] = None,
created_after: Optional[datetime] = None,
) -> Sequence["PartitionBackfill"]:
return self._run_storage.get_backfills(status=status, cursor=cursor, limit=limit)
return self._run_storage.get_backfills(
status=status,
cursor=cursor,
limit=limit,
created_before=created_before,
created_after=created_after,
)

def get_backfill(self, backfill_id: str) -> Optional["PartitionBackfill"]:
return self._run_storage.get_backfill(backfill_id)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime
from typing import TYPE_CHECKING, Iterable, Mapping, Optional, Sequence, Set, Tuple, Union

from dagster import _check as check
Expand Down Expand Up @@ -312,8 +313,12 @@ def get_backfills(
status: Optional["BulkActionStatus"] = None,
cursor: Optional[str] = None,
limit: Optional[int] = None,
created_before: Optional[datetime] = None,
created_after: Optional[datetime] = None,
) -> Sequence["PartitionBackfill"]:
return self._storage.run_storage.get_backfills(status, cursor, limit)
return self._storage.run_storage.get_backfills(
status, cursor, limit, created_before=created_before, created_after=created_after
)

def get_backfill(self, backfill_id: str) -> Optional["PartitionBackfill"]:
return self._storage.run_storage.get_backfill(backfill_id)
Expand Down
3 changes: 3 additions & 0 deletions python_modules/dagster/dagster/_core/storage/runs/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from abc import ABC, abstractmethod
from datetime import datetime
from typing import TYPE_CHECKING, Dict, Mapping, Optional, Sequence, Set, Tuple, Union

from typing_extensions import TypedDict
Expand Down Expand Up @@ -373,6 +374,8 @@ def get_backfills(
status: Optional[BulkActionStatus] = None,
cursor: Optional[str] = None,
limit: Optional[int] = None,
created_before: Optional[datetime] = None,
created_after: Optional[datetime] = None,
) -> Sequence[PartitionBackfill]:
"""Get a list of partition backfills."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,8 @@ def get_backfills(
status: Optional[BulkActionStatus] = None,
cursor: Optional[str] = None,
limit: Optional[int] = None,
created_before: Optional[datetime] = None,
created_after: Optional[datetime] = None,
) -> Sequence[PartitionBackfill]:
check.opt_inst_param(status, "status", BulkActionStatus)
query = db_select([BulkActionsTable.c.body])
Expand All @@ -847,6 +849,14 @@ def get_backfills(
BulkActionsTable.c.key == cursor
)
query = query.where(BulkActionsTable.c.id < cursor_query)
if created_after:
query = query.where(
BulkActionsTable.c.timestamp > created_after.replace(tzinfo=None).timestamp()
)
if created_before:
query = query.where(
BulkActionsTable.c.timestamp < created_before.replace(tzinfo=None).timestamp()
)
if limit:
query = query.limit(limit)
query = query.order_by(BulkActionsTable.c.id.desc())
Expand Down

0 comments on commit f36fa63

Please sign in to comment.