Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

create time filtering for bulk actions table #23560

Merged
merged 13 commits into from
Aug 22, 2024
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
from collections import defaultdict
from typing import (
TYPE_CHECKING,
Expand All @@ -18,13 +19,13 @@
)
from dagster._core.definitions.selector import JobSubsetSelector
from dagster._core.errors import DagsterInvariantViolationError, DagsterRunNotFoundError
from dagster._core.execution.backfill import PartitionBackfill
from dagster._core.execution.backfill import BulkActionsFilter
from dagster._core.instance import DagsterInstance
from dagster._core.storage.dagster_run import DagsterRunStatus, RunRecord, RunsFilter
from dagster._core.storage.event_log.base import AssetRecord
from dagster._core.storage.tags import BACKFILL_ID_TAG, TagType, get_tag_type
from dagster._record import record
from dagster._time import datetime_from_timestamp, get_current_timestamp
from dagster._time import datetime_from_timestamp

from .external import ensure_valid_config, get_external_job_or_raise

Expand Down Expand Up @@ -436,14 +437,10 @@ def _fetch_runs_not_in_backfill(
instance: DagsterInstance,
cursor: Optional[str],
limit: int,
created_before: Optional[float],
created_before: Optional[datetime.datetime],
) -> Sequence[RunRecord]:
"""Fetches limit RunRecords that are not part of a backfill and were created before a given timestamp."""
runs_filter = (
RunsFilter(created_before=datetime_from_timestamp(created_before))
if created_before
else None
)
runs_filter = RunsFilter(created_before=created_before) if created_before else None

runs = []
while len(runs) < limit:
Expand All @@ -458,38 +455,6 @@ def _fetch_runs_not_in_backfill(
return runs[:limit]


def _fetch_backfills_created_before_timestamp(
    instance: DagsterInstance,
    cursor: Optional[str],
    limit: int,
    created_before: Optional[float] = None,
) -> Sequence[PartitionBackfill]:
    """Fetch up to ``limit`` PartitionBackfills created at or before a given timestamp.

    Args:
        instance: The DagsterInstance to query backfills from.
        cursor: Opaque backfill-id cursor to resume pagination from, or None to
            start from the newest backfill.
        limit: Maximum number of backfills to return.
        created_before: Unix timestamp in seconds; only backfills whose
            backfill_timestamp is <= this value are kept. Defaults to the
            current time when not provided.

    Note: This is a reasonable filter to add to the get_backfills instance method. However, we should have a
    more generalized way of adding filters than adding new parameters to get_backfills. So for now, doing this
    in a separate function.
    """
    # Use an identity check so an explicit 0.0 (the Unix epoch, which is falsy)
    # is honored instead of being silently replaced with the current time.
    if created_before is None:
        created_before = get_current_timestamp()
    backfills = []
    while len(backfills) < limit:
        # Fetch backfills in a loop, discarding backfills that were created after
        # created_before, until we have `limit` backfills to return or we have
        # reached the end of the backfills table.
        new_backfills = instance.get_backfills(cursor=cursor, limit=limit)
        if not new_backfills:
            # Exhausted the table; return whatever we accumulated (may be < limit).
            return backfills
        # Advance the cursor past the batch we just consumed.
        cursor = new_backfills[-1].backfill_id
        backfills.extend(
            backfill
            for backfill in new_backfills
            if backfill.backfill_timestamp <= created_before
        )

    return backfills[:limit]


def get_runs_feed_entries(
graphene_info: "ResolveInfo",
limit: int,
Expand Down Expand Up @@ -518,13 +483,15 @@ def get_runs_feed_entries(
fetch_limit = limit + 1
# filter out any backfills/runs that are newer than the cursor timestamp. See RunsFeedCursor docstring
# for the case when this is necessary
created_before_cursor = (
datetime_from_timestamp(runs_feed_cursor.timestamp) if runs_feed_cursor.timestamp else None
)
backfills = [
GraphenePartitionBackfill(backfill)
for backfill in _fetch_backfills_created_before_timestamp(
instance,
for backfill in instance.get_backfills(
cursor=runs_feed_cursor.backfill_cursor,
limit=fetch_limit,
created_before=runs_feed_cursor.timestamp,
limit=limit,
filters=BulkActionsFilter(created_before=created_before_cursor),
)
]
runs = [
Expand All @@ -533,7 +500,7 @@ def get_runs_feed_entries(
instance,
cursor=runs_feed_cursor.run_cursor,
limit=fetch_limit,
created_before=runs_feed_cursor.timestamp,
created_before=created_before_cursor,
)
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
# CURRENT_TIMESTAMP only has second precision for sqlite, so if we create runs and backfills without any delay
# the resulting list is a chunk of runs and then a chunk of backfills when ordered by time. Adding a small
# delay between creating a run and a backfill makes the resulting list more interwoven
CREATE_DELAY = 0.5
CREATE_DELAY = 1


def _create_run(graphql_context) -> DagsterRun:
Expand Down Expand Up @@ -99,6 +99,20 @@ def gql_context_with_runs_and_backfills(self, class_scoped_graphql_context):
return class_scoped_graphql_context

def test_get_runs_feed(self, gql_context_with_runs_and_backfills):
result = execute_dagster_graphql(
gql_context_with_runs_and_backfills.create_request_context(),
GET_RUNS_FEED_QUERY,
variables={
"limit": 25,
"cursor": None,
},
)
prev_run_time = None
for res in result.data["runsFeedOrError"]["results"]:
if prev_run_time:
assert res["creationTime"] <= prev_run_time
prev_run_time = res["creationTime"]

result = execute_dagster_graphql(
gql_context_with_runs_and_backfills.create_request_context(),
GET_RUNS_FEED_QUERY,
Expand Down
24 changes: 24 additions & 0 deletions python_modules/dagster/dagster/_core/execution/backfill.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime
from enum import Enum
from typing import Mapping, NamedTuple, Optional, Sequence, Union

Expand All @@ -14,6 +15,7 @@
from dagster._core.remote_representation.origin import RemotePartitionSetOrigin
from dagster._core.storage.tags import USER_TAG
from dagster._core.workspace.workspace import IWorkspace
from dagster._record import record
from dagster._serdes import whitelist_for_serdes
from dagster._utils.error import SerializableErrorInfo

Expand All @@ -38,6 +40,28 @@ def from_graphql_input(graphql_str):
return BulkActionStatus(graphql_str)


@record
class BulkActionsFilter:
"""Filters to use when querying for bulk actions (i.e. backfills) from the BulkActionsTable.

Each field of the BulkActionsFilter represents a logical AND with each other. For
example, if you specify status and created_before, then you will receive only bulk actions
with the specified states AND the created before created_before. If left blank, then
all values will be permitted for that field.

Args:
status (Optional[BulkActionStatus]): A status to filter by.
created_before (Optional[DateTime]): Filter by bulk actions that were created before this datetime. Note that the
create_time for each bulk action is stored in UTC.
created_after (Optional[DateTime]): Filter by bulk actions that were created after this datetime. Note that the
create_time for each bulk action is stored in UTC.
"""

status: Optional[BulkActionStatus] = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably we will need to expose most of the fields that we currently do on the RunsFilter. Support for those fields is probably out of scope for this PR though.

The question about supporting single vs multiple statuses though will probably depend on the mapping of BulkActionStatus to the various types in the Runs view UI status filters.

We should flag that as a thing we need to figure out (status), as well as some of the other filters (e.g. job_name, tags, etc).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep - to add more context if someone ends up looking at this PR in the future: the main goal of adding this filter now is to remove this block of code https://github.com/dagster-io/dagster/pull/23560/files#diff-c46424223e58f81b69323dee30256f0a38733eac79198279ebbcf5cec19ee16eL461-L492

I noted these things down for the filtering discussion

created_before: Optional[datetime] = None
created_after: Optional[datetime] = None


@whitelist_for_serdes
class PartitionBackfill(
NamedTuple(
Expand Down
11 changes: 9 additions & 2 deletions python_modules/dagster/dagster/_core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,11 @@
JobFailureData,
)
from dagster._core.events.log import EventLogEntry
from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
from dagster._core.execution.backfill import (
BulkActionsFilter,
BulkActionStatus,
PartitionBackfill,
)
from dagster._core.execution.plan.plan import ExecutionPlan
from dagster._core.execution.plan.resume_retry import ReexecutionStrategy
from dagster._core.execution.stats import RunStepKeyStatsSnapshot
Expand Down Expand Up @@ -3076,8 +3080,11 @@ def get_backfills(
status: Optional["BulkActionStatus"] = None,
cursor: Optional[str] = None,
limit: Optional[int] = None,
filters: Optional["BulkActionsFilter"] = None,
) -> Sequence["PartitionBackfill"]:
return self._run_storage.get_backfills(status=status, cursor=cursor, limit=limit)
return self._run_storage.get_backfills(
status=status, cursor=cursor, limit=limit, filters=filters
)

def get_backfill(self, backfill_id: str) -> Optional["PartitionBackfill"]:
return self._run_storage.get_backfill(backfill_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@
from dagster._core.event_api import AssetRecordsFilter, RunStatusChangeRecordsFilter
from dagster._core.events import DagsterEvent, DagsterEventType
from dagster._core.events.log import EventLogEntry
from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
from dagster._core.execution.backfill import (
BulkActionsFilter,
BulkActionStatus,
PartitionBackfill,
)
from dagster._core.execution.stats import RunStepKeyStatsSnapshot
from dagster._core.instance import DagsterInstance
from dagster._core.remote_representation.origin import RemoteJobOrigin
Expand Down Expand Up @@ -312,8 +316,9 @@ def get_backfills(
status: Optional["BulkActionStatus"] = None,
cursor: Optional[str] = None,
limit: Optional[int] = None,
filters: Optional["BulkActionsFilter"] = None,
) -> Sequence["PartitionBackfill"]:
return self._storage.run_storage.get_backfills(status, cursor, limit)
return self._storage.run_storage.get_backfills(status, cursor, limit, filters=filters)

def get_backfill(self, backfill_id: str) -> Optional["PartitionBackfill"]:
return self._storage.run_storage.get_backfill(backfill_id)
Expand Down
3 changes: 2 additions & 1 deletion python_modules/dagster/dagster/_core/storage/runs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing_extensions import TypedDict

from dagster._core.events import DagsterEvent
from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
from dagster._core.execution.backfill import BulkActionsFilter, BulkActionStatus, PartitionBackfill
from dagster._core.execution.telemetry import RunTelemetryData
from dagster._core.instance import MayHaveInstanceWeakref, T_DagsterInstance
from dagster._core.snap import ExecutionPlanSnapshot, JobSnapshot
Expand Down Expand Up @@ -373,6 +373,7 @@ def get_backfills(
status: Optional[BulkActionStatus] = None,
cursor: Optional[str] = None,
limit: Optional[int] = None,
filters: Optional[BulkActionsFilter] = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we consider changing the order of args? And maybe (in a separate PR) switching callers from using status to using filters?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah i think ideally filters would be before status, but there could be callsites that dont use kwargs right? get_backfills(BulkActionStatus.COMPLETED)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see any callsites that don't use kwargs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would potentially worry about user code callsites, but get_backfills is not marked as @public, so I think you could add a changelog note and be fine about changing it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'll stack a branch to do this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

branch for re-ordering #23773

) -> Sequence[PartitionBackfill]:
"""Get a list of partition backfills."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
DagsterEventType,
RunFailureReason,
)
from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
from dagster._core.execution.backfill import BulkActionsFilter, BulkActionStatus, PartitionBackfill
from dagster._core.remote_representation.origin import RemoteJobOrigin
from dagster._core.snap import (
ExecutionPlanSnapshot,
Expand Down Expand Up @@ -837,16 +837,27 @@ def get_backfills(
status: Optional[BulkActionStatus] = None,
cursor: Optional[str] = None,
limit: Optional[int] = None,
filters: Optional[BulkActionsFilter] = None,
) -> Sequence[PartitionBackfill]:
check.opt_inst_param(status, "status", BulkActionStatus)
query = db_select([BulkActionsTable.c.body])
if status:
query = db_select([BulkActionsTable.c.body, BulkActionsTable.c.timestamp])
if status or (filters and filters.status):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unfortunate that we have to do all of this checking around status. I assume we'd have to do a deprecation warning cycle if we wanted to have status filtering be done via the BulkActionsFilter?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so? I don't have a great sense for how often people are manually calling this from user code with the status arg.

It's not marked as a @public method on the instance, but it is exposed on the cloud graphql implementation. If you wanted to be super-conservative, you could log some volume metrics in Cloud first before deprecating.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similarly, should we take this opportunity to make it a statuses (plural) filter?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. Will we need them to render the different tabs in the Runs view?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if we need the plural statuses, but RunsFilter does statuses and this internal fn also uses statuses so it could be nice to align

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the Runs page supports filtering for multiple statuses, so i'm updating the BulkActions filter to support multiple statuses in this stacked PR #23772

if status and filters and filters.status and status != filters.status:
raise DagsterInvariantViolationError(
"Conflicting status filters provided to get_backfills. Choose one of status or BulkActionsFilter.status."
)
status = status or (filters.status if filters else None)
assert status
query = query.where(BulkActionsTable.c.status == status.value)
if cursor:
cursor_query = db_select([BulkActionsTable.c.id]).where(
BulkActionsTable.c.key == cursor
)
query = query.where(BulkActionsTable.c.id < cursor_query)
if filters and filters.created_after:
query = query.where(BulkActionsTable.c.timestamp > filters.created_after)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@prha the time-based filters for the RunsTable strip the timezone (filters.created_after.replace(tzinfo=None)), but the columns in the RunsTable are DATETIME. In the BulkActionsTable they are TIMESTAMP, and i found that the postgres run storage tests failed when i replaced the timezone with None. The tests all pass when i leave the tzinfo as is, but i haven't been able to find a reason why online. Do you foresee any issues with not doing the timezone conversion here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

more info -
when we add the backfill to the DB we call datetime_from_timestamp which will return a UTC datetime

timestamp=datetime_from_timestamp(partition_backfill.backfill_timestamp),

In the graphene layer, I'm doing the same conversion so anything coming from the UI will also get converted to UTC and filtering will be correct https://github.com/dagster-io/dagster/pull/23682/files#diff-61fefa33db2a378b1c50f360e3aa830124007f7bc8f4195005286b529b6e60cdR388

But maybe the right thing to do is to have a custom constructor on BulkActionFilters that takes timestamp instead of datetime and does the conversion there so that all of the datetimes on the filter are guranteed to be converted to UTC

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think either way works...

if filters and filters.created_before:
query = query.where(BulkActionsTable.c.timestamp < filters.created_before)
if limit:
query = query.limit(limit)
query = query.order_by(BulkActionsTable.c.id.desc())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
DagsterSnapshotDoesNotExist,
)
from dagster._core.events import DagsterEvent, DagsterEventType, JobFailureData, RunFailureReason
from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
from dagster._core.execution.backfill import BulkActionsFilter, BulkActionStatus, PartitionBackfill
from dagster._core.instance import DagsterInstance, InstanceType
from dagster._core.launcher.sync_in_memory_run_launcher import SyncInMemoryRunLauncher
from dagster._core.remote_representation import (
Expand Down Expand Up @@ -44,7 +44,7 @@
from dagster._daemon.daemon import SensorDaemon
from dagster._daemon.types import DaemonHeartbeat
from dagster._serdes import serialize_pp
from dagster._time import create_datetime
from dagster._time import create_datetime, datetime_from_timestamp

win_py36 = _seven.IS_WINDOWS and sys.version_info[0] == 3 and sys.version_info[1] == 6

Expand Down Expand Up @@ -1337,6 +1337,81 @@ def test_backfill(self, storage: RunStorage):
assert len(storage.get_backfills()) == 1
assert len(storage.get_backfills(status=BulkActionStatus.REQUESTED)) == 0

def test_backfill_status_filtering(self, storage: RunStorage):
    """Backfills are filterable by status via BulkActionsFilter, and the filter
    reflects subsequent status updates made through update_backfill."""
    origin = self.fake_partition_set_origin("fake_partition_set")
    assert len(storage.get_backfills()) == 0

    def count_with_status(status):
        # Number of stored backfills matching the given BulkActionStatus.
        return len(storage.get_backfills(filters=BulkActionsFilter(status=status)))

    requested_backfill = PartitionBackfill(
        "one",
        partition_set_origin=origin,
        status=BulkActionStatus.REQUESTED,
        partition_names=["a", "b", "c"],
        from_failure=False,
        tags={},
        backfill_timestamp=time.time(),
    )
    storage.add_backfill(requested_backfill)

    assert count_with_status(BulkActionStatus.REQUESTED) == 1
    assert count_with_status(BulkActionStatus.COMPLETED) == 0
    matching = storage.get_backfills(
        filters=BulkActionsFilter(status=BulkActionStatus.REQUESTED)
    )
    assert matching[0] == requested_backfill

    # Flipping the status moves the backfill from one filter bucket to the other.
    storage.update_backfill(requested_backfill.with_status(status=BulkActionStatus.COMPLETED))
    assert count_with_status(BulkActionStatus.REQUESTED) == 0
    assert count_with_status(BulkActionStatus.COMPLETED) == 1

def test_backfill_created_time_filtering(self, storage: RunStorage):
    """created_before/created_after on BulkActionsFilter bound results strictly
    by creation time (the pivot backfill itself is excluded from both sides)."""
    origin = self.fake_partition_set_origin("fake_partition_set")
    assert len(storage.get_backfills()) == 0

    stored = []
    for idx in range(5):
        bf = PartitionBackfill(
            f"backfill_{idx}",
            partition_set_origin=origin,
            status=BulkActionStatus.REQUESTED,
            partition_names=["a", "b", "c"],
            from_failure=False,
            tags={},
            backfill_timestamp=time.time(),
        )
        storage.add_backfill(bf)
        stored.append(bf)

    # Use the middle backfill's creation time as the pivot for both filters.
    pivot_ts = stored[2].backfill_timestamp
    pivot_dt = datetime_from_timestamp(pivot_ts)

    older = storage.get_backfills(filters=BulkActionsFilter(created_before=pivot_dt))
    assert len(older) == 2
    assert all(b.backfill_timestamp < pivot_ts for b in older)

    newer = storage.get_backfills(filters=BulkActionsFilter(created_after=pivot_dt))
    assert len(newer) == 2
    assert all(b.backfill_timestamp > pivot_ts for b in newer)

def test_secondary_index(self, storage):
self._skip_in_memory(storage)

Expand Down