From 4cae513e8c9231ef954c9ae2e770a2fcc5e0fcfd Mon Sep 17 00:00:00 2001
From: jamiedemaria
Date: Mon, 23 Sep 2024 15:44:51 -0400
Subject: [PATCH] Revert "Revert "support filtering in the runs feed"" (#24661)

Reverts dagster-io/dagster#24654

Adds runs feed filtering back in, along with a method to skip the filtering
tests in Plus, since filtering won't be implemented there for a few more days.

Internal PR: https://github.com/dagster-io/internal/pull/11719

NOCHANGELOG
---
 .../ui-core/src/graphql/schema.graphql        |   2 +-
 .../packages/ui-core/src/graphql/types.ts     |   1 +
 .../implementation/fetch_runs.py              | 118 +++-
 .../dagster_graphql/schema/roots/query.py     |   9 +-
 .../graphql/test_runs_feed.py                 | 535 +++++++++++++++++-
 .../dagster/_core/execution/backfill.py       |   6 +-
 6 files changed, 631 insertions(+), 40 deletions(-)

diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql
index a4ebe4c8772ba..cd6ab68aa6f97 100644
--- a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql
+++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql
@@ -3274,7 +3274,7 @@ type Query {
   pipelineRunOrError(runId: ID!): RunOrError!
   runsOrError(filter: RunsFilter, cursor: String, limit: Int): RunsOrError!
   runOrError(runId: ID!): RunOrError!
-  runsFeedOrError(limit: Int!, cursor: String): RunsFeedConnectionOrError!
+  runsFeedOrError(limit: Int!, cursor: String, filter: RunsFilter): RunsFeedConnectionOrError!
   runTagKeysOrError: RunTagKeysOrError
   runTagsOrError(tagKeys: [String!], valuePrefix: String, limit: Int): RunTagsOrError
   runIdsOrError(filter: RunsFilter, cursor: String, limit: Int): RunIdsOrError!
diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts
index 8c7d63fb34a3d..6fa5e13a930e1 100644
--- a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts
+++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts
@@ -3957,6 +3957,7 @@ export type QueryRunTagsOrErrorArgs = {
 
 export type QueryRunsFeedOrErrorArgs = {
   cursor?: InputMaybe<Scalars['String']['input']>;
+  filter?: InputMaybe<RunsFilter>;
   limit: Scalars['Int']['input'];
 };
 
diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py
index f8ba3ad49de96..a124f77dd7b35 100644
--- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py
+++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py
@@ -19,12 +19,12 @@
 )
 from dagster._core.definitions.selector import JobSubsetSelector
 from dagster._core.errors import DagsterInvariantViolationError, DagsterRunNotFoundError
-from dagster._core.execution.backfill import BulkActionsFilter
+from dagster._core.execution.backfill import BulkActionsFilter, BulkActionStatus
 from dagster._core.instance import DagsterInstance
 from dagster._core.storage.dagster_run import DagsterRunStatus, RunRecord, RunsFilter
 from dagster._core.storage.event_log.base import AssetRecord
 from dagster._core.storage.tags import BACKFILL_ID_TAG, TagType, get_tag_type
-from dagster._record import record
+from dagster._record import copy, record
 from dagster._time import datetime_from_timestamp
 
 from dagster_graphql.implementation.external import ensure_valid_config, get_external_job_or_raise
@@ -432,16 +432,14 @@ def _fetch_runs_not_in_backfill(
     instance: DagsterInstance,
     cursor: Optional[str],
     limit: int,
-    created_before: Optional[datetime.datetime],
+    filters: 
Optional[RunsFilter],
 ) -> Sequence[RunRecord]:
-    """Fetches limit RunRecords that are not part of a backfill and were created before a given timestamp."""
-    runs_filter = RunsFilter(created_before=created_before) if created_before else None
-
+    """Fetches up to limit RunRecords that are not part of a backfill and match the given filters."""
     runs = []
     while len(runs) < limit:
         # fetch runs in a loop and discard runs that are part of a backfill until we have
         # limit runs to return or have reached the end of the runs table
-        new_runs = instance.get_run_records(limit=limit, cursor=cursor, filters=runs_filter)
+        new_runs = instance.get_run_records(limit=limit, cursor=cursor, filters=filters)
         if len(new_runs) == 0:
             return runs
         cursor = new_runs[-1].dagster_run.run_id
@@ -450,9 +448,77 @@ def _fetch_runs_not_in_backfill(
     return runs[:limit]
 
 
+RUN_STATUS_TO_BULK_ACTION_STATUSES = {
+    DagsterRunStatus.SUCCESS: [BulkActionStatus.COMPLETED_SUCCESS],
+    DagsterRunStatus.FAILURE: [BulkActionStatus.FAILED, BulkActionStatus.COMPLETED_FAILED],
+    DagsterRunStatus.CANCELED: [BulkActionStatus.CANCELED],
+    DagsterRunStatus.CANCELING: [BulkActionStatus.CANCELING],
+    DagsterRunStatus.STARTED: [BulkActionStatus.REQUESTED],
+}
+
+
+def _bulk_action_statuses_from_run_statuses(
+    statuses: Sequence[DagsterRunStatus],
+) -> Sequence[BulkActionStatus]:
+    full_list = []
+    for status in statuses:
+        full_list.extend(RUN_STATUS_TO_BULK_ACTION_STATUSES.get(status, []))
+
+    return full_list
+
+
+def _filters_apply_to_backfills(filters: RunsFilter) -> bool:
+    # the following filters do not apply to backfills, so skip fetching backfills if they are set
+    if (
+        (filters.run_ids is not None and len(filters.run_ids) > 0)
+        or filters.updated_after is not None
+        or filters.updated_before is not None
+        or filters.snapshot_id is not None
+    ):
+        return False
+    # if filtering by statuses and none of them map to a valid backfill status, skip fetching backfills
+    if filters.statuses and len(_bulk_action_statuses_from_run_statuses(filters.statuses)) == 0:
+        return False
+
+    return True
+
+
+def _bulk_action_filters_from_run_filters(filters: RunsFilter) -> BulkActionsFilter:
+    converted_statuses = (
+        _bulk_action_statuses_from_run_statuses(filters.statuses) if filters.statuses else None
+    )
+    return BulkActionsFilter(
+        created_before=filters.created_before,
+        created_after=filters.created_after,
+        statuses=converted_statuses,
+        job_name=filters.job_name,
+        tags=filters.tags,
+    )
+
+
+def _replace_created_before_with_cursor(
+    filters: RunsFilter, created_before_cursor: Optional[datetime.datetime]
+):
+    """After the first page of results is returned, created_before_cursor will be less than
+    filters.created_before. For pagination of results to work, we need to ensure that the
+    created_before filter is set to the minimum of created_before_cursor and filters.created_before.
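+
+    A minimal illustration with hypothetical values (the dates below are not from the
+    surrounding code):
+
+        filters = RunsFilter(created_before=datetime.datetime(2024, 9, 23))
+        created_before_cursor = datetime.datetime(2024, 9, 1)  # from an earlier page
+        _replace_created_before_with_cursor(filters, created_before_cursor).created_before
+        # -> datetime.datetime(2024, 9, 1), the tighter (earlier) of the two bounds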
+ """ + if filters.created_before and created_before_cursor: + created_before = min(created_before_cursor, filters.created_before) + elif created_before_cursor: + created_before = created_before_cursor + elif filters.created_before: + created_before = filters.created_before + else: # no created_before should be applied, return filters as is + return filters + + return copy(filters, created_before=created_before) + + def get_runs_feed_entries( graphene_info: "ResolveInfo", limit: int, + filters: Optional[RunsFilter], cursor: Optional[str] = None, ) -> "GrapheneRunsFeedConnection": """Returns a GrapheneRunsFeedConnection, which contains a merged list of backfills and @@ -463,6 +529,7 @@ def get_runs_feed_entries( limit (int): max number of results to return cursor (Optional[str]): String that can be deserialized into a RunsFeedCursor. If None, indicates that querying should start at the beginning of the table for both runs and backfills. + filters (Optional[RunsFilter]): Filters to apply to the runs. If None, no filters are applied. """ from dagster_graphql.schema.backfill import GraphenePartitionBackfill from dagster_graphql.schema.pipelines.pipeline import GrapheneRun @@ -470,6 +537,7 @@ def get_runs_feed_entries( check.opt_str_param(cursor, "cursor") check.int_param(limit, "limit") + check.opt_inst_param(filters, "filters", RunsFilter) instance = graphene_info.context.instance runs_feed_cursor = RunsFeedCursor.from_string(cursor) @@ -477,25 +545,40 @@ def get_runs_feed_entries( # if using limit, fetch limit+1 of each type to know if there are more than limit remaining fetch_limit = limit + 1 # filter out any backfills/runs that are newer than the cursor timestamp. See RunsFeedCursor docstring - # for case when theis is necessary + # for case when this is necessary created_before_cursor = ( datetime_from_timestamp(runs_feed_cursor.timestamp) if runs_feed_cursor.timestamp else None ) - backfills = [ - GraphenePartitionBackfill(backfill) - for backfill in instance.get_backfills( - cursor=runs_feed_cursor.backfill_cursor, - limit=limit, - filters=BulkActionsFilter(created_before=created_before_cursor), + + should_fetch_backfills = _filters_apply_to_backfills(filters) if filters else True + if filters: + run_filters = _replace_created_before_with_cursor(filters, created_before_cursor) + backfill_filters = ( + _bulk_action_filters_from_run_filters(run_filters) if should_fetch_backfills else None ) - ] + else: + run_filters = RunsFilter(created_before=created_before_cursor) + backfill_filters = BulkActionsFilter(created_before=created_before_cursor) + + if should_fetch_backfills: + backfills = [ + GraphenePartitionBackfill(backfill) + for backfill in instance.get_backfills( + cursor=runs_feed_cursor.backfill_cursor, + limit=limit, + filters=backfill_filters, + ) + ] + else: + backfills = [] + runs = [ GrapheneRun(run) for run in _fetch_runs_not_in_backfill( instance, cursor=runs_feed_cursor.run_cursor, limit=fetch_limit, - created_before=created_before_cursor, + filters=run_filters, ) ] @@ -518,10 +601,7 @@ def get_runs_feed_entries( reverse=True, ) - if limit: - to_return = all_entries[:limit] - else: - to_return = all_entries + to_return = all_entries[:limit] new_run_cursor = None new_backfill_cursor = None diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py b/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py index 050a9af6d0d26..f764b4954ab91 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py +++ 
b/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py @@ -371,7 +371,8 @@ class Meta: graphene.NonNull(GrapheneRunsFeedConnectionOrError), limit=graphene.NonNull(graphene.Int), cursor=graphene.String(), - description="Retrieve entries for the Runs Feed after applying cursor and limit.", + filter=graphene.Argument(GrapheneRunsFilter), + description="Retrieve entries for the Runs Feed after applying a filter, cursor and limit.", ) runTagKeysOrError = graphene.Field( GrapheneRunTagKeysOrError, description="Retrieve the distinct tag keys from all runs." @@ -843,8 +844,12 @@ def resolve_runsFeedOrError( graphene_info: ResolveInfo, limit: int, cursor: Optional[str] = None, + filter: Optional[GrapheneRunsFilter] = None, # noqa: A002 ): - return get_runs_feed_entries(graphene_info=graphene_info, cursor=cursor, limit=limit) + selector = filter.to_selector() if filter is not None else None + return get_runs_feed_entries( + graphene_info=graphene_info, cursor=cursor, limit=limit, filters=selector + ) @capture_error def resolve_partitionSetsOrError( diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_runs_feed.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_runs_feed.py index 427167fa065f9..f847dd092a321 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_runs_feed.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_runs_feed.py @@ -1,8 +1,10 @@ import time +from typing import Mapping, Optional import pytest from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill -from dagster._core.storage.dagster_run import DagsterRun +from dagster._core.remote_representation.origin import RemotePartitionSetOrigin +from dagster._core.storage.dagster_run import DagsterRun, DagsterRunStatus from dagster._core.test_utils import create_run_for_test from dagster._core.utils import make_new_backfill_id from dagster._time import get_current_timestamp @@ -14,8 +16,8 @@ ) GET_RUNS_FEED_QUERY = """ -query RunsFeedEntryQuery($cursor: String, $limit: Int!) { - runsFeedOrError(cursor: $cursor, limit: $limit) { +query RunsFeedEntryQuery($cursor: String, $limit: Int!, $filter: RunsFilter) { + runsFeedOrError(cursor: $cursor, limit: $limit, filter: $filter) { ... 
on RunsFeedConnection {
       results {
         __typename
@@ -54,30 +56,42 @@
 CREATE_DELAY = 1
 
 
-def _create_run(graphql_context) -> DagsterRun:
-    return create_run_for_test(
-        instance=graphql_context.instance,
-    )
+def _create_run(graphql_context, **kwargs) -> DagsterRun:
+    return create_run_for_test(instance=graphql_context.instance, **kwargs)
 
 
-def _create_run_for_backfill(graphql_context, backfill_id: str) -> DagsterRun:
+def _create_run_for_backfill(
+    graphql_context, backfill_id: str, tags: Optional[Mapping[str, str]] = None, **kwargs
+) -> DagsterRun:
+    if tags:
+        tags = {**tags, **DagsterRun.tags_for_backfill_id(backfill_id)}
+    else:
+        tags = DagsterRun.tags_for_backfill_id(backfill_id)
     return create_run_for_test(
         instance=graphql_context.instance,
-        tags={
-            **DagsterRun.tags_for_backfill_id(backfill_id),
-        },
+        tags=tags,
+        **kwargs,
    )
 
 
-def _create_backfill(graphql_context) -> str:
+def _create_backfill(
+    graphql_context,
+    status: BulkActionStatus = BulkActionStatus.COMPLETED_SUCCESS,
+    tags: Optional[Mapping[str, str]] = None,
+    partition_set_origin: Optional[RemotePartitionSetOrigin] = None,
+) -> str:
+    serialized_backfill_data = (
+        "foo" if partition_set_origin is None else None
+    )  # the content of the backfill doesn't matter for testing fetching runs feed entries
     backfill = PartitionBackfill(
         backfill_id=make_new_backfill_id(),
-        serialized_asset_backfill_data="foo",  # the content of the backfill doesn't matter for testing fetching mega runs
-        status=BulkActionStatus.COMPLETED_SUCCESS,
+        serialized_asset_backfill_data=serialized_backfill_data,
+        status=status,
         reexecution_steps=None,
-        tags=None,
+        tags=tags,
         backfill_timestamp=get_current_timestamp(),
         from_failure=False,
+        partition_set_origin=partition_set_origin,
     )
     graphql_context.instance.add_backfill(backfill)
     return backfill.backfill_id
@@ -105,6 +119,7 @@ def test_get_runs_feed(self, gql_context_with_runs_and_backfills):
             variables={
                 "limit": 25,
                 "cursor": None,
+                "filter": None,
             },
         )
         prev_run_time = None
@@ -119,6 +134,7 @@ def test_get_runs_feed(self, gql_context_with_runs_and_backfills):
             variables={
                 "limit": 10,
                 "cursor": None,
+                "filter": None,
             },
         )
 
@@ -142,6 +158,7 @@ def test_get_runs_feed(self, gql_context_with_runs_and_backfills):
             variables={
                 "limit": 10,
                 "cursor": old_cursor,
+                "filter": None,
             },
         )
 
@@ -160,6 +177,7 @@ def test_get_runs_feed_inexact_limit(self, gql_context_with_runs_and_backfills):
             variables={
                 "limit": 15,
                 "cursor": None,
+                "filter": None,
             },
         )
 
@@ -182,6 +200,7 @@ def test_get_runs_feed_inexact_limit(self, gql_context_with_runs_and_backfills):
             variables={
                 "limit": 10,
                 "cursor": result.data["runsFeedOrError"]["cursor"],
+                "filter": None,
             },
         )
 
@@ -200,6 +219,7 @@ def test_get_runs_feed_cursor_respected(self, gql_context_with_runs_and_backfill
             variables={
                 "limit": 10,
                 "cursor": None,
+                "filter": None,
             },
         )
 
@@ -230,6 +250,7 @@ def test_get_runs_feed_cursor_respected(self, gql_context_with_runs_and_backfill
             variables={
                 "limit": 10,
                 "cursor": old_cursor.to_string(),
+                "filter": None,
             },
         )
 
@@ -255,6 +276,9 @@ class TestRunsFeedUniqueSetups(ExecutingGraphQLContextTestMatrix):
     the other test suite.
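+
+    Subclasses can override supports_filtering to return False in order to skip the
+    filtering tests, e.g. for storage backends where runs feed filtering is not yet
+    implemented.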
""" + def supports_filtering(self): + return True + def test_get_runs_feed_ignores_backfill_runs(self, graphql_context): for _ in range(10): _create_run_for_backfill(graphql_context, backfill_id="foo") @@ -267,6 +291,7 @@ def test_get_runs_feed_ignores_backfill_runs(self, graphql_context): variables={ "limit": 10, "cursor": None, + "filter": None, }, ) @@ -290,6 +315,7 @@ def test_get_runs_feed_no_runs_or_backfills_exist(self, graphql_context): variables={ "limit": 10, "cursor": None, + "filter": None, }, ) @@ -311,6 +337,7 @@ def test_get_runs_feed_one_backfill_long_ago(self, graphql_context): variables={ "limit": 10, "cursor": None, + "filter": None, }, ) @@ -341,6 +368,7 @@ def test_get_runs_feed_one_backfill_long_ago(self, graphql_context): variables={ "limit": 10, "cursor": result.data["runsFeedOrError"]["cursor"], + "filter": None, }, ) @@ -370,6 +398,7 @@ def test_get_runs_feed_one_new_backfill(self, graphql_context): variables={ "limit": 10, "cursor": None, + "filter": None, }, ) @@ -396,6 +425,7 @@ def test_get_runs_feed_one_new_backfill(self, graphql_context): variables={ "limit": 10, "cursor": result.data["runsFeedOrError"]["cursor"], + "filter": None, }, ) @@ -426,6 +456,7 @@ def test_get_runs_feed_backfill_created_between_calls(self, graphql_context): variables={ "limit": 5, "cursor": None, + "filter": None, }, ) @@ -457,6 +488,7 @@ def test_get_runs_feed_backfill_created_between_calls(self, graphql_context): variables={ "limit": 10, "cursor": result.data["runsFeedOrError"]["cursor"], + "filter": None, }, ) @@ -474,3 +506,476 @@ def test_get_runs_feed_backfill_created_between_calls(self, graphql_context): RunsFeedCursor.from_string(result.data["runsFeedOrError"]["cursor"]).backfill_cursor is None ) + + def test_get_runs_feed_filter_status(self, graphql_context): + _create_run(graphql_context, status=DagsterRunStatus.SUCCESS) + _create_run(graphql_context, status=DagsterRunStatus.CANCELING) + _create_run(graphql_context, status=DagsterRunStatus.FAILURE) + _create_run(graphql_context, status=DagsterRunStatus.NOT_STARTED) + time.sleep(CREATE_DELAY) + _create_backfill(graphql_context, status=BulkActionStatus.COMPLETED_SUCCESS) + _create_backfill(graphql_context, status=BulkActionStatus.COMPLETED_FAILED) + _create_backfill(graphql_context, status=BulkActionStatus.COMPLETED) + _create_backfill(graphql_context, status=BulkActionStatus.CANCELING) + _create_backfill(graphql_context, status=BulkActionStatus.CANCELED) + + result = execute_dagster_graphql( + graphql_context, + GET_RUNS_FEED_QUERY, + variables={ + "limit": 10, + "cursor": None, + "filter": None, + }, + ) + + assert not result.errors + assert result.data + + assert len(result.data["runsFeedOrError"]["results"]) == 9 + assert not result.data["runsFeedOrError"]["hasMore"] + + result = execute_dagster_graphql( + graphql_context, + GET_RUNS_FEED_QUERY, + variables={ + "limit": 10, + "cursor": None, + "filter": {"statuses": ["SUCCESS"]}, + }, + ) + assert not result.errors + assert result.data + assert len(result.data["runsFeedOrError"]["results"]) == 2 + + result = execute_dagster_graphql( + graphql_context, + GET_RUNS_FEED_QUERY, + variables={ + "limit": 10, + "cursor": None, + "filter": {"statuses": ["FAILURE"]}, + }, + ) + assert not result.errors + assert result.data + assert len(result.data["runsFeedOrError"]["results"]) == 2 + + result = execute_dagster_graphql( + graphql_context, + GET_RUNS_FEED_QUERY, + variables={ + "limit": 10, + "cursor": None, + "filter": {"statuses": ["CANCELING"]}, + }, + ) + assert not 
result.errors + assert result.data + assert len(result.data["runsFeedOrError"]["results"]) == 2 + + result = execute_dagster_graphql( + graphql_context, + GET_RUNS_FEED_QUERY, + variables={ + "limit": 10, + "cursor": None, + "filter": {"statuses": ["CANCELED"]}, + }, + ) + assert not result.errors + assert result.data + assert len(result.data["runsFeedOrError"]["results"]) == 1 + + result = execute_dagster_graphql( + graphql_context, + GET_RUNS_FEED_QUERY, + variables={ + "limit": 10, + "cursor": None, + "filter": {"statuses": ["NOT_STARTED"]}, + }, + ) + assert not result.errors + assert result.data + assert len(result.data["runsFeedOrError"]["results"]) == 1 + + def test_get_runs_feed_filter_create_time(self, graphql_context): + nothing_created_ts = get_current_timestamp() + time.sleep(CREATE_DELAY) + for _ in range(5): + _create_run(graphql_context) + time.sleep(CREATE_DELAY) + _create_backfill(graphql_context) + + time.sleep(CREATE_DELAY) + half_created_ts = get_current_timestamp() + time.sleep(CREATE_DELAY) + for _ in range(5): + _create_run(graphql_context) + time.sleep(CREATE_DELAY) + _create_backfill(graphql_context) + + time.sleep(CREATE_DELAY) + all_created_ts = get_current_timestamp() + + result = execute_dagster_graphql( + graphql_context, + GET_RUNS_FEED_QUERY, + variables={ + "limit": 25, + "cursor": None, + "filter": None, + }, + ) + + assert not result.errors + assert result.data + + assert len(result.data["runsFeedOrError"]["results"]) == 20 + assert not result.data["runsFeedOrError"]["hasMore"] + + result = execute_dagster_graphql( + graphql_context, + GET_RUNS_FEED_QUERY, + variables={ + "limit": 25, + "cursor": None, + "filter": {"createdBefore": nothing_created_ts}, + }, + ) + assert not result.errors + assert result.data + assert len(result.data["runsFeedOrError"]["results"]) == 0 + assert not result.data["runsFeedOrError"]["hasMore"] + + result = execute_dagster_graphql( + graphql_context, + GET_RUNS_FEED_QUERY, + variables={ + "limit": 25, + "cursor": None, + "filter": {"createdBefore": half_created_ts}, + }, + ) + assert not result.errors + assert result.data + assert len(result.data["runsFeedOrError"]["results"]) == 10 + assert not result.data["runsFeedOrError"]["hasMore"] + + result = execute_dagster_graphql( + graphql_context, + GET_RUNS_FEED_QUERY, + variables={ + "limit": 25, + "cursor": None, + "filter": {"createdBefore": all_created_ts}, + }, + ) + assert not result.errors + assert result.data + assert len(result.data["runsFeedOrError"]["results"]) == 20 + assert not result.data["runsFeedOrError"]["hasMore"] + + # ensure the cursor overrides the createdBefore filter when the query is called multiple times for + # pagination + result = execute_dagster_graphql( + graphql_context, + GET_RUNS_FEED_QUERY, + variables={ + "limit": 6, + "cursor": None, + "filter": {"createdBefore": half_created_ts}, + }, + ) + assert not result.errors + assert result.data + assert len(result.data["runsFeedOrError"]["results"]) == 6 + assert result.data["runsFeedOrError"]["hasMore"] + + result = execute_dagster_graphql( + graphql_context, + GET_RUNS_FEED_QUERY, + variables={ + "limit": 4, + "cursor": result.data["runsFeedOrError"]["cursor"], + "filter": {"createdBefore": half_created_ts}, + }, + ) + assert not result.errors + assert result.data + assert len(result.data["runsFeedOrError"]["results"]) == 4 + assert not result.data["runsFeedOrError"]["hasMore"] + + def test_get_runs_feed_filter_job_name(self, graphql_context): + if not self.supports_filtering(): + return 
pytest.skip("storage does not support filtering backfills by job_name") + code_location = graphql_context.get_code_location("test") + repository = code_location.get_repository("test_repo") + + partition_set_origin = RemotePartitionSetOrigin( + repository_origin=repository.get_external_origin(), + partition_set_name="foo_partition", + ) + for _ in range(3): + _create_run(graphql_context, job_name="foo") + time.sleep(CREATE_DELAY) + backfill_id = _create_backfill( + graphql_context, partition_set_origin=partition_set_origin + ) + _create_run_for_backfill(graphql_context, backfill_id, job_name="foo") + + partition_set_origin = RemotePartitionSetOrigin( + repository_origin=repository.get_external_origin(), + partition_set_name="bar_partition", + ) + for _ in range(3): + _create_run(graphql_context, job_name="bar") + time.sleep(CREATE_DELAY) + backfill_id = _create_backfill( + graphql_context, partition_set_origin=partition_set_origin + ) + _create_run_for_backfill(graphql_context, backfill_id, job_name="bar") + + result = execute_dagster_graphql( + graphql_context, + GET_RUNS_FEED_QUERY, + variables={ + "limit": 20, + "cursor": None, + "filter": None, + }, + ) + + assert not result.errors + assert result.data + + assert len(result.data["runsFeedOrError"]["results"]) == 12 + assert not result.data["runsFeedOrError"]["hasMore"] + + result = execute_dagster_graphql( + graphql_context, + GET_RUNS_FEED_QUERY, + variables={ + "limit": 10, + "cursor": None, + "filter": {"pipelineName": "foo"}, + }, + ) + assert not result.errors + assert result.data + assert len(result.data["runsFeedOrError"]["results"]) == 6 + + result = execute_dagster_graphql( + graphql_context, + GET_RUNS_FEED_QUERY, + variables={ + "limit": 10, + "cursor": None, + "filter": {"pipelineName": "bar"}, + }, + ) + assert not result.errors + assert result.data + assert len(result.data["runsFeedOrError"]["results"]) == 6 + + def test_get_runs_feed_filter_tags(self, graphql_context): + if not self.supports_filtering(): + return pytest.skip("storage does not support filtering backfills by tag") + for _ in range(3): + _create_run(graphql_context, tags={"foo": "bar"}) + time.sleep(CREATE_DELAY) + backfill_id = _create_backfill(graphql_context, tags={"foo": "bar"}) + _create_run_for_backfill( + graphql_context, backfill_id, tags={"foo": "bar", "baz": "quux"} + ) + + for _ in range(3): + _create_run(graphql_context, tags={"foo": "baz"}) + time.sleep(CREATE_DELAY) + backfill_id = _create_backfill(graphql_context, tags={"one": "two"}) + _create_run_for_backfill(graphql_context, backfill_id, tags={"one": "two"}) + + result = execute_dagster_graphql( + graphql_context, + GET_RUNS_FEED_QUERY, + variables={ + "limit": 20, + "cursor": None, + "filter": None, + }, + ) + + assert not result.errors + assert result.data + + assert len(result.data["runsFeedOrError"]["results"]) == 12 + assert not result.data["runsFeedOrError"]["hasMore"] + + result = execute_dagster_graphql( + graphql_context, + GET_RUNS_FEED_QUERY, + variables={ + "limit": 10, + "cursor": None, + "filter": {"tags": [{"key": "foo", "value": "bar"}]}, + }, + ) + assert not result.errors + assert result.data + assert len(result.data["runsFeedOrError"]["results"]) == 6 + + # filtering for tags that are only on sub-runs of backfills should not return the backfill + result = execute_dagster_graphql( + graphql_context, + GET_RUNS_FEED_QUERY, + variables={ + "limit": 10, + "cursor": None, + "filter": {"tags": [{"key": "baz", "value": "quux"}]}, + }, + ) + assert not result.errors + assert 
result.data + assert len(result.data["runsFeedOrError"]["results"]) == 0 + + result = execute_dagster_graphql( + graphql_context, + GET_RUNS_FEED_QUERY, + variables={ + "limit": 10, + "cursor": None, + "filter": {"tags": [{"key": "foo", "value": "baz"}]}, + }, + ) + assert not result.errors + assert result.data + assert len(result.data["runsFeedOrError"]["results"]) == 3 + + def test_get_runs_feed_filters_that_dont_apply_to_backfills(self, graphql_context): + run = _create_run(graphql_context) + time.sleep(CREATE_DELAY) + _create_backfill(graphql_context) + + result = execute_dagster_graphql( + graphql_context, + GET_RUNS_FEED_QUERY, + variables={ + "limit": 20, + "cursor": None, + "filter": None, + }, + ) + + assert not result.errors + assert result.data + + assert len(result.data["runsFeedOrError"]["results"]) == 2 + assert not result.data["runsFeedOrError"]["hasMore"] + + result = execute_dagster_graphql( + graphql_context, + GET_RUNS_FEED_QUERY, + variables={ + "limit": 10, + "cursor": None, + "filter": {"runIds": [run.run_id]}, + }, + ) + assert not result.errors + assert result.data + assert len(result.data["runsFeedOrError"]["results"]) == 1 + + def test_get_runs_feed_filter_tags_and_status(self, graphql_context): + if not self.supports_filtering(): + return pytest.skip("storage does not support filtering backfills by tag") + run_statuses = [ + DagsterRunStatus.SUCCESS, + DagsterRunStatus.FAILURE, + DagsterRunStatus.CANCELED, + ] + backfill_statuses = [ + BulkActionStatus.COMPLETED_SUCCESS, + BulkActionStatus.COMPLETED_FAILED, + BulkActionStatus.CANCELED, + ] + for i in range(3): + _create_run(graphql_context, tags={"foo": "bar"}, status=run_statuses[i]) + time.sleep(CREATE_DELAY) + backfill_id = _create_backfill( + graphql_context, tags={"foo": "bar"}, status=backfill_statuses[i] + ) + _create_run_for_backfill( + graphql_context, + backfill_id, + tags={"foo": "bar", "baz": "quux"}, + status=run_statuses[i], + ) + + for i in range(3): + _create_run(graphql_context, tags={"foo": "baz"}, status=run_statuses[i]) + time.sleep(CREATE_DELAY) + backfill_id = _create_backfill( + graphql_context, tags={"one": "two"}, status=backfill_statuses[i] + ) + _create_run_for_backfill( + graphql_context, backfill_id, tags={"one": "two"}, status=run_statuses[i] + ) + + result = execute_dagster_graphql( + graphql_context, + GET_RUNS_FEED_QUERY, + variables={ + "limit": 20, + "cursor": None, + "filter": None, + }, + ) + + assert not result.errors + assert result.data + + assert len(result.data["runsFeedOrError"]["results"]) == 12 + assert not result.data["runsFeedOrError"]["hasMore"] + + result = execute_dagster_graphql( + graphql_context, + GET_RUNS_FEED_QUERY, + variables={ + "limit": 10, + "cursor": None, + "filter": {"tags": [{"key": "foo", "value": "bar"}], "statuses": ["SUCCESS"]}, + }, + ) + assert not result.errors + assert result.data + assert len(result.data["runsFeedOrError"]["results"]) == 2 + + result = execute_dagster_graphql( + graphql_context, + GET_RUNS_FEED_QUERY, + variables={ + "limit": 10, + "cursor": None, + "filter": { + "tags": [{"key": "foo", "value": "bar"}], + "statuses": ["FAILURE", "CANCELED"], + }, + }, + ) + assert not result.errors + assert result.data + assert len(result.data["runsFeedOrError"]["results"]) == 4 + + result = execute_dagster_graphql( + graphql_context, + GET_RUNS_FEED_QUERY, + variables={ + "limit": 10, + "cursor": None, + "filter": {"tags": [{"key": "foo", "value": "baz"}], "statuses": ["FAILURE"]}, + }, + ) + assert not result.errors + assert result.data 
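+        # only the FAILURE run tagged foo=baz matches; the backfills created alongside
+        # the foo=baz runs are tagged one=two, so they are filtered out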
+ assert len(result.data["runsFeedOrError"]["results"]) == 1 diff --git a/python_modules/dagster/dagster/_core/execution/backfill.py b/python_modules/dagster/dagster/_core/execution/backfill.py index abb865f6def5b..2d6cd744834e9 100644 --- a/python_modules/dagster/dagster/_core/execution/backfill.py +++ b/python_modules/dagster/dagster/_core/execution/backfill.py @@ -58,9 +58,9 @@ class BulkActionsFilter: created_before (Optional[DateTime]): Filter by bulk actions that were created before this datetime. Note that the create_time for each bulk action is stored in UTC. created_after (Optional[DateTime]): Filter by bulk actions that were created after this datetime. Note that the - create_time for each bulk action is stored in UTC.t - tags (Optional[Dict[str, Union[str, List[str]]]]): - A dictionary of tags to query by. All tags specified here must be present for a given bulk action to pass the filter. + create_time for each bulk action is stored in UTC. + tags (Optional[Dict[str, Union[str, List[str]]]]): A dictionary of tags to query by. All tags specified + here must be present for a given bulk action to pass the filter. job_name (Optional[str]): Name of the job to query for. If blank, all job_names will be accepted. """
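A worked example of the status translation introduced above (illustrative only; the
values follow the RUN_STATUS_TO_BULK_ACTION_STATUSES mapping added to fetch_runs.py
by this patch):

    from dagster._core.execution.backfill import BulkActionStatus
    from dagster._core.storage.dagster_run import DagsterRunStatus

    # FAILURE maps to both backfill failure statuses, while QUEUED has no
    # backfill equivalent and contributes nothing:
    _bulk_action_statuses_from_run_statuses([DagsterRunStatus.FAILURE, DagsterRunStatus.QUEUED])
    # -> [BulkActionStatus.FAILED, BulkActionStatus.COMPLETED_FAILED]
    # a QUEUED-only filter yields no backfill statuses at all, so
    # _filters_apply_to_backfills returns False and backfills are skipped entirely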
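A sketch of how a caller pages through the filtered feed, mirroring the tests above
(assumes a test-style graphql_context; GET_RUNS_FEED_QUERY is the query string defined
in test_runs_feed.py):

    cursor = None
    all_entries = []
    while True:
        result = execute_dagster_graphql(
            graphql_context,
            GET_RUNS_FEED_QUERY,
            variables={"limit": 10, "cursor": cursor, "filter": {"statuses": ["FAILURE"]}},
        )
        data = result.data["runsFeedOrError"]
        all_entries.extend(data["results"])
        if not data["hasMore"]:
            break
        # serialized RunsFeedCursor; its timestamp becomes the created-before bound
        # that _replace_created_before_with_cursor merges with any createdBefore filter
        cursor = data["cursor"]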