Skip to content

Commit

Permalink
test passing
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Aug 6, 2024
1 parent 3470e5b commit 28b99d0
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,8 @@ def get_mega_runs(
Cursor format: run_id;backfill_id
"""
from ..schema.mega_run import GrapheneMegaRun
from ..schema.backfill import GraphenePartitionBackfill
from ..schema.pipelines.pipeline import GrapheneRun

check.opt_str_param(cursor, "cursor")
check.opt_int_param(limit, "limit")
Expand All @@ -423,16 +424,19 @@ def get_mega_runs(
backfills = instance.get_backfills(cursor=backfill_cursor, limit=limit)
runs = _fetch_runs_not_in_backfill(instance, cursor=run_cursor, limit=limit)

all_mega_runs = [GraphenePartitionBackfill(backfill) for backfill in backfills] + [
GrapheneRun(run) for run in runs
]

# order runs and backfills by create_time. typically we sort by storage id but that won't work here since
# they are different tables
all_mega_runs = sorted(
backfills + runs,
key=lambda x: x.create_timestamp.timestamp()
if isinstance(x, RunRecord)
else x.backfill_timestamp,
all_mega_runs,
key=lambda x: x.resolve_creationTime(graphene_info), # ideally could just do .creationTime
reverse=True,
)

if limit:
all_mega_runs = all_mega_runs[:limit]
return all_mega_runs[:limit]

return [GrapheneMegaRun(record) for record in all_mega_runs]
return all_mega_runs
21 changes: 13 additions & 8 deletions python_modules/dagster-graphql/dagster_graphql/schema/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,16 +125,18 @@ class Meta:
name = "BulkActionStatus"

def to_dagster_run_status(self) -> GrapheneRunStatus:
if self == GrapheneBulkActionStatus.REQUESTED:
"""Placeholder for this PR. Will do a more thurough pass to accurately convert backfill status
to DagsterRunStatus in a stacked branch.
"""
if self.args[0] == GrapheneBulkActionStatus.REQUESTED.value:
return GrapheneRunStatus.STARTED
if self == GrapheneBulkActionStatus.COMPLETED:
if self.args[0] == GrapheneBulkActionStatus.COMPLETED.value:
return GrapheneRunStatus.SUCCESS
if self == GrapheneBulkActionStatus.FAILED:
if self.args[0] == GrapheneBulkActionStatus.FAILED.value:
return GrapheneRunStatus.FAILURE
if self == GrapheneBulkActionStatus.CANCELED:
if self.args[0] == GrapheneBulkActionStatus.CANCELED.value:
return GrapheneRunStatus.CANCELED
if self == GrapheneBulkActionStatus.CANCELING:
return GrapheneRunStatus.CANCELING
return GrapheneRunStatus.CANCELING


class GrapheneAssetBackfillTargetPartitions(graphene.ObjectType):
Expand Down Expand Up @@ -371,7 +373,7 @@ def __init__(self, backfill_job: PartitionBackfill):
fromFailure=bool(backfill_job.from_failure),
reexecutionSteps=backfill_job.reexecution_steps,
timestamp=backfill_job.backfill_timestamp,
creationTime=backfill_job.backfill_timestamp,
# creationTime=backfill_job.backfill_timestamp,
startTime=backfill_job.backfill_timestamp,
assetSelection=backfill_job.asset_selection,
)
Expand Down Expand Up @@ -459,6 +461,9 @@ def _get_partition_run_data_for_ranged_job_backfill(
)
]

def resolve_creationTime(self, graphene_info: ResolveInfo):
return self.timestamp

def resolve_unfinishedRuns(self, graphene_info: ResolveInfo) -> Sequence["GrapheneRun"]:
from .pipelines.pipeline import GrapheneRun

Expand Down Expand Up @@ -487,7 +492,7 @@ def resolve_tags(self, _graphene_info: ResolveInfo):
]

def resolve_runStatus(self, _graphene_info: ResolveInfo) -> GrapheneRunStatus:
return self.status.to_dagster_run_status()
return GrapheneBulkActionStatus(self.status).to_dagster_run_status()

def resolve_endTimestamp(self, graphene_info: ResolveInfo) -> Optional[float]:
if self._backfill_job.status == BulkActionStatus.REQUESTED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
1 run and a bunch of backfills and vis versa.
"""

import time

from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
from dagster._core.storage.dagster_run import DagsterRun
from dagster._core.test_utils import create_run_for_test, freeze_time
from dagster._core.test_utils import create_run_for_test
from dagster._core.utils import make_new_backfill_id
from dagster._time import add_absolute_time, create_datetime
from dagster._time import get_current_timestamp
from dagster_graphql.test.utils import execute_dagster_graphql

from dagster_graphql_tests.graphql.graphql_context_test_suite import (
Expand All @@ -30,7 +32,7 @@
... on MegaRuns {
results {
runId
status
runStatus
creationTime
startTime
endTime
Expand Down Expand Up @@ -72,14 +74,14 @@ def _create_run_for_backfill(graphql_context, backfill_id: str) -> DagsterRun:
)


def _create_backfill(graphql_context, timestamp: float) -> str:
def _create_backfill(graphql_context) -> str:
backfill = PartitionBackfill(
backfill_id=make_new_backfill_id(),
serialized_asset_backfill_data="foo", # the content of the backfill doesn't matter for testing fetching mega runs
status=BulkActionStatus.COMPLETED,
reexecution_steps=None,
tags=None,
backfill_timestamp=timestamp, # truncate to an integer to make the ordering deterministic since the runs db is in ints
backfill_timestamp=get_current_timestamp(), # truncate to an integer to make the ordering deterministic since the runs db is in ints
from_failure=False,
)
graphql_context.instance.add_backfill(backfill)
Expand All @@ -89,32 +91,26 @@ def _create_backfill(graphql_context, timestamp: float) -> str:
class TestMegaRuns(ExecutingGraphQLContextTestMatrix):
def test_get_mega_runs(self, graphql_context):
# seed some runs and backfills in an alternating order
expected_order = []
start_datetime = create_datetime(year=2022, month=1, day=1, hour=1)
for i in range(10):
with freeze_time(start_datetime):
run_id = _create_run(graphql_context).run_id
expected_order.append(run_id)
backfill_id = _create_backfill(
graphql_context, timestamp=start_datetime.timestamp() + 2
)
expected_order.append(backfill_id)

start_datetime = add_absolute_time(start_datetime, seconds=10)
for _ in range(10):
_create_run(graphql_context)
time.sleep(1)
_create_backfill(graphql_context)

result = execute_dagster_graphql(
graphql_context,
GET_MEGA_RUNS_QUERY,
variables={
"limit": 20,
"limit": 10,
"cursor": None,
},
)

assert not result.errors
assert result.data

assert result.data["megaRunsOrError"]["__typename"] == "LaunchBackfillSuccess"
assert len(result.data["megaRunsOrError"]["results"]) == 10
prev_run_time = None
for res in result.data["megaRunsOrError"]["results"]:
assert res["runId"] == expected_order.pop(-1)
if prev_run_time:
assert res["creationTime"] <= prev_run_time
prev_run_time = res["creationTime"]

0 comments on commit 28b99d0

Please sign in to comment.