From 61f40b7d2cd574917fe874f25912d5488d1e764f Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Fri, 9 Aug 2024 15:47:03 -0400 Subject: [PATCH 01/15] basic create filtering for bulk actions table --- .../implementation/fetch_runs.py | 42 ++----------------- .../dagster/_core/instance/__init__.py | 11 ++++- .../dagster/_core/storage/legacy_storage.py | 7 +++- .../dagster/_core/storage/runs/base.py | 3 ++ .../_core/storage/runs/sql_run_storage.py | 10 +++++ 5 files changed, 32 insertions(+), 41 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py index 3e553dc5c617d..e90929c1db6e8 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py @@ -18,13 +18,12 @@ ) from dagster._core.definitions.selector import JobSubsetSelector from dagster._core.errors import DagsterInvariantViolationError, DagsterRunNotFoundError -from dagster._core.execution.backfill import PartitionBackfill from dagster._core.instance import DagsterInstance from dagster._core.storage.dagster_run import DagsterRunStatus, RunRecord, RunsFilter from dagster._core.storage.event_log.base import AssetRecord from dagster._core.storage.tags import BACKFILL_ID_TAG, TagType, get_tag_type from dagster._record import record -from dagster._time import datetime_from_timestamp, get_current_timestamp +from dagster._time import datetime_from_timestamp from .external import ensure_valid_config, get_external_job_or_raise @@ -458,38 +457,6 @@ def _fetch_runs_not_in_backfill( return runs[:limit] -def _fetch_backfills_created_before_timestamp( - instance: DagsterInstance, - cursor: Optional[str], - limit: int, - created_before: Optional[float] = None, -) -> Sequence[PartitionBackfill]: - """Fetches limit PartitionBackfills that were created before a given timestamp. - - Note: This is a reasonable filter to add to the get_backfills instance method. However, we should have a - more generalized way of adding filters than adding new parameters to get_backfills. So for now, doing this - in a separate function. - """ - created_before = created_before if created_before else get_current_timestamp() - backfills = [] - while len(backfills) < limit: - # fetch backfills in a loop discarding backfills that were created after created_before until - # we have limit backfills to return or have reached the end of the backfills table - new_backfills = instance.get_backfills(cursor=cursor, limit=limit) - if len(new_backfills) == 0: - return backfills - cursor = new_backfills[-1].backfill_id - backfills.extend( - [ - backfill - for backfill in new_backfills - if backfill.backfill_timestamp <= created_before - ] - ) - - return backfills[:limit] - - def get_runs_feed_entries( graphene_info: "ResolveInfo", limit: int, @@ -520,11 +487,8 @@ def get_runs_feed_entries( # for case when theis is necessary backfills = [ GraphenePartitionBackfill(backfill) - for backfill in _fetch_backfills_created_before_timestamp( - instance, - cursor=runs_feed_cursor.backfill_cursor, - limit=fetch_limit, - created_before=runs_feed_cursor.timestamp, + for backfill in instance.get_backfills( + cursor=cursor, limit=limit, created_before=runs_feed_cursor.timestamp ) ] runs = [ diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py index ab9a797043fdf..4a82ecd372215 100644 --- a/python_modules/dagster/dagster/_core/instance/__init__.py +++ b/python_modules/dagster/dagster/_core/instance/__init__.py @@ -6,6 +6,7 @@ import weakref from abc import abstractmethod from collections import defaultdict +from datetime import datetime from enum import Enum from tempfile import TemporaryDirectory from types import TracebackType @@ -3076,8 +3077,16 @@ def get_backfills( status: Optional["BulkActionStatus"] = None, cursor: Optional[str] = None, limit: Optional[int] = None, + created_before: Optional[datetime] = None, + created_after: Optional[datetime] = None, ) -> Sequence["PartitionBackfill"]: - return self._run_storage.get_backfills(status=status, cursor=cursor, limit=limit) + return self._run_storage.get_backfills( + status=status, + cursor=cursor, + limit=limit, + created_before=created_before, + created_after=created_after, + ) def get_backfill(self, backfill_id: str) -> Optional["PartitionBackfill"]: return self._run_storage.get_backfill(backfill_id) diff --git a/python_modules/dagster/dagster/_core/storage/legacy_storage.py b/python_modules/dagster/dagster/_core/storage/legacy_storage.py index fff8e919e44aa..c18bb801878d3 100644 --- a/python_modules/dagster/dagster/_core/storage/legacy_storage.py +++ b/python_modules/dagster/dagster/_core/storage/legacy_storage.py @@ -1,3 +1,4 @@ +from datetime import datetime from typing import TYPE_CHECKING, Iterable, Mapping, Optional, Sequence, Set, Tuple, Union from dagster import _check as check @@ -312,8 +313,12 @@ def get_backfills( status: Optional["BulkActionStatus"] = None, cursor: Optional[str] = None, limit: Optional[int] = None, + created_before: Optional[datetime] = None, + created_after: Optional[datetime] = None, ) -> Sequence["PartitionBackfill"]: - return self._storage.run_storage.get_backfills(status, cursor, limit) + return self._storage.run_storage.get_backfills( + status, cursor, limit, created_before=created_before, created_after=created_after + ) def get_backfill(self, backfill_id: str) -> Optional["PartitionBackfill"]: return self._storage.run_storage.get_backfill(backfill_id) diff --git a/python_modules/dagster/dagster/_core/storage/runs/base.py b/python_modules/dagster/dagster/_core/storage/runs/base.py index b7b631fec768f..3e72ede9f1827 100644 --- a/python_modules/dagster/dagster/_core/storage/runs/base.py +++ b/python_modules/dagster/dagster/_core/storage/runs/base.py @@ -1,4 +1,5 @@ from abc import ABC, abstractmethod +from datetime import datetime from typing import TYPE_CHECKING, Dict, Mapping, Optional, Sequence, Set, Tuple, Union from typing_extensions import TypedDict @@ -373,6 +374,8 @@ def get_backfills( status: Optional[BulkActionStatus] = None, cursor: Optional[str] = None, limit: Optional[int] = None, + created_before: Optional[datetime] = None, + created_after: Optional[datetime] = None, ) -> Sequence[PartitionBackfill]: """Get a list of partition backfills.""" diff --git a/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py b/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py index 938ebaa5302f4..b87381bd75b82 100644 --- a/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py +++ b/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py @@ -837,6 +837,8 @@ def get_backfills( status: Optional[BulkActionStatus] = None, cursor: Optional[str] = None, limit: Optional[int] = None, + created_before: Optional[datetime] = None, + created_after: Optional[datetime] = None, ) -> Sequence[PartitionBackfill]: check.opt_inst_param(status, "status", BulkActionStatus) query = db_select([BulkActionsTable.c.body]) @@ -847,6 +849,14 @@ def get_backfills( BulkActionsTable.c.key == cursor ) query = query.where(BulkActionsTable.c.id < cursor_query) + if created_after: + query = query.where( + BulkActionsTable.c.timestamp > created_after.replace(tzinfo=None).timestamp() + ) + if created_before: + query = query.where( + BulkActionsTable.c.timestamp < created_before.replace(tzinfo=None).timestamp() + ) if limit: query = query.limit(limit) query = query.order_by(BulkActionsTable.c.id.desc()) From 707e547bd04c184bb74fce70ac65f0f0061205db Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Tue, 13 Aug 2024 14:52:12 -0400 Subject: [PATCH 02/15] pass single datetime rather than converting --- .../dagster_graphql/implementation/fetch_runs.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py index e90929c1db6e8..e7b82a9127aa0 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py @@ -1,3 +1,4 @@ +import datetime from collections import defaultdict from typing import ( TYPE_CHECKING, @@ -435,14 +436,10 @@ def _fetch_runs_not_in_backfill( instance: DagsterInstance, cursor: Optional[str], limit: int, - created_before: Optional[float], + created_before: Optional[datetime.datetime], ) -> Sequence[RunRecord]: """Fetches limit RunRecords that are not part of a backfill and were created before a given timestamp.""" - runs_filter = ( - RunsFilter(created_before=datetime_from_timestamp(created_before)) - if created_before - else None - ) + runs_filter = RunsFilter(created_before=created_before) if created_before else None runs = [] while len(runs) < limit: @@ -485,10 +482,13 @@ def get_runs_feed_entries( fetch_limit = limit + 1 # filter out any backfills/runs that are newer than the cursor timestamp. See RunsFeedCursor docstring # for case when theis is necessary + created_before_cursor = ( + datetime_from_timestamp(runs_feed_cursor.timestamp) if runs_feed_cursor.timestamp else None + ) backfills = [ GraphenePartitionBackfill(backfill) for backfill in instance.get_backfills( - cursor=cursor, limit=limit, created_before=runs_feed_cursor.timestamp + cursor=cursor, limit=limit, created_before=created_before_cursor ) ] runs = [ @@ -497,7 +497,7 @@ def get_runs_feed_entries( instance, cursor=runs_feed_cursor.run_cursor, limit=fetch_limit, - created_before=runs_feed_cursor.timestamp, + created_before=created_before_cursor, ) ] From 21ba3187a8beae92c538810fa92bfc3f55b1a20e Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Tue, 13 Aug 2024 16:28:09 -0400 Subject: [PATCH 03/15] wip --- .../graphql/test_runs_feed.py | 31 ++++++++++++++++++- .../_core/storage/runs/sql_run_storage.py | 8 ++--- 2 files changed, 32 insertions(+), 7 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_runs_feed.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_runs_feed.py index 0cae1b6df0e3e..fe5602fe9d5e7 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_runs_feed.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_runs_feed.py @@ -51,7 +51,7 @@ # CURRENT_TIMESTAMP only has second precision for sqlite, so if we create runs and backfills without any delay # the resulting list is a chunk of runs and then a chunk of backfills when ordered by time. Adding a small # delay between creating a run and a backfill makes the resulting list more interwoven -CREATE_DELAY = 0.5 +CREATE_DELAY = 1 def _create_run(graphql_context) -> DagsterRun: @@ -99,6 +99,24 @@ def gql_context_with_runs_and_backfills(self, class_scoped_graphql_context): return class_scoped_graphql_context def test_get_runs_feed(self, gql_context_with_runs_and_backfills): + result = execute_dagster_graphql( + gql_context_with_runs_and_backfills.create_request_context(), + GET_RUNS_FEED_QUERY, + variables={ + "limit": 25, + "cursor": None, + }, + ) + prev_run_time = None + id_to_timestamp_mapping = {} + for res in result.data["runsFeedOrError"]["results"]: + id_to_timestamp_mapping[res["runId"]] = res["creationTime"] + if prev_run_time: + assert res["creationTime"] <= prev_run_time + prev_run_time = res["creationTime"] + + print(id_to_timestamp_mapping) + result = execute_dagster_graphql( gql_context_with_runs_and_backfills.create_request_context(), GET_RUNS_FEED_QUERY, @@ -113,11 +131,15 @@ def test_get_runs_feed(self, gql_context_with_runs_and_backfills): assert len(result.data["runsFeedOrError"]["results"]) == 10 prev_run_time = None + id_to_timestamp_mapping = {} for res in result.data["runsFeedOrError"]["results"]: + id_to_timestamp_mapping[res["runId"]] = res["creationTime"] if prev_run_time: assert res["creationTime"] <= prev_run_time prev_run_time = res["creationTime"] + print(id_to_timestamp_mapping) + assert result.data["runsFeedOrError"]["hasMore"] old_cursor = result.data["runsFeedOrError"]["cursor"] assert old_cursor is not None @@ -131,12 +153,19 @@ def test_get_runs_feed(self, gql_context_with_runs_and_backfills): }, ) + id_to_timestamp_mapping = {} + for res in result.data["runsFeedOrError"]["results"]: + id_to_timestamp_mapping[res["runId"]] = res["creationTime"] + assert len(result.data["runsFeedOrError"]["results"]) == 10 for res in result.data["runsFeedOrError"]["results"]: if prev_run_time: assert res["creationTime"] <= prev_run_time prev_run_time = res["creationTime"] + print(id_to_timestamp_mapping) + # assert False + assert not result.data["runsFeedOrError"]["hasMore"] def test_get_runs_feed_inexact_limit(self, gql_context_with_runs_and_backfills): diff --git a/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py b/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py index b87381bd75b82..5d4bc5d7d94f8 100644 --- a/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py +++ b/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py @@ -850,13 +850,9 @@ def get_backfills( ) query = query.where(BulkActionsTable.c.id < cursor_query) if created_after: - query = query.where( - BulkActionsTable.c.timestamp > created_after.replace(tzinfo=None).timestamp() - ) + query = query.where(BulkActionsTable.c.timestamp > created_after) if created_before: - query = query.where( - BulkActionsTable.c.timestamp < created_before.replace(tzinfo=None).timestamp() - ) + query = query.where(BulkActionsTable.c.timestamp < created_before) if limit: query = query.limit(limit) query = query.order_by(BulkActionsTable.c.id.desc()) From 196289aee102b30eba94a60f7b54040c0bb7424c Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 14 Aug 2024 11:28:33 -0400 Subject: [PATCH 04/15] debugging --- .../dagster_graphql_tests/graphql/test_runs_feed.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_runs_feed.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_runs_feed.py index fe5602fe9d5e7..1e8560b2f2bf3 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_runs_feed.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_runs_feed.py @@ -110,7 +110,7 @@ def test_get_runs_feed(self, gql_context_with_runs_and_backfills): prev_run_time = None id_to_timestamp_mapping = {} for res in result.data["runsFeedOrError"]["results"]: - id_to_timestamp_mapping[res["runId"]] = res["creationTime"] + id_to_timestamp_mapping[res["id"]] = res["creationTime"] if prev_run_time: assert res["creationTime"] <= prev_run_time prev_run_time = res["creationTime"] @@ -133,7 +133,7 @@ def test_get_runs_feed(self, gql_context_with_runs_and_backfills): prev_run_time = None id_to_timestamp_mapping = {} for res in result.data["runsFeedOrError"]["results"]: - id_to_timestamp_mapping[res["runId"]] = res["creationTime"] + id_to_timestamp_mapping[res["id"]] = res["creationTime"] if prev_run_time: assert res["creationTime"] <= prev_run_time prev_run_time = res["creationTime"] @@ -155,15 +155,17 @@ def test_get_runs_feed(self, gql_context_with_runs_and_backfills): id_to_timestamp_mapping = {} for res in result.data["runsFeedOrError"]["results"]: - id_to_timestamp_mapping[res["runId"]] = res["creationTime"] + id_to_timestamp_mapping[res["id"]] = res["creationTime"] - assert len(result.data["runsFeedOrError"]["results"]) == 10 for res in result.data["runsFeedOrError"]["results"]: if prev_run_time: assert res["creationTime"] <= prev_run_time prev_run_time = res["creationTime"] print(id_to_timestamp_mapping) + + assert len(result.data["runsFeedOrError"]["results"]) == 10 + # assert False assert not result.data["runsFeedOrError"]["hasMore"] From 5c65a3388cf923a084caef320cd77156fce61071 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 14 Aug 2024 11:55:56 -0400 Subject: [PATCH 05/15] actually pass the correct cursor --- .../implementation/fetch_runs.py | 4 +++- .../graphql/test_runs_feed.py | 19 +------------------ .../_core/storage/runs/sql_run_storage.py | 4 ++-- 3 files changed, 6 insertions(+), 21 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py index e7b82a9127aa0..cdbb3ddf481e8 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py @@ -488,7 +488,9 @@ def get_runs_feed_entries( backfills = [ GraphenePartitionBackfill(backfill) for backfill in instance.get_backfills( - cursor=cursor, limit=limit, created_before=created_before_cursor + cursor=runs_feed_cursor.backfill_cursor, + limit=limit, + created_before=created_before_cursor, ) ] runs = [ diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_runs_feed.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_runs_feed.py index 1e8560b2f2bf3..a8e518a9ffcaf 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_runs_feed.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_runs_feed.py @@ -108,15 +108,11 @@ def test_get_runs_feed(self, gql_context_with_runs_and_backfills): }, ) prev_run_time = None - id_to_timestamp_mapping = {} for res in result.data["runsFeedOrError"]["results"]: - id_to_timestamp_mapping[res["id"]] = res["creationTime"] if prev_run_time: assert res["creationTime"] <= prev_run_time prev_run_time = res["creationTime"] - print(id_to_timestamp_mapping) - result = execute_dagster_graphql( gql_context_with_runs_and_backfills.create_request_context(), GET_RUNS_FEED_QUERY, @@ -131,15 +127,11 @@ def test_get_runs_feed(self, gql_context_with_runs_and_backfills): assert len(result.data["runsFeedOrError"]["results"]) == 10 prev_run_time = None - id_to_timestamp_mapping = {} for res in result.data["runsFeedOrError"]["results"]: - id_to_timestamp_mapping[res["id"]] = res["creationTime"] if prev_run_time: assert res["creationTime"] <= prev_run_time prev_run_time = res["creationTime"] - print(id_to_timestamp_mapping) - assert result.data["runsFeedOrError"]["hasMore"] old_cursor = result.data["runsFeedOrError"]["cursor"] assert old_cursor is not None @@ -153,21 +145,12 @@ def test_get_runs_feed(self, gql_context_with_runs_and_backfills): }, ) - id_to_timestamp_mapping = {} - for res in result.data["runsFeedOrError"]["results"]: - id_to_timestamp_mapping[res["id"]] = res["creationTime"] - + assert len(result.data["runsFeedOrError"]["results"]) == 10 for res in result.data["runsFeedOrError"]["results"]: if prev_run_time: assert res["creationTime"] <= prev_run_time prev_run_time = res["creationTime"] - print(id_to_timestamp_mapping) - - assert len(result.data["runsFeedOrError"]["results"]) == 10 - - # assert False - assert not result.data["runsFeedOrError"]["hasMore"] def test_get_runs_feed_inexact_limit(self, gql_context_with_runs_and_backfills): diff --git a/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py b/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py index 5d4bc5d7d94f8..2e0a996028a22 100644 --- a/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py +++ b/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py @@ -841,7 +841,7 @@ def get_backfills( created_after: Optional[datetime] = None, ) -> Sequence[PartitionBackfill]: check.opt_inst_param(status, "status", BulkActionStatus) - query = db_select([BulkActionsTable.c.body]) + query = db_select([BulkActionsTable.c.body, BulkActionsTable.c.timestamp]) if status: query = query.where(BulkActionsTable.c.status == status.value) if cursor: @@ -852,7 +852,7 @@ def get_backfills( if created_after: query = query.where(BulkActionsTable.c.timestamp > created_after) if created_before: - query = query.where(BulkActionsTable.c.timestamp < created_before) + query = query.where(BulkActionsTable.c.timestamp < created_before.replace(tzinfo=None)) if limit: query = query.limit(limit) query = query.order_by(BulkActionsTable.c.id.desc()) From bf1f747c77f8abdbc443469f38f67d2e024e7a35 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 15 Aug 2024 15:43:35 -0400 Subject: [PATCH 06/15] split branch --- .../dagster/_core/execution/backfill.py | 9 +++++++ .../dagster/_core/instance/__init__.py | 16 +++++------ .../dagster/_core/storage/legacy_storage.py | 14 +++++----- .../dagster/_core/storage/runs/base.py | 6 ++--- .../_core/storage/runs/sql_run_storage.py | 27 ++++++++++++------- 5 files changed, 43 insertions(+), 29 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/backfill.py b/python_modules/dagster/dagster/_core/execution/backfill.py index 10f9d62b6ad85..e74946ba2c8e9 100644 --- a/python_modules/dagster/dagster/_core/execution/backfill.py +++ b/python_modules/dagster/dagster/_core/execution/backfill.py @@ -1,3 +1,4 @@ +from datetime import datetime from enum import Enum from typing import Mapping, NamedTuple, Optional, Sequence, Union @@ -14,6 +15,7 @@ from dagster._core.remote_representation.origin import RemotePartitionSetOrigin from dagster._core.storage.tags import USER_TAG from dagster._core.workspace.workspace import IWorkspace +from dagster._record import record from dagster._serdes import whitelist_for_serdes from dagster._utils.error import SerializableErrorInfo @@ -38,6 +40,13 @@ def from_graphql_input(graphql_str): return BulkActionStatus(graphql_str) +@record +class BulkActionsFilter: + status: Optional[BulkActionStatus] = None + created_before: Optional[datetime] = None + created_after: Optional[datetime] = None + + @whitelist_for_serdes class PartitionBackfill( NamedTuple( diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py index 4a82ecd372215..1134a8fac584a 100644 --- a/python_modules/dagster/dagster/_core/instance/__init__.py +++ b/python_modules/dagster/dagster/_core/instance/__init__.py @@ -6,7 +6,6 @@ import weakref from abc import abstractmethod from collections import defaultdict -from datetime import datetime from enum import Enum from tempfile import TemporaryDirectory from types import TracebackType @@ -124,7 +123,11 @@ JobFailureData, ) from dagster._core.events.log import EventLogEntry - from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill + from dagster._core.execution.backfill import ( + BulkActionsFilter, + BulkActionStatus, + PartitionBackfill, + ) from dagster._core.execution.plan.plan import ExecutionPlan from dagster._core.execution.plan.resume_retry import ReexecutionStrategy from dagster._core.execution.stats import RunStepKeyStatsSnapshot @@ -3077,15 +3080,10 @@ def get_backfills( status: Optional["BulkActionStatus"] = None, cursor: Optional[str] = None, limit: Optional[int] = None, - created_before: Optional[datetime] = None, - created_after: Optional[datetime] = None, + filters: Optional["BulkActionsFilter"] = None, ) -> Sequence["PartitionBackfill"]: return self._run_storage.get_backfills( - status=status, - cursor=cursor, - limit=limit, - created_before=created_before, - created_after=created_after, + status=status, cursor=cursor, limit=limit, filters=filters ) def get_backfill(self, backfill_id: str) -> Optional["PartitionBackfill"]: diff --git a/python_modules/dagster/dagster/_core/storage/legacy_storage.py b/python_modules/dagster/dagster/_core/storage/legacy_storage.py index c18bb801878d3..5b3cdb4dfbbe0 100644 --- a/python_modules/dagster/dagster/_core/storage/legacy_storage.py +++ b/python_modules/dagster/dagster/_core/storage/legacy_storage.py @@ -1,4 +1,3 @@ -from datetime import datetime from typing import TYPE_CHECKING, Iterable, Mapping, Optional, Sequence, Set, Tuple, Union from dagster import _check as check @@ -34,7 +33,11 @@ from dagster._core.event_api import AssetRecordsFilter, RunStatusChangeRecordsFilter from dagster._core.events import DagsterEvent, DagsterEventType from dagster._core.events.log import EventLogEntry - from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill + from dagster._core.execution.backfill import ( + BulkActionsFilter, + BulkActionStatus, + PartitionBackfill, + ) from dagster._core.execution.stats import RunStepKeyStatsSnapshot from dagster._core.instance import DagsterInstance from dagster._core.remote_representation.origin import RemoteJobOrigin @@ -313,12 +316,9 @@ def get_backfills( status: Optional["BulkActionStatus"] = None, cursor: Optional[str] = None, limit: Optional[int] = None, - created_before: Optional[datetime] = None, - created_after: Optional[datetime] = None, + filters: Optional["BulkActionsFilter"] = None, ) -> Sequence["PartitionBackfill"]: - return self._storage.run_storage.get_backfills( - status, cursor, limit, created_before=created_before, created_after=created_after - ) + return self._storage.run_storage.get_backfills(status, cursor, limit, filters=filters) def get_backfill(self, backfill_id: str) -> Optional["PartitionBackfill"]: return self._storage.run_storage.get_backfill(backfill_id) diff --git a/python_modules/dagster/dagster/_core/storage/runs/base.py b/python_modules/dagster/dagster/_core/storage/runs/base.py index 3e72ede9f1827..1f9c35d9ba768 100644 --- a/python_modules/dagster/dagster/_core/storage/runs/base.py +++ b/python_modules/dagster/dagster/_core/storage/runs/base.py @@ -1,11 +1,10 @@ from abc import ABC, abstractmethod -from datetime import datetime from typing import TYPE_CHECKING, Dict, Mapping, Optional, Sequence, Set, Tuple, Union from typing_extensions import TypedDict from dagster._core.events import DagsterEvent -from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill +from dagster._core.execution.backfill import BulkActionsFilter, BulkActionStatus, PartitionBackfill from dagster._core.execution.telemetry import RunTelemetryData from dagster._core.instance import MayHaveInstanceWeakref, T_DagsterInstance from dagster._core.snap import ExecutionPlanSnapshot, JobSnapshot @@ -374,8 +373,7 @@ def get_backfills( status: Optional[BulkActionStatus] = None, cursor: Optional[str] = None, limit: Optional[int] = None, - created_before: Optional[datetime] = None, - created_after: Optional[datetime] = None, + filters: Optional[BulkActionsFilter] = None, ) -> Sequence[PartitionBackfill]: """Get a list of partition backfills.""" diff --git a/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py b/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py index 2e0a996028a22..70e7754d51199 100644 --- a/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py +++ b/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py @@ -38,7 +38,7 @@ DagsterEventType, RunFailureReason, ) -from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill +from dagster._core.execution.backfill import BulkActionsFilter, BulkActionStatus, PartitionBackfill from dagster._core.remote_representation.origin import RemoteJobOrigin from dagster._core.snap import ( ExecutionPlanSnapshot, @@ -837,22 +837,31 @@ def get_backfills( status: Optional[BulkActionStatus] = None, cursor: Optional[str] = None, limit: Optional[int] = None, - created_before: Optional[datetime] = None, - created_after: Optional[datetime] = None, + filters: Optional[BulkActionsFilter] = None, ) -> Sequence[PartitionBackfill]: check.opt_inst_param(status, "status", BulkActionStatus) query = db_select([BulkActionsTable.c.body, BulkActionsTable.c.timestamp]) - if status: - query = query.where(BulkActionsTable.c.status == status.value) + if status or (filters and filters.status): + if status and filters and filters.status and status != filters.status: + raise DagsterInvariantViolationError( + "Conflicting status filters provided to get_backfills. Choose one of status or BulkActionsFilter.status." + ) + status = status or (filters.status if filters else None) + if status: # should also be non-None at this point, but pyright must be appeased + query = query.where(BulkActionsTable.c.status == status.value) if cursor: cursor_query = db_select([BulkActionsTable.c.id]).where( BulkActionsTable.c.key == cursor ) query = query.where(BulkActionsTable.c.id < cursor_query) - if created_after: - query = query.where(BulkActionsTable.c.timestamp > created_after) - if created_before: - query = query.where(BulkActionsTable.c.timestamp < created_before.replace(tzinfo=None)) + if filters and filters.created_after: + query = query.where( + BulkActionsTable.c.timestamp > filters.created_after.replace(tzinfo=None) + ) + if filters and filters.created_before: + query = query.where( + BulkActionsTable.c.timestamp < filters.created_before.replace(tzinfo=None) + ) if limit: query = query.limit(limit) query = query.order_by(BulkActionsTable.c.id.desc()) From 5ffe3cf399feb88d2089826e29492c031802435a Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 15 Aug 2024 15:53:52 -0400 Subject: [PATCH 07/15] test suite --- .../storage_tests/utils/run_storage.py | 73 ++++++++++++++++++- 1 file changed, 72 insertions(+), 1 deletion(-) diff --git a/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py b/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py index 43622082b016a..acfc329cd117e 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py +++ b/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py @@ -14,7 +14,7 @@ DagsterSnapshotDoesNotExist, ) from dagster._core.events import DagsterEvent, DagsterEventType, JobFailureData, RunFailureReason -from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill +from dagster._core.execution.backfill import BulkActionsFilter, BulkActionStatus, PartitionBackfill from dagster._core.instance import DagsterInstance, InstanceType from dagster._core.launcher.sync_in_memory_run_launcher import SyncInMemoryRunLauncher from dagster._core.remote_representation import ( @@ -1337,6 +1337,77 @@ def test_backfill(self, storage: RunStorage): assert len(storage.get_backfills()) == 1 assert len(storage.get_backfills(status=BulkActionStatus.REQUESTED)) == 0 + def test_backfill_status_filtering(self, storage: RunStorage): + origin = self.fake_partition_set_origin("fake_partition_set") + backfills = storage.get_backfills() + assert len(backfills) == 0 + + one = PartitionBackfill( + "one", + partition_set_origin=origin, + status=BulkActionStatus.REQUESTED, + partition_names=["a", "b", "c"], + from_failure=False, + tags={}, + backfill_timestamp=time.time(), + ) + storage.add_backfill(one) + assert ( + len(storage.get_backfills(filters=BulkActionsFilter(status=BulkActionStatus.REQUESTED))) + == 1 + ) + assert ( + len(storage.get_backfills(filters=BulkActionsFilter(status=BulkActionStatus.COMPLETED))) + == 0 + ) + backfill = storage.get_backfills( + filters=BulkActionsFilter(status=BulkActionStatus.REQUESTED) + ) + assert backfill == one + + storage.update_backfill(one.with_status(status=BulkActionStatus.COMPLETED)) + assert ( + len(storage.get_backfills(filters=BulkActionsFilter(status=BulkActionStatus.REQUESTED))) + == 0 + ) + assert ( + len(storage.get_backfills(filters=BulkActionsFilter(status=BulkActionStatus.COMPLETED))) + == 1 + ) + + def test_backfill_created_time_filtering(self, storage: RunStorage): + origin = self.fake_partition_set_origin("fake_partition_set") + backfills = storage.get_backfills() + assert len(backfills) == 0 + + all_backfills = [] + for i in range(5): + backfill = PartitionBackfill( + f"backfill_{i}", + partition_set_origin=origin, + status=BulkActionStatus.REQUESTED, + partition_names=["a", "b", "c"], + from_failure=False, + tags={}, + backfill_timestamp=time.time(), + ) + storage.add_backfill(backfill) + all_backfills.append(backfill) + + created_before = storage.get_backfills( + filters=BulkActionsFilter(created_before=all_backfills[3].backfill_timestamp) + ) + assert len(created_before) == 2 + for backfill in created_before: + assert backfill.backfill_timestamp < all_backfills[3].backfill_timestamp + + created_after = storage.get_backfills( + filters=BulkActionsFilter(created_after=all_backfills[3].backfill_timestamp) + ) + assert len(created_after) == 2 + for backfill in created_after: + assert backfill.backfill_timestamp > all_backfills[3].backfill_timestamp + def test_secondary_index(self, storage): self._skip_in_memory(storage) From 4015656c24871f24ee9c47e6a9da94fe97088a55 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 15 Aug 2024 16:07:35 -0400 Subject: [PATCH 08/15] fix --- .../dagster_tests/storage_tests/utils/run_storage.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py b/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py index acfc329cd117e..e485ad4d93fa6 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py +++ b/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py @@ -44,7 +44,7 @@ from dagster._daemon.daemon import SensorDaemon from dagster._daemon.types import DaemonHeartbeat from dagster._serdes import serialize_pp -from dagster._time import create_datetime +from dagster._time import create_datetime, datetime_from_timestamp win_py36 = _seven.IS_WINDOWS and sys.version_info[0] == 3 and sys.version_info[1] == 6 @@ -1395,14 +1395,18 @@ def test_backfill_created_time_filtering(self, storage: RunStorage): all_backfills.append(backfill) created_before = storage.get_backfills( - filters=BulkActionsFilter(created_before=all_backfills[3].backfill_timestamp) + filters=BulkActionsFilter( + created_before=datetime_from_timestamp(all_backfills[3].backfill_timestamp) + ) ) assert len(created_before) == 2 for backfill in created_before: assert backfill.backfill_timestamp < all_backfills[3].backfill_timestamp created_after = storage.get_backfills( - filters=BulkActionsFilter(created_after=all_backfills[3].backfill_timestamp) + filters=BulkActionsFilter( + created_after=datetime_from_timestamp(all_backfills[3].backfill_timestamp) + ) ) assert len(created_after) == 2 for backfill in created_after: From 7b8d04fc7e2afda0caa62a0225b4d5427c36abf9 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Fri, 16 Aug 2024 10:50:30 -0400 Subject: [PATCH 09/15] fix tests --- .../dagster_tests/storage_tests/utils/run_storage.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py b/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py index e485ad4d93fa6..75f95ec862ca8 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py +++ b/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py @@ -1360,10 +1360,10 @@ def test_backfill_status_filtering(self, storage: RunStorage): len(storage.get_backfills(filters=BulkActionsFilter(status=BulkActionStatus.COMPLETED))) == 0 ) - backfill = storage.get_backfills( + backfills = storage.get_backfills( filters=BulkActionsFilter(status=BulkActionStatus.REQUESTED) ) - assert backfill == one + assert backfills[0] == one storage.update_backfill(one.with_status(status=BulkActionStatus.COMPLETED)) assert ( @@ -1396,21 +1396,21 @@ def test_backfill_created_time_filtering(self, storage: RunStorage): created_before = storage.get_backfills( filters=BulkActionsFilter( - created_before=datetime_from_timestamp(all_backfills[3].backfill_timestamp) + created_before=datetime_from_timestamp(all_backfills[2].backfill_timestamp) ) ) assert len(created_before) == 2 for backfill in created_before: - assert backfill.backfill_timestamp < all_backfills[3].backfill_timestamp + assert backfill.backfill_timestamp < all_backfills[2].backfill_timestamp created_after = storage.get_backfills( filters=BulkActionsFilter( - created_after=datetime_from_timestamp(all_backfills[3].backfill_timestamp) + created_after=datetime_from_timestamp(all_backfills[2].backfill_timestamp) ) ) assert len(created_after) == 2 for backfill in created_after: - assert backfill.backfill_timestamp > all_backfills[3].backfill_timestamp + assert backfill.backfill_timestamp > all_backfills[2].backfill_timestamp def test_secondary_index(self, storage): self._skip_in_memory(storage) From f1013a9454fd1644e439b73bb35ba164a35c5f6c Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Fri, 16 Aug 2024 12:12:56 -0400 Subject: [PATCH 10/15] missed one thing when i split the branch --- .../dagster_graphql/implementation/fetch_runs.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py index cdbb3ddf481e8..20bf56921101b 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py @@ -19,6 +19,7 @@ ) from dagster._core.definitions.selector import JobSubsetSelector from dagster._core.errors import DagsterInvariantViolationError, DagsterRunNotFoundError +from dagster._core.execution.backfill import BulkActionsFilter from dagster._core.instance import DagsterInstance from dagster._core.storage.dagster_run import DagsterRunStatus, RunRecord, RunsFilter from dagster._core.storage.event_log.base import AssetRecord @@ -490,7 +491,7 @@ def get_runs_feed_entries( for backfill in instance.get_backfills( cursor=runs_feed_cursor.backfill_cursor, limit=limit, - created_before=created_before_cursor, + filters=BulkActionsFilter(created_before=created_before_cursor), ) ] runs = [ From b5481c522d61fcc20b421dd93e2bddc89c7c8044 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Fri, 16 Aug 2024 12:53:41 -0400 Subject: [PATCH 11/15] try removing timezone none --- .../dagster/dagster/_core/storage/runs/sql_run_storage.py | 8 ++------ .../dagster_postgres_tests/test_run_storage.py | 1 + 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py b/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py index 70e7754d51199..ed9653c8b3b61 100644 --- a/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py +++ b/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py @@ -855,13 +855,9 @@ def get_backfills( ) query = query.where(BulkActionsTable.c.id < cursor_query) if filters and filters.created_after: - query = query.where( - BulkActionsTable.c.timestamp > filters.created_after.replace(tzinfo=None) - ) + query = query.where(BulkActionsTable.c.timestamp > filters.created_after) if filters and filters.created_before: - query = query.where( - BulkActionsTable.c.timestamp < filters.created_before.replace(tzinfo=None) - ) + query = query.where(BulkActionsTable.c.timestamp < filters.created_before) if limit: query = query.limit(limit) query = query.order_by(BulkActionsTable.c.id.desc()) diff --git a/python_modules/libraries/dagster-postgres/dagster_postgres_tests/test_run_storage.py b/python_modules/libraries/dagster-postgres/dagster_postgres_tests/test_run_storage.py index 153a17faeab64..fd389302832c1 100644 --- a/python_modules/libraries/dagster-postgres/dagster_postgres_tests/test_run_storage.py +++ b/python_modules/libraries/dagster-postgres/dagster_postgres_tests/test_run_storage.py @@ -8,6 +8,7 @@ class TestPostgresRunStorage(TestRunStorage): + # TestPostgresRunStorage::test_backfill_created_time_filtering __test__ = True @pytest.fixture(scope="function", name="storage") From 997e59104a442c9373af5c5b7828755227d36e52 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Mon, 19 Aug 2024 12:20:32 -0400 Subject: [PATCH 12/15] assert --- .../dagster/dagster/_core/storage/runs/sql_run_storage.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py b/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py index ed9653c8b3b61..571bbbfdf5d78 100644 --- a/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py +++ b/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py @@ -847,8 +847,8 @@ def get_backfills( "Conflicting status filters provided to get_backfills. Choose one of status or BulkActionsFilter.status." ) status = status or (filters.status if filters else None) - if status: # should also be non-None at this point, but pyright must be appeased - query = query.where(BulkActionsTable.c.status == status.value) + assert status + query = query.where(BulkActionsTable.c.status == status.value) if cursor: cursor_query = db_select([BulkActionsTable.c.id]).where( BulkActionsTable.c.key == cursor From 42de3ccedaf58dfde47c96b01b76176ee453052d Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Tue, 20 Aug 2024 10:18:07 -0400 Subject: [PATCH 13/15] comment --- .../dagster/dagster/_core/execution/backfill.py | 15 +++++++++++++++ .../dagster_postgres_tests/test_run_storage.py | 1 - 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/python_modules/dagster/dagster/_core/execution/backfill.py b/python_modules/dagster/dagster/_core/execution/backfill.py index e74946ba2c8e9..e6d19be2610f6 100644 --- a/python_modules/dagster/dagster/_core/execution/backfill.py +++ b/python_modules/dagster/dagster/_core/execution/backfill.py @@ -42,6 +42,21 @@ def from_graphql_input(graphql_str): @record class BulkActionsFilter: + """Filters to use when querying for bulk actions (i.e. backfills) from the BulkActionsTable. + + Each field of the BulkActionsFilter represents a logical AND with each other. For + example, if you specify status and created_before, then you will receive only bulk actions + with the specified states AND the created before created_before. If left blank, then + all values will be permitted for that field. + + Args: + status (Optional[BulkActionStatus]): A status to filter by. + created_before (Optional[DateTime]): Filter by bulk actions that were created before this datetime. Note that the + create_time for each bulk action is stored in UTC. + created_after (Optional[DateTime]): Filter by bulk actions that were created after this datetime. Note that the + create_time for each bulk action is stored in UTC. + """ + status: Optional[BulkActionStatus] = None created_before: Optional[datetime] = None created_after: Optional[datetime] = None diff --git a/python_modules/libraries/dagster-postgres/dagster_postgres_tests/test_run_storage.py b/python_modules/libraries/dagster-postgres/dagster_postgres_tests/test_run_storage.py index fd389302832c1..153a17faeab64 100644 --- a/python_modules/libraries/dagster-postgres/dagster_postgres_tests/test_run_storage.py +++ b/python_modules/libraries/dagster-postgres/dagster_postgres_tests/test_run_storage.py @@ -8,7 +8,6 @@ class TestPostgresRunStorage(TestRunStorage): - # TestPostgresRunStorage::test_backfill_created_time_filtering __test__ = True @pytest.fixture(scope="function", name="storage") From 8bbcdfc48315d04355393cb3c1bee13bb987656b Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 14 Aug 2024 14:50:56 -0400 Subject: [PATCH 14/15] filters all the way through to gql layer --- .../ui-core/src/graphql/schema.graphql | 23 +++--- .../packages/ui-core/src/graphql/types.ts | 22 ++++++ .../implementation/fetch_backfills.py | 5 +- .../dagster_graphql/schema/inputs.py | 29 ++++++++ .../dagster_graphql/schema/roots/query.py | 4 ++ .../graphql/test_asset_backfill.py | 71 ++++++++++++++++++- 6 files changed, 143 insertions(+), 11 deletions(-) diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql index ef37cb99115bb..21aef663b61ec 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql @@ -2396,6 +2396,20 @@ enum AssetEventType { ASSET_OBSERVATION } +input BulkActionsFilter { + status: [BulkActionStatus!] + createdBefore: Float + createdAfter: Float +} + +enum BulkActionStatus { + REQUESTED + COMPLETED + FAILED + CANCELED + CANCELING +} + type DaemonHealth { id: String! daemonStatus(daemonType: String): DaemonStatus! @@ -2832,14 +2846,6 @@ type PartitionBackfill implements RunsFeedEntry { logEvents(cursor: String): InstigationEventConnection! } -enum BulkActionStatus { - REQUESTED - COMPLETED - FAILED - CANCELED - CANCELING -} - type AssetBackfillTargetPartitions { ranges: [PartitionKeyRange!] partitionKeys: [String!] @@ -3294,6 +3300,7 @@ type Query { status: BulkActionStatus cursor: String limit: Int + filters: BulkActionsFilter ): PartitionBackfillsOrError! permissions: [Permission!]! canBulkTerminate: Boolean! diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts index 2c2fe5f5248d8..801c9323f11a3 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts @@ -730,6 +730,12 @@ export enum BulkActionStatus { REQUESTED = 'REQUESTED', } +export type BulkActionsFilter = { + createdAfter?: InputMaybe; + createdBefore?: InputMaybe; + status?: InputMaybe>; +}; + export type CancelBackfillResult = CancelBackfillSuccess | PythonError | UnauthorizedError; export type CancelBackfillSuccess = { @@ -3877,6 +3883,7 @@ export type QueryPartitionBackfillOrErrorArgs = { export type QueryPartitionBackfillsOrErrorArgs = { cursor?: InputMaybe; + filters?: InputMaybe; limit?: InputMaybe; status?: InputMaybe; }; @@ -6966,6 +6973,21 @@ export const buildBoolMetadataEntry = ( }; }; +export const buildBulkActionsFilter = ( + overrides?: Partial, + _relationshipsToOmit: Set = new Set(), +): BulkActionsFilter => { + const relationshipsToOmit: Set = new Set(_relationshipsToOmit); + relationshipsToOmit.add('BulkActionsFilter'); + return { + createdAfter: + overrides && overrides.hasOwnProperty('createdAfter') ? overrides.createdAfter! : 6.09, + createdBefore: + overrides && overrides.hasOwnProperty('createdBefore') ? overrides.createdBefore! : 1.5, + status: overrides && overrides.hasOwnProperty('status') ? overrides.status! : [], + }; +}; + export const buildCancelBackfillSuccess = ( overrides?: Partial, _relationshipsToOmit: Set = new Set(), diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_backfills.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_backfills.py index 5bf37b54ac3c6..8e750639e5f92 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_backfills.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_backfills.py @@ -1,6 +1,6 @@ from typing import TYPE_CHECKING, Optional -from dagster._core.execution.backfill import BulkActionStatus +from dagster._core.execution.backfill import BulkActionsFilter, BulkActionStatus if TYPE_CHECKING: from dagster_graphql.schema.util import ResolveInfo @@ -23,11 +23,12 @@ def get_backfills( status: Optional[BulkActionStatus] = None, cursor: Optional[str] = None, limit: Optional[int] = None, + filters: Optional[BulkActionsFilter] = None, ) -> "GraphenePartitionBackfills": from ..schema.backfill import GraphenePartitionBackfill, GraphenePartitionBackfills backfills = graphene_info.context.instance.get_backfills( - status=status, cursor=cursor, limit=limit + status=status, cursor=cursor, limit=limit, filters=filters ) return GraphenePartitionBackfills( results=[GraphenePartitionBackfill(backfill) for backfill in backfills] diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/inputs.py b/python_modules/dagster-graphql/dagster_graphql/schema/inputs.py index f0e01d770e230..7c6398de07a20 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/inputs.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/inputs.py @@ -1,6 +1,7 @@ import graphene from dagster._core.definitions.asset_key import AssetKey from dagster._core.events import DagsterEventType +from dagster._core.execution.backfill import BulkActionsFilter, BulkActionStatus from dagster._core.storage.dagster_run import DagsterRunStatus, RunsFilter from dagster._time import datetime_from_timestamp from dagster._utils import check @@ -377,6 +378,33 @@ class Meta: name = "TagInput" +class GrapheneBulkActionsFilter(graphene.InputObjectType): + status = graphene.List( + graphene.NonNull("dagster_graphql.schema.backfill.GrapheneBulkActionStatus") + ) + createdBefore = graphene.InputField(graphene.Float) + createdAfter = graphene.InputField(graphene.Float) + + class Meta: + description = """This type represents a filter on Dagster Bulk Actions (backfills).""" + name = "BulkActionsFilter" + + def to_selector(self): + if self.status: + status = BulkActionStatus.from_graphql_input(self.status) + else: + status = None + + created_before = datetime_from_timestamp(self.createdBefore) if self.createdBefore else None + created_after = datetime_from_timestamp(self.createdAfter) if self.createdAfter else None + + return BulkActionsFilter( + status=status, + created_before=created_before, + created_after=created_after, + ) + + types = [ GrapheneAssetKeyInput, GrapheneExecutionMetadata, @@ -398,4 +426,5 @@ class Meta: GrapheneStepOutputHandle, GrapheneTagInput, GrapheneReportRunlessAssetEventsParams, + GrapheneBulkActionsFilter, ] diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py b/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py index 4fc6c8fa70d3e..ac69793e033ba 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py @@ -127,6 +127,7 @@ GrapheneAssetBackfillPreviewParams, GrapheneAssetGroupSelector, GrapheneAssetKeyInput, + GrapheneBulkActionsFilter, GrapheneGraphSelector, GrapheneInstigationSelector, GraphenePipelineSelector, @@ -474,6 +475,7 @@ class Meta: status=graphene.Argument(GrapheneBulkActionStatus), cursor=graphene.String(), limit=graphene.Int(), + filters=graphene.Argument(GrapheneBulkActionsFilter), description="Retrieve backfills after applying a status filter, cursor, and limit.", ) @@ -1105,12 +1107,14 @@ def resolve_partitionBackfillsOrError( status: Optional[GrapheneBulkActionStatus] = None, cursor: Optional[str] = None, limit: Optional[int] = None, + filters: Optional[GrapheneBulkActionsFilter] = None, ): return get_backfills( graphene_info, status=BulkActionStatus.from_graphql_input(status) if status else None, cursor=cursor, limit=limit, + filters=filters.to_selector() if filters else None, ) def resolve_assetBackfillPreview( diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py index f6435026bed67..aaf5014daf5ec 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py @@ -129,6 +129,20 @@ } """ +BACKFILLS_WITH_FILTERS_QUERY = """ + query BackfillsWithFiltersQuery($filters: BulkActionsFilter) { + partitionBackfillsOrError(filters: $filters) { + ... on PartitionBackfills { + results { + id + timestamp + status + } + } + } + } +""" + def get_repo() -> RepositoryDefinition: partitions_def = StaticPartitionsDefinition(["a", "b", "c"]) @@ -1056,7 +1070,9 @@ def test_asset_backfill_error_raised_upon_invalid_params_provided(): def _get_backfill_data( - launch_backfill_result: GqlResult, instance: DagsterInstance, repo + launch_backfill_result: GqlResult, + instance: DagsterInstance, + repo, ) -> Tuple[str, AssetBackfillData]: assert launch_backfill_result assert launch_backfill_result.data @@ -1131,3 +1147,56 @@ def override_backfill_storage_setting(self): ) assert len(backfill_logs.data["partitionBackfillOrError"]["logEvents"]["events"]) > 0 + + +def test_get_backfills_with_filters(): + repo = get_repo() + all_asset_keys = repo.asset_graph.materializable_asset_keys + + with instance_for_test() as instance: + with define_out_of_process_context(__file__, "get_repo", instance) as context: + # launchPartitionBackfill + all_backfills = [] + for _ in range(5): + launch_backfill_result = execute_dagster_graphql( + context, + LAUNCH_PARTITION_BACKFILL_MUTATION, + variables={ + "backfillParams": { + "partitionNames": ["a", "b"], + "assetSelection": [key.to_graphql_input() for key in all_asset_keys], + } + }, + ) + assert ( + "backfillId" in launch_backfill_result.data["launchPartitionBackfill"] + ), _get_error_message(launch_backfill_result) + + backfill_id = launch_backfill_result.data["launchPartitionBackfill"]["backfillId"] + backfill = instance.get_backfill(backfill_id) + all_backfills.append(backfill) + + # on PartitionBackfills + get_backfills_result = execute_dagster_graphql( + context, + BACKFILLS_WITH_FILTERS_QUERY, + variables={"filters": {"createdBefore": all_backfills[3].backfill_timestamp}}, + ) + assert not get_backfills_result.errors + assert get_backfills_result.data + backfill_results = get_backfills_result.data["partitionBackfillsOrError"]["results"] + + for result in backfill_results: + assert result["timestamp"] < all_backfills[3].backfill_timestamp + + get_backfills_result = execute_dagster_graphql( + context, + BACKFILLS_WITH_FILTERS_QUERY, + variables={"filters": {"createdAfter": all_backfills[3].backfill_timestamp}}, + ) + assert not get_backfills_result.errors + assert get_backfills_result.data + backfill_results = get_backfills_result.data["partitionBackfillsOrError"]["results"] + + for result in backfill_results: + assert result["timestamp"] > all_backfills[3].backfill_timestamp From 868bc519db2dd4d38e436bfe1e84b86eb810e1a9 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Mon, 19 Aug 2024 13:37:42 -0400 Subject: [PATCH 15/15] [INTERNAL_BRANCH=jamie/bulk-action-filter] fix input types for filter --- .../packages/ui-core/src/graphql/schema.graphql | 2 +- .../dagster-ui/packages/ui-core/src/graphql/types.ts | 7 +++++-- .../dagster-graphql/dagster_graphql/schema/inputs.py | 10 ++-------- .../graphql/test_asset_backfill.py | 7 ++++++- 4 files changed, 14 insertions(+), 12 deletions(-) diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql index 21aef663b61ec..2736a25369d45 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql @@ -2397,7 +2397,7 @@ enum AssetEventType { } input BulkActionsFilter { - status: [BulkActionStatus!] + status: BulkActionStatus createdBefore: Float createdAfter: Float } diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts index 801c9323f11a3..f0c9c97d8b034 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts @@ -733,7 +733,7 @@ export enum BulkActionStatus { export type BulkActionsFilter = { createdAfter?: InputMaybe; createdBefore?: InputMaybe; - status?: InputMaybe>; + status?: InputMaybe; }; export type CancelBackfillResult = CancelBackfillSuccess | PythonError | UnauthorizedError; @@ -6984,7 +6984,10 @@ export const buildBulkActionsFilter = ( overrides && overrides.hasOwnProperty('createdAfter') ? overrides.createdAfter! : 6.09, createdBefore: overrides && overrides.hasOwnProperty('createdBefore') ? overrides.createdBefore! : 1.5, - status: overrides && overrides.hasOwnProperty('status') ? overrides.status! : [], + status: + overrides && overrides.hasOwnProperty('status') + ? overrides.status! + : BulkActionStatus.CANCELED, }; }; diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/inputs.py b/python_modules/dagster-graphql/dagster_graphql/schema/inputs.py index 7c6398de07a20..eb3834f91c9d2 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/inputs.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/inputs.py @@ -379,9 +379,7 @@ class Meta: class GrapheneBulkActionsFilter(graphene.InputObjectType): - status = graphene.List( - graphene.NonNull("dagster_graphql.schema.backfill.GrapheneBulkActionStatus") - ) + status = graphene.InputField("dagster_graphql.schema.backfill.GrapheneBulkActionStatus") createdBefore = graphene.InputField(graphene.Float) createdAfter = graphene.InputField(graphene.Float) @@ -390,11 +388,7 @@ class Meta: name = "BulkActionsFilter" def to_selector(self): - if self.status: - status = BulkActionStatus.from_graphql_input(self.status) - else: - status = None - + status = BulkActionStatus[self.status.value] if self.status else None created_before = datetime_from_timestamp(self.createdBefore) if self.createdBefore else None created_after = datetime_from_timestamp(self.createdAfter) if self.createdAfter else None diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py index aaf5014daf5ec..ad6591dc4361a 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py @@ -1176,7 +1176,6 @@ def test_get_backfills_with_filters(): backfill = instance.get_backfill(backfill_id) all_backfills.append(backfill) - # on PartitionBackfills get_backfills_result = execute_dagster_graphql( context, BACKFILLS_WITH_FILTERS_QUERY, @@ -1200,3 +1199,9 @@ def test_get_backfills_with_filters(): for result in backfill_results: assert result["timestamp"] > all_backfills[3].backfill_timestamp + + get_backfills_result = execute_dagster_graphql( + context, + BACKFILLS_WITH_FILTERS_QUERY, + variables={"filters": {"status": "REQUESTED"}}, + )