Revert "Revert "support filtering in the runs feed"" (#24661)
Reverts #24654

Adds runs feed filtering back in, with a method to skip the filtering tests for Plus, since filtering won't be implemented there for a few more days.

Internal PR: dagster-io/internal#11719

NOCHANGELOG
jamiedemaria authored Sep 23, 2024
1 parent 721126f · commit 4cae513
Showing 6 changed files with 631 additions and 40 deletions.
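
For context: as the schema change at the bottom of this diff shows, runsFeedOrError now accepts a filter argument alongside limit and cursor. Below is a minimal sketch of what a client request could look like against a locally running dagster-webserver. The endpoint address, the RunsFilter input field "statuses", and the connection fields results/cursor/hasMore are assumptions drawn from the wider Dagster GraphQL API, not from this commit:

import requests

# Query the merged runs + backfills feed, filtered to failed entries only.
# runsFeedOrError(limit:, cursor:, filter:) matches the schema change below;
# the selection set is an assumption about the RunsFeedConnection type.
RUNS_FEED_QUERY = """
query RunsFeed($limit: Int!, $cursor: String, $filter: RunsFilter) {
  runsFeedOrError(limit: $limit, cursor: $cursor, filter: $filter) {
    ... on RunsFeedConnection {
      results { __typename }
      cursor
      hasMore
    }
  }
}
"""

resp = requests.post(
    "http://localhost:3000/graphql",  # default dagster-webserver address (assumed)
    json={
        "query": RUNS_FEED_QUERY,
        "variables": {"limit": 10, "cursor": None, "filter": {"statuses": ["FAILURE"]}},
    },
)
resp.raise_for_status()
print(resp.json()["data"]["runsFeedOrError"])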

@@ -19,12 +19,12 @@
 )
 from dagster._core.definitions.selector import JobSubsetSelector
 from dagster._core.errors import DagsterInvariantViolationError, DagsterRunNotFoundError
-from dagster._core.execution.backfill import BulkActionsFilter
+from dagster._core.execution.backfill import BulkActionsFilter, BulkActionStatus
 from dagster._core.instance import DagsterInstance
 from dagster._core.storage.dagster_run import DagsterRunStatus, RunRecord, RunsFilter
 from dagster._core.storage.event_log.base import AssetRecord
 from dagster._core.storage.tags import BACKFILL_ID_TAG, TagType, get_tag_type
-from dagster._record import record
+from dagster._record import copy, record
 from dagster._time import datetime_from_timestamp
 
 from dagster_graphql.implementation.external import ensure_valid_config, get_external_job_or_raise
@@ -432,16 +432,14 @@ def _fetch_runs_not_in_backfill(
     instance: DagsterInstance,
     cursor: Optional[str],
     limit: int,
-    created_before: Optional[datetime.datetime],
+    filters: Optional[RunsFilter],
 ) -> Sequence[RunRecord]:
     """Fetches limit RunRecords that are not part of a backfill and were created before a given timestamp."""
-    runs_filter = RunsFilter(created_before=created_before) if created_before else None
-
     runs = []
     while len(runs) < limit:
         # fetch runs in a loop and discard runs that are part of a backfill until we have
         # limit runs to return or have reached the end of the runs table
-        new_runs = instance.get_run_records(limit=limit, cursor=cursor, filters=runs_filter)
+        new_runs = instance.get_run_records(limit=limit, cursor=cursor, filters=filters)
         if len(new_runs) == 0:
             return runs
         cursor = new_runs[-1].dagster_run.run_id
@@ -450,9 +448,77 @@
     return runs[:limit]
 
 
+RUN_STATUS_TO_BULK_ACTION_STATUSES = {
+    DagsterRunStatus.SUCCESS: [BulkActionStatus.COMPLETED_SUCCESS],
+    DagsterRunStatus.FAILURE: [BulkActionStatus.FAILED, BulkActionStatus.COMPLETED_FAILED],
+    DagsterRunStatus.CANCELED: [BulkActionStatus.CANCELED],
+    DagsterRunStatus.CANCELING: [BulkActionStatus.CANCELING],
+    DagsterRunStatus.STARTED: [BulkActionStatus.REQUESTED],
+}
+
+
+def _bulk_action_statuses_from_run_statuses(
+    statuses: Sequence[DagsterRunStatus],
+) -> Sequence[BulkActionStatus]:
+    full_list = []
+    for status in statuses:
+        full_list.extend(RUN_STATUS_TO_BULK_ACTION_STATUSES.get(status, []))
+
+    return full_list
+
+
+def _filters_apply_to_backfills(filters: RunsFilter) -> bool:
+    # the following filters do not apply to backfills, so skip fetching backfills if they are set
+    if (
+        (filters.run_ids is not None and len(filters.run_ids) > 0)
+        or filters.updated_after is not None
+        or filters.updated_before is not None
+        or filters.snapshot_id is not None
+    ):
+        return False
+    # if filtering by statuses and all are not valid backfill statuses, skip fetching backfills
+    if filters.statuses and len(_bulk_action_statuses_from_run_statuses(filters.statuses)) == 0:
+        return False
+
+    return True
+
+
+def _bulk_action_filters_from_run_filters(filters: RunsFilter) -> BulkActionsFilter:
+    converted_statuses = (
+        _bulk_action_statuses_from_run_statuses(filters.statuses) if filters.statuses else None
+    )
+    return BulkActionsFilter(
+        created_before=filters.created_before,
+        created_after=filters.created_after,
+        statuses=converted_statuses,
+        job_name=filters.job_name,
+        tags=filters.tags,
+    )
+
+
+def _replace_created_before_with_cursor(
+    filters: RunsFilter, created_before_cursor: Optional[datetime.datetime]
+):
+    """After the first page of results is returned, created_before_cursor will be less than
+    filters.created_before. For pagination of results to work, we need to ensure that the
+    created_before filter is set to the minimum of created_before_cursor and filters.created_before.
+    """
+    if filters.created_before and created_before_cursor:
+        created_before = min(created_before_cursor, filters.created_before)
+    elif created_before_cursor:
+        created_before = created_before_cursor
+    elif filters.created_before:
+        created_before = filters.created_before
+    else:  # no created_before should be applied, return filters as is
+        return filters
+
+    return copy(filters, created_before=created_before)
+
+
 def get_runs_feed_entries(
     graphene_info: "ResolveInfo",
     limit: int,
+    filters: Optional[RunsFilter],
     cursor: Optional[str] = None,
 ) -> "GrapheneRunsFeedConnection":
     """Returns a GrapheneRunsFeedConnection, which contains a merged list of backfills and
@@ -463,39 +529,56 @@ def get_runs_feed_entries(
         limit (int): max number of results to return
         cursor (Optional[str]): String that can be deserialized into a RunsFeedCursor. If None, indicates
             that querying should start at the beginning of the table for both runs and backfills.
+        filters (Optional[RunsFilter]): Filters to apply to the runs. If None, no filters are applied.
     """
     from dagster_graphql.schema.backfill import GraphenePartitionBackfill
     from dagster_graphql.schema.pipelines.pipeline import GrapheneRun
     from dagster_graphql.schema.runs_feed import GrapheneRunsFeedConnection
 
     check.opt_str_param(cursor, "cursor")
     check.int_param(limit, "limit")
+    check.opt_inst_param(filters, "filters", RunsFilter)
 
     instance = graphene_info.context.instance
     runs_feed_cursor = RunsFeedCursor.from_string(cursor)
 
     # if using limit, fetch limit+1 of each type to know if there are more than limit remaining
     fetch_limit = limit + 1
     # filter out any backfills/runs that are newer than the cursor timestamp. See RunsFeedCursor docstring
-    # for case when theis is necessary
+    # for case when this is necessary
     created_before_cursor = (
         datetime_from_timestamp(runs_feed_cursor.timestamp) if runs_feed_cursor.timestamp else None
     )
-    backfills = [
-        GraphenePartitionBackfill(backfill)
-        for backfill in instance.get_backfills(
-            cursor=runs_feed_cursor.backfill_cursor,
-            limit=limit,
-            filters=BulkActionsFilter(created_before=created_before_cursor),
-        )
-    ]
+
+    should_fetch_backfills = _filters_apply_to_backfills(filters) if filters else True
+    if filters:
+        run_filters = _replace_created_before_with_cursor(filters, created_before_cursor)
+        backfill_filters = (
+            _bulk_action_filters_from_run_filters(run_filters) if should_fetch_backfills else None
+        )
+    else:
+        run_filters = RunsFilter(created_before=created_before_cursor)
+        backfill_filters = BulkActionsFilter(created_before=created_before_cursor)
+
+    if should_fetch_backfills:
+        backfills = [
+            GraphenePartitionBackfill(backfill)
+            for backfill in instance.get_backfills(
+                cursor=runs_feed_cursor.backfill_cursor,
+                limit=limit,
+                filters=backfill_filters,
+            )
+        ]
+    else:
+        backfills = []
 
     runs = [
         GrapheneRun(run)
         for run in _fetch_runs_not_in_backfill(
             instance,
             cursor=runs_feed_cursor.run_cursor,
             limit=fetch_limit,
-            created_before=created_before_cursor,
+            filters=run_filters,
         )
     ]

@@ -518,10 +601,7 @@ def get_runs_feed_entries(
         reverse=True,
     )
 
-    if limit:
-        to_return = all_entries[:limit]
-    else:
-        to_return = all_entries
+    to_return = all_entries[:limit]
 
     new_run_cursor = None
     new_backfill_cursor = None
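The subtlest part of the file above is _replace_created_before_with_cursor: once a page has been returned, the cursor timestamp must take precedence over any user-supplied created_before, or later pages would re-serve rows. Here is a standalone illustration of that rule (not the commit's code, just the min() logic restated with plain datetimes):

import datetime
from typing import Optional


def effective_created_before(
    filter_created_before: Optional[datetime.datetime],
    cursor_created_before: Optional[datetime.datetime],
) -> Optional[datetime.datetime]:
    # mirror of the branch logic in _replace_created_before_with_cursor:
    # if both bounds are set, take the older one; otherwise take whichever is set
    if filter_created_before and cursor_created_before:
        return min(filter_created_before, cursor_created_before)
    return cursor_created_before or filter_created_before


user_bound = datetime.datetime(2024, 9, 20)   # user asked for runs created before Sep 20
page_cursor = datetime.datetime(2024, 9, 15)  # creation time of the oldest row on the last page

# the cursor is older, so it wins and the next page starts strictly before the last one ended
assert effective_created_before(user_bound, page_cursor) == page_cursor
# first page (no cursor yet): the user's filter applies unchanged
assert effective_created_before(user_bound, None) == user_bound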
@@ -371,7 +371,8 @@ class Meta:
         graphene.NonNull(GrapheneRunsFeedConnectionOrError),
         limit=graphene.NonNull(graphene.Int),
         cursor=graphene.String(),
-        description="Retrieve entries for the Runs Feed after applying cursor and limit.",
+        filter=graphene.Argument(GrapheneRunsFilter),
+        description="Retrieve entries for the Runs Feed after applying a filter, cursor and limit.",
     )
     runTagKeysOrError = graphene.Field(
         GrapheneRunTagKeysOrError, description="Retrieve the distinct tag keys from all runs."
@@ -843,8 +844,12 @@ def resolve_runsFeedOrError(
         graphene_info: ResolveInfo,
         limit: int,
         cursor: Optional[str] = None,
+        filter: Optional[GrapheneRunsFilter] = None,  # noqa: A002
     ):
-        return get_runs_feed_entries(graphene_info=graphene_info, cursor=cursor, limit=limit)
+        selector = filter.to_selector() if filter is not None else None
+        return get_runs_feed_entries(
+            graphene_info=graphene_info, cursor=cursor, limit=limit, filters=selector
+        )
 
     @capture_error
     def resolve_partitionSetsOrError(
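One design note on the first file's RUN_STATUS_TO_BULK_ACTION_STATUSES table: because backfills track BulkActionStatus rather than DagsterRunStatus, a run-status filter has to be translated before it can be applied to backfills, and a status with no translation causes backfills to be skipped entirely via _filters_apply_to_backfills. A small sketch reusing the mapping from the diff (the imports mirror the changed file; running this requires a dagster installation):

# Sketch only: restates the mapping added in this commit to show the translation.
from dagster._core.execution.backfill import BulkActionStatus
from dagster._core.storage.dagster_run import DagsterRunStatus

RUN_STATUS_TO_BULK_ACTION_STATUSES = {
    DagsterRunStatus.SUCCESS: [BulkActionStatus.COMPLETED_SUCCESS],
    DagsterRunStatus.FAILURE: [BulkActionStatus.FAILED, BulkActionStatus.COMPLETED_FAILED],
    DagsterRunStatus.CANCELED: [BulkActionStatus.CANCELED],
    DagsterRunStatus.CANCELING: [BulkActionStatus.CANCELING],
    DagsterRunStatus.STARTED: [BulkActionStatus.REQUESTED],
}

# a filter on FAILURE runs fetches backfills in either failed state...
print(RUN_STATUS_TO_BULK_ACTION_STATUSES[DagsterRunStatus.FAILURE])
# ...while a status with no entry (e.g. QUEUED) maps to nothing, so
# _filters_apply_to_backfills returns False and no backfills are fetched
print(RUN_STATUS_TO_BULK_ACTION_STATUSES.get(DagsterRunStatus.QUEUED, []))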

1 comment on commit 4cae513

@github-actions
Deploy preview for dagit-core-storybook ready!

✅ Preview: https://dagit-core-storybook-avezd8vk8-elementl.vercel.app

Built with commit 4cae513.
This pull request is being automatically deployed with vercel-action.

Please sign in to comment.