Commit: sort that should account for spicy window

jamiedemaria committed Aug 5, 2024
1 parent bee36ec commit dbd5e92

Showing 4 changed files with 66 additions and 62 deletions.
@@ -18,6 +18,7 @@
)
from dagster._core.definitions.selector import JobSubsetSelector
from dagster._core.errors import DagsterRunNotFoundError
from dagster._core.execution.backfill import PartitionBackfill
from dagster._core.instance import DagsterInstance
from dagster._core.storage.dagster_run import DagsterRunStatus, RunRecord, RunsFilter
from dagster._core.storage.event_log.base import AssetRecord
@@ -399,6 +400,52 @@ def _fetch_runs_not_in_backfill(
return runs[:limit]


def _spicy_window_safe_sort(
    runs: Sequence[RunRecord], backfills: Sequence[PartitionBackfill], limit: Optional[int]
) -> Sequence[Union[RunRecord, PartitionBackfill]]:
    """Merge a list of runs and a list of backfills into a single list ordered by creation time,
    while maintaining the relative order of each input list to account for spicy window
    shenanigans.

    For example, two runs may be created at t1 and t2, but the spicy window stores the t2 run
    before the t1 run:

        DB ID    run     created at
        n        run2    t2
        n+1      run1    t1

    If we sorted just by creation time, run1 would come before run2. If run1 was the final run
    returned, the user would pass run1 as the cursor for the next query, which means run2 would
    never get returned. Instead we do a special sort that ensures run2 is always returned before
    run1, so that all runs are eventually returned to the user.
    """
    runs_idx = 0
    backfills_idx = 0
    sorted_runs = []

    for _ in range(limit or len(runs) + len(backfills)):
        # Check for exhaustion before indexing so an empty input list cannot raise
        # an IndexError.
        if runs_idx == len(runs):
            sorted_runs.extend(backfills[backfills_idx:])
            return sorted_runs[:limit]
        if backfills_idx == len(backfills):
            sorted_runs.extend(runs[runs_idx:])
            return sorted_runs[:limit]

        # Newest first; within each input list the storage order is preserved, because
        # each list is consumed strictly left to right.
        if (
            runs[runs_idx].create_timestamp.timestamp()
            > backfills[backfills_idx].backfill_timestamp
        ):
            sorted_runs.append(runs[runs_idx])
            runs_idx += 1
        else:
            sorted_runs.append(backfills[backfills_idx])
            backfills_idx += 1

        if limit and len(sorted_runs) == limit:
            return sorted_runs

    return sorted_runs
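
A minimal sanity check (not part of the commit) exercising the helper with hypothetical stand-in objects that expose only the two attributes the comparison reads, RunRecord.create_timestamp (a datetime) and PartitionBackfill.backfill_timestamp (a float):

from datetime import datetime, timezone
from types import SimpleNamespace

# run2 was stored before run1 even though it was created later (the spicy
# window case from the docstring above)
run2 = SimpleNamespace(create_timestamp=datetime(2024, 8, 5, 12, 2, tzinfo=timezone.utc))
run1 = SimpleNamespace(create_timestamp=datetime(2024, 8, 5, 12, 1, tzinfo=timezone.utc))
bf = SimpleNamespace(backfill_timestamp=datetime(2024, 8, 5, 12, 0, tzinfo=timezone.utc).timestamp())

merged = _spicy_window_safe_sort(runs=[run2, run1], backfills=[bf], limit=None)
assert list(merged) == [run2, run1, bf]  # relative order within `runs` is preserved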


def get_mega_runs(
    graphene_info: "ResolveInfo",
    cursor: Optional[str] = None,
@@ -424,13 +471,14 @@ def get_mega_runs(

    # Order runs and backfills by create time. Typically we sort by storage id, but
    # that won't work here since runs and backfills live in different tables.
    # all_mega_runs = sorted(
    #     backfills + runs,
    #     key=lambda x: x.create_timestamp.timestamp()
    #     if isinstance(x, RunRecord)
    #     else x.backfill_timestamp,
    #     reverse=True,
    # )
    all_mega_runs = _spicy_window_safe_sort(runs=runs, backfills=backfills, limit=limit)
    if limit:
        all_mega_runs = all_mega_runs[:limit]
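
For context, the helper is a classic two-pointer merge (as in merge sort): each input list is consumed strictly left to right, so the relative order within runs and within backfills is preserved by construction, and timestamps only decide how the two lists interleave. The trailing [:limit] slice above is defensive, since the helper already stops at limit. A generic sketch of the same idea (illustrative only, assuming both inputs arrive newest-first):

from typing import Callable, List, TypeVar

T = TypeVar("T")

def merge_preserving_order(a: List[T], b: List[T], key: Callable[[T], float]) -> List[T]:
    # Two-pointer merge: items within `a` keep their order, items within `b`
    # keep theirs; `key` (larger means newer) only decides the interleaving.
    out: List[T] = []
    i = j = 0
    while i < len(a) and j < len(b):
        if key(a[i]) > key(b[j]):
            out.append(a[i])
            i += 1
        else:
            out.append(b[j])
            j += 1
    out.extend(a[i:])
    out.extend(b[j:])
    return out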

@@ -657,48 +657,49 @@ def __init__(self, record: Union[RunRecord, PartitionBackfill]):
else GrapheneRunType.BACKFILL,
)

def resolve_runId(self, _graphene_info: ResolveInfo) -> str:
    if self.runType == GrapheneRunType.RUN:
        return self.singleRun.runId
    return self.backfill.id

def resolve_status(self, _graphene_info: ResolveInfo) -> GrapheneRunStatus:
    if self.runType == GrapheneRunType.RUN:
        return self.singleRun.status
    return GrapheneBulkActionStatus(self.backfill.status).to_dagster_run_status()

def resolve_creationTime(self, graphene_info: ResolveInfo) -> float:
    if self.runType == GrapheneRunType.RUN:
        return self.singleRun.resolve_creationTime(graphene_info)
    return self.backfill.timestamp

def resolve_startTime(self, graphene_info: ResolveInfo) -> Optional[float]:
    # a run that has not started yet has no start time
    if self.runType == GrapheneRunType.RUN:
        return self.singleRun.resolve_startTime(graphene_info)
    return self.backfill.timestamp

def resolve_endTime(self, graphene_info: ResolveInfo) -> Optional[float]:
    # a run or backfill that is still in progress has no end time
    if self.runType == GrapheneRunType.RUN:
        return self.singleRun.resolve_endTime(graphene_info)
    return self.backfill.resolve_endTimestamp(graphene_info)

def resolve_tags(self, graphene_info: ResolveInfo) -> Sequence[GraphenePipelineTag]:
    if self.runType == GrapheneRunType.RUN:
        return self.singleRun.resolve_tags(graphene_info)
    return self.backfill.resolve_tags(graphene_info)

def resolve_jobName(self, graphene_info: ResolveInfo) -> Optional[str]:
    # asset backfills have no partition set, so the name may be None
    if self.runType == GrapheneRunType.RUN:
        return self.singleRun.resolve_jobName(graphene_info)
    return self.backfill.partitionSetName

def resolve_assetSelection(
    self, graphene_info: ResolveInfo
) -> Optional[Sequence[GrapheneAssetKey]]:
    if self.runType == GrapheneRunType.RUN:
        return self.singleRun.resolve_assetSelection(graphene_info)
    return self.backfill.assetSelection

def resolve_assetCheckSelection(
    self, graphene_info: ResolveInfo
) -> Sequence[GrapheneAssetCheckHandle]:
    if self.runType == GrapheneRunType.RUN:
        return self.singleRun.resolve_assetCheckSelection(graphene_info)
    return []
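
The resolvers above follow a discriminated-union pattern: __init__ stores whichever record arrived plus a runType discriminator, and every resolver branches on that discriminator. A self-contained sketch of the pattern (hypothetical, simplified names, not the actual graphene code):

from dataclasses import dataclass
from typing import Union

@dataclass
class Run:  # stand-in for RunRecord
    run_id: str

@dataclass
class Backfill:  # stand-in for PartitionBackfill
    backfill_id: str

class MegaRunNode:
    def __init__(self, record: Union[Run, Backfill]):
        self.record = record
        self.is_run = isinstance(record, Run)  # discriminator, computed once

    def resolve_run_id(self) -> str:
        # every resolver branches on the discriminator instead of re-inspecting
        # the record's type
        return self.record.run_id if self.is_run else self.record.backfill_id

assert MegaRunNode(Run("r1")).resolve_run_id() == "r1"
assert MegaRunNode(Backfill("b1")).resolve_run_id() == "b1"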
15 changes: 0 additions & 15 deletions python_modules/dagster/dagster/_core/execution/backfill.py
@@ -12,11 +12,9 @@
from dagster._core.execution.bulk_actions import BulkActionType
from dagster._core.instance import DynamicPartitionsStore
from dagster._core.remote_representation.origin import RemotePartitionSetOrigin
from dagster._core.storage.dagster_run import MegaRun
from dagster._core.storage.tags import USER_TAG
from dagster._core.workspace.workspace import IWorkspace
from dagster._serdes import whitelist_for_serdes
from dagster._time import datetime_from_timestamp
from dagster._utils.error import SerializableErrorInfo

from ..definitions.selector import PartitionsByAssetSelector
@@ -455,16 +453,3 @@ def from_asset_graph_subset(
        title=title,
        description=description,
    )

def to_mega_run(self) -> MegaRun:
    # TODO - we don't have the job name on the backfill object, just the partition set?
    target = self.asset_selection if self.asset_selection else self.partition_set_name
    return MegaRun(
        run_id=self.backfill_id,
        status=self.status.to_dagster_run_status(),
        create_timestamp=datetime_from_timestamp(self.backfill_timestamp),
        start_time=self.backfill_timestamp,
        end_time=None,
        tags=self.tags,
        target=target,
    )
30 changes: 0 additions & 30 deletions python_modules/dagster/dagster/_core/storage/dagster_run.py
@@ -24,7 +24,6 @@
from dagster._core.origin import JobPythonOrigin
from dagster._core.storage.tags import PARENT_RUN_ID_TAG, ROOT_RUN_ID_TAG
from dagster._core.utils import make_new_run_id
from dagster._record import record
from dagster._serdes.serdes import NamedTupleSerializer, whitelist_for_serdes

from .tags import (
@@ -190,17 +189,6 @@ def __new__(
)


@record
class MegaRun:
    run_id: str
    status: DagsterRunStatus
    create_timestamp: datetime
    start_time: Optional[float]
    end_time: Optional[float]
    target: Union[str, AbstractSet[Union[AssetKey, AssetCheckKey]]]
    tags: Mapping[str, str]
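
For readers unfamiliar with dagster._record, the deleted MegaRun was an immutable value object; a rough analogue (an assumption about @record's behavior, not its implementation) is a frozen dataclass:

from dataclasses import dataclass
from datetime import datetime
from typing import Mapping, Optional

@dataclass(frozen=True)
class MegaRunLike:
    # rough dataclass analogue of the removed @record class, with the
    # union-typed `target` field simplified to str for brevity
    run_id: str
    create_timestamp: datetime
    start_time: Optional[float]
    end_time: Optional[float]
    target: str
    tags: Mapping[str, str]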


class DagsterRunSerializer(NamedTupleSerializer["DagsterRun"]):
# serdes log
# * removed reexecution_config - serdes logic expected to strip unknown keys so no need to preserve
@@ -662,24 +650,6 @@ async def _batch_load(

return result_map.values()

def to_mega_run(self) -> MegaRun:
    from dagster._core.definitions.asset_job import is_base_asset_job_name

    target = (
        self.dagster_run.job_name
        if not is_base_asset_job_name(self.dagster_run.job_name)
        else self.dagster_job.asset_selection + self.dagster_job.asset_check_selection
    )
    return MegaRun(
        run_id=self.dagster_run.run_id,
        status=self.dagster_run.status,
        create_timestamp=self.create_timestamp,
        start_time=self.start_time,
        end_time=self.end_time,
        target=target,
        tags=self.dagster_run.tags,
    )


@whitelist_for_serdes
class RunPartitionData(
