
BulkActions filters in GQL layer #23682

Merged · 15 commits · Aug 22, 2024
23 changes: 15 additions & 8 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql

(Generated file; diff not rendered.)

25 changes: 25 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

(Generated file; diff not rendered.)

@@ -1,6 +1,6 @@
 from typing import TYPE_CHECKING, Optional

-from dagster._core.execution.backfill import BulkActionStatus
+from dagster._core.execution.backfill import BulkActionsFilter, BulkActionStatus

 if TYPE_CHECKING:
     from dagster_graphql.schema.util import ResolveInfo
@@ -23,11 +23,12 @@ def get_backfills(
     status: Optional[BulkActionStatus] = None,
     cursor: Optional[str] = None,
     limit: Optional[int] = None,
+    filters: Optional[BulkActionsFilter] = None,
 ) -> "GraphenePartitionBackfills":
     from ..schema.backfill import GraphenePartitionBackfill, GraphenePartitionBackfills

     backfills = graphene_info.context.instance.get_backfills(
-        status=status, cursor=cursor, limit=limit
+        status=status, cursor=cursor, limit=limit, filters=filters
     )
     return GraphenePartitionBackfills(
         results=[GraphenePartitionBackfill(backfill) for backfill in backfills]
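The hunk above threads an optional `BulkActionsFilter` from the GraphQL-layer `get_backfills` helper down to `DagsterInstance.get_backfills`. A minimal sketch of the storage-level call this enables, assuming a `DagsterInstance` bound to `instance` (the timestamp value is a placeholder):

    from dagster._core.execution.backfill import BulkActionsFilter, BulkActionStatus
    from dagster._time import datetime_from_timestamp

    # Hypothetical: up to ten REQUESTED backfills created before a cutoff.
    filters = BulkActionsFilter(
        status=BulkActionStatus.REQUESTED,
        created_before=datetime_from_timestamp(1724300000.0),
    )
    backfills = instance.get_backfills(limit=10, filters=filters)
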
@@ -1,3 +1,4 @@
+import datetime
 from collections import defaultdict
 from typing import (
     TYPE_CHECKING,
@@ -18,13 +19,13 @@
 )
 from dagster._core.definitions.selector import JobSubsetSelector
 from dagster._core.errors import DagsterInvariantViolationError, DagsterRunNotFoundError
-from dagster._core.execution.backfill import PartitionBackfill
+from dagster._core.execution.backfill import BulkActionsFilter
 from dagster._core.instance import DagsterInstance
 from dagster._core.storage.dagster_run import DagsterRunStatus, RunRecord, RunsFilter
 from dagster._core.storage.event_log.base import AssetRecord
 from dagster._core.storage.tags import BACKFILL_ID_TAG, TagType, get_tag_type
 from dagster._record import record
-from dagster._time import datetime_from_timestamp, get_current_timestamp
+from dagster._time import datetime_from_timestamp

 from .external import ensure_valid_config, get_external_job_or_raise

@@ -436,14 +437,10 @@ def _fetch_runs_not_in_backfill(
     instance: DagsterInstance,
     cursor: Optional[str],
     limit: int,
-    created_before: Optional[float],
+    created_before: Optional[datetime.datetime],
 ) -> Sequence[RunRecord]:
     """Fetches limit RunRecords that are not part of a backfill and were created before a given timestamp."""
-    runs_filter = (
-        RunsFilter(created_before=datetime_from_timestamp(created_before))
-        if created_before
-        else None
-    )
+    runs_filter = RunsFilter(created_before=created_before) if created_before else None

     runs = []
     while len(runs) < limit:
@@ -458,38 +455,6 @@
     return runs[:limit]


-def _fetch_backfills_created_before_timestamp(
-    instance: DagsterInstance,
-    cursor: Optional[str],
-    limit: int,
-    created_before: Optional[float] = None,
-) -> Sequence[PartitionBackfill]:
-    """Fetches limit PartitionBackfills that were created before a given timestamp.
-
-    Note: This is a reasonable filter to add to the get_backfills instance method. However, we should have a
-    more generalized way of adding filters than adding new parameters to get_backfills. So for now, doing this
-    in a separate function.
-    """
-    created_before = created_before if created_before else get_current_timestamp()
-    backfills = []
-    while len(backfills) < limit:
-        # fetch backfills in a loop discarding backfills that were created after created_before until
-        # we have limit backfills to return or have reached the end of the backfills table
-        new_backfills = instance.get_backfills(cursor=cursor, limit=limit)
-        if len(new_backfills) == 0:
-            return backfills
-        cursor = new_backfills[-1].backfill_id
-        backfills.extend(
-            [
-                backfill
-                for backfill in new_backfills
-                if backfill.backfill_timestamp <= created_before
-            ]
-        )
-
-    return backfills[:limit]
-
-
 def get_runs_feed_entries(
     graphene_info: "ResolveInfo",
     limit: int,
@@ -518,13 +483,15 @@ def get_runs_feed_entries(
     fetch_limit = limit + 1
     # filter out any backfills/runs that are newer than the cursor timestamp. See RunsFeedCursor docstring
     # for case when this is necessary
+    created_before_cursor = (
+        datetime_from_timestamp(runs_feed_cursor.timestamp) if runs_feed_cursor.timestamp else None
+    )
     backfills = [
         GraphenePartitionBackfill(backfill)
-        for backfill in _fetch_backfills_created_before_timestamp(
-            instance,
+        for backfill in instance.get_backfills(
             cursor=runs_feed_cursor.backfill_cursor,
-            limit=fetch_limit,
-            created_before=runs_feed_cursor.timestamp,
+            limit=limit,
+            filters=BulkActionsFilter(created_before=created_before_cursor),
         )
     ]
     runs = [
@@ -533,7 +500,7 @@
             instance,
             cursor=runs_feed_cursor.run_cursor,
             limit=fetch_limit,
-            created_before=runs_feed_cursor.timestamp,
+            created_before=created_before_cursor,
         )
     ]
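Two details of the feed logic above are easy to miss: the cursor's float timestamp is converted to a datetime once and shared by both the backfill and run queries, and `fetch_limit` requests one extra record, presumably so the feed can tell whether another page exists. A sketch of that bookkeeping, with `cursor_timestamp`, `cursor`, `limit`, and `fetch_page` as hypothetical stand-ins:

    from dagster._time import datetime_from_timestamp

    fetch_limit = limit + 1  # fetch one extra record to detect a further page
    created_before = (
        datetime_from_timestamp(cursor_timestamp) if cursor_timestamp else None
    )  # None on the first page, i.e. no cutoff
    records = fetch_page(cursor=cursor, limit=fetch_limit)  # fetch_page is a stand-in
    has_more = len(records) == fetch_limit
    entries = records[:limit]
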
23 changes: 23 additions & 0 deletions python_modules/dagster-graphql/dagster_graphql/schema/inputs.py
@@ -1,6 +1,7 @@
 import graphene
 from dagster._core.definitions.asset_key import AssetKey
 from dagster._core.events import DagsterEventType
+from dagster._core.execution.backfill import BulkActionsFilter, BulkActionStatus
 from dagster._core.storage.dagster_run import DagsterRunStatus, RunsFilter
 from dagster._time import datetime_from_timestamp
 from dagster._utils import check
@@ -377,6 +378,27 @@ class Meta:
         name = "TagInput"


+class GrapheneBulkActionsFilter(graphene.InputObjectType):
+    status = graphene.InputField("dagster_graphql.schema.backfill.GrapheneBulkActionStatus")
+    createdBefore = graphene.InputField(graphene.Float)
+    createdAfter = graphene.InputField(graphene.Float)
+
+    class Meta:
+        description = """This type represents a filter on Dagster Bulk Actions (backfills)."""
+        name = "BulkActionsFilter"
+
+    def to_selector(self):
+        status = BulkActionStatus[self.status.value] if self.status else None
+        created_before = datetime_from_timestamp(self.createdBefore) if self.createdBefore else None
+        created_after = datetime_from_timestamp(self.createdAfter) if self.createdAfter else None
+
+        return BulkActionsFilter(
+            status=status,
+            created_before=created_before,
+            created_after=created_after,
+        )
+
+
 types = [
     GrapheneAssetKeyInput,
     GrapheneExecutionMetadata,
@@ -398,4 +420,5 @@ class Meta:
     GrapheneStepOutputHandle,
     GrapheneTagInput,
     GrapheneReportRunlessAssetEventsParams,
+    GrapheneBulkActionsFilter,
 ]
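The new input type mirrors `BulkActionsFilter` field for field, with `to_selector` converting the float Unix timestamps into datetimes before anything reaches storage. For an input like `{status: REQUESTED, createdBefore: 1724300000.0}` (values hypothetical), the conversion amounts to:

    from dagster._core.execution.backfill import BulkActionsFilter, BulkActionStatus
    from dagster._time import datetime_from_timestamp

    # What to_selector produces for the input above.
    selector = BulkActionsFilter(
        status=BulkActionStatus.REQUESTED,  # BulkActionStatus[...] looks the enum up by name
        created_before=datetime_from_timestamp(1724300000.0),
        created_after=None,
    )
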
@@ -127,6 +127,7 @@
     GrapheneAssetBackfillPreviewParams,
     GrapheneAssetGroupSelector,
     GrapheneAssetKeyInput,
+    GrapheneBulkActionsFilter,
     GrapheneGraphSelector,
     GrapheneInstigationSelector,
     GraphenePipelineSelector,
@@ -474,6 +475,7 @@ class Meta:
         status=graphene.Argument(GrapheneBulkActionStatus),
         cursor=graphene.String(),
         limit=graphene.Int(),
+        filters=graphene.Argument(GrapheneBulkActionsFilter),
         description="Retrieve backfills after applying a status filter, cursor, and limit.",
     )

@@ -1105,12 +1107,14 @@ def resolve_partitionBackfillsOrError(
         status: Optional[GrapheneBulkActionStatus] = None,
         cursor: Optional[str] = None,
         limit: Optional[int] = None,
+        filters: Optional[GrapheneBulkActionsFilter] = None,
     ):
         return get_backfills(
             graphene_info,
             status=BulkActionStatus.from_graphql_input(status) if status else None,
             cursor=cursor,
             limit=limit,
+            filters=filters.to_selector() if filters else None,
         )

     def resolve_assetBackfillPreview(
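With the argument registered on the field and threaded through the resolver, a client exercises the filter by passing variables shaped like the new input type. A sketch reusing the test helpers that appear below (the filter values are placeholders):

    # Hypothetical call mirroring the tests further down.
    result = execute_dagster_graphql(
        context,
        BACKFILLS_WITH_FILTERS_QUERY,
        variables={"filters": {"status": "REQUESTED", "createdBefore": 1724300000.0}},
    )
    backfills = result.data["partitionBackfillsOrError"]["results"]
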
@@ -129,6 +129,20 @@
 }
 """

+BACKFILLS_WITH_FILTERS_QUERY = """
+query BackfillsWithFiltersQuery($filters: BulkActionsFilter) {
+  partitionBackfillsOrError(filters: $filters) {
+    ... on PartitionBackfills {
+      results {
+        id
+        timestamp
+        status
+      }
+    }
+  }
+}
+"""
+

 def get_repo() -> RepositoryDefinition:
     partitions_def = StaticPartitionsDefinition(["a", "b", "c"])
@@ -1056,7 +1070,9 @@ def test_asset_backfill_error_raised_upon_invalid_params_provided():


 def _get_backfill_data(
-    launch_backfill_result: GqlResult, instance: DagsterInstance, repo
+    launch_backfill_result: GqlResult,
+    instance: DagsterInstance,
+    repo,
 ) -> Tuple[str, AssetBackfillData]:
     assert launch_backfill_result
     assert launch_backfill_result.data
@@ -1131,3 +1147,61 @@ def override_backfill_storage_setting(self):
     )

     assert len(backfill_logs.data["partitionBackfillOrError"]["logEvents"]["events"]) > 0


+def test_get_backfills_with_filters():
+    repo = get_repo()
+    all_asset_keys = repo.asset_graph.materializable_asset_keys
+
+    with instance_for_test() as instance:
+        with define_out_of_process_context(__file__, "get_repo", instance) as context:
+            # launchPartitionBackfill
+            all_backfills = []
+            for _ in range(5):
+                launch_backfill_result = execute_dagster_graphql(
+                    context,
+                    LAUNCH_PARTITION_BACKFILL_MUTATION,
+                    variables={
+                        "backfillParams": {
+                            "partitionNames": ["a", "b"],
+                            "assetSelection": [key.to_graphql_input() for key in all_asset_keys],
+                        }
+                    },
+                )
+                assert (
+                    "backfillId" in launch_backfill_result.data["launchPartitionBackfill"]
+                ), _get_error_message(launch_backfill_result)
+
+                backfill_id = launch_backfill_result.data["launchPartitionBackfill"]["backfillId"]
+                backfill = instance.get_backfill(backfill_id)
+                all_backfills.append(backfill)
+
+            get_backfills_result = execute_dagster_graphql(
+                context,
+                BACKFILLS_WITH_FILTERS_QUERY,
+                variables={"filters": {"createdBefore": all_backfills[3].backfill_timestamp}},
+            )
+            assert not get_backfills_result.errors
+            assert get_backfills_result.data
+            backfill_results = get_backfills_result.data["partitionBackfillsOrError"]["results"]
+
+            for result in backfill_results:
+                assert result["timestamp"] < all_backfills[3].backfill_timestamp
+
+            get_backfills_result = execute_dagster_graphql(
+                context,
+                BACKFILLS_WITH_FILTERS_QUERY,
+                variables={"filters": {"createdAfter": all_backfills[3].backfill_timestamp}},
+            )
+            assert not get_backfills_result.errors
+            assert get_backfills_result.data
+            backfill_results = get_backfills_result.data["partitionBackfillsOrError"]["results"]
+
+            for result in backfill_results:
+                assert result["timestamp"] > all_backfills[3].backfill_timestamp
+
+            get_backfills_result = execute_dagster_graphql(
+                context,
+                BACKFILLS_WITH_FILTERS_QUERY,
+                variables={"filters": {"status": "REQUESTED"}},
+            )