Commit: pr comments, enforce limit

jamiedemaria committed Aug 9, 2024
1 parent 08fbc1a commit 831dfa0
Showing 6 changed files with 51 additions and 64 deletions.
@@ -435,21 +435,20 @@ def from_string(serialized: Optional[str]):
 def _fetch_runs_not_in_backfill(
     instance: DagsterInstance,
     cursor: Optional[str],
-    limit: Optional[int],
-    created_before: Optional[float] = None,
+    limit: int,
+    created_before: Optional[float],
 ) -> Sequence[RunRecord]:
     """Fetches limit RunRecords that are not part of a backfill and were created before a given timestamp."""
     runs_filter = (
         RunsFilter(created_before=datetime_from_timestamp(created_before))
         if created_before
         else None
     )
-    if limit is None:
-        new_runs = instance.get_run_records(cursor=cursor, filters=runs_filter)
-        return [run for run in new_runs if run.dagster_run.tags.get(BACKFILL_ID_TAG) is None]

     runs = []
     while len(runs) < limit:
+        # fetch runs in a loop and discard runs that are part of a backfill until we have
+        # limit runs to return or have reached the end of the runs table
         new_runs = instance.get_run_records(limit=limit, cursor=cursor, filters=runs_filter)
         if len(new_runs) == 0:
             return runs
@@ -462,7 +461,7 @@ def _fetch_runs_not_in_backfill(
 def _fetch_backfills_created_before_timestamp(
     instance: DagsterInstance,
     cursor: Optional[str],
-    limit: Optional[int],
+    limit: int,
     created_before: Optional[float] = None,
 ) -> Sequence[PartitionBackfill]:
     """Fetches limit PartitionBackfills that were created before a given timestamp.
@@ -472,14 +471,10 @@ def _fetch_backfills_created_before_timestamp(
     in a separate function.
     """
     created_before = created_before if created_before else get_current_timestamp()
-    if limit is None:
-        new_backfills = instance.get_backfills(cursor=cursor, limit=limit)
-        return [
-            backfill for backfill in new_backfills if backfill.backfill_timestamp <= created_before
-        ]
-
     backfills = []
     while len(backfills) < limit:
+        # fetch backfills in a loop, discarding backfills that were created after created_before,
+        # until we have limit backfills to return or have reached the end of the backfills table
        new_backfills = instance.get_backfills(cursor=cursor, limit=limit)
         if len(new_backfills) == 0:
             return backfills
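
Both helpers above now rely on the same fetch-and-discard pagination, so the pattern is worth seeing in isolation. Below is a minimal standalone sketch, not Dagster code: fetch_page and is_excluded are hypothetical stand-ins for instance.get_run_records / instance.get_backfills and the backfill-tag or created_before filters.

from typing import Callable, List, Optional, Sequence

def fetch_filtered(
    fetch_page: Callable[[Optional[str], int], Sequence[str]],
    is_excluded: Callable[[str], bool],
    limit: int,
    cursor: Optional[str] = None,
) -> List[str]:
    """Fetch pages and discard excluded rows until we have `limit` results
    or the table is exhausted (signaled by an empty page)."""
    results: List[str] = []
    while len(results) < limit:
        page = fetch_page(cursor, limit)
        if not page:
            break  # reached the end of the table
        cursor = page[-1]  # advance past every row we have seen, kept or not
        results.extend(row for row in page if not is_excluded(row))
    return results[:limit]

# toy table: a "b" prefix marks rows to discard (e.g. runs that belong to a backfill)
rows = [f"{'b' if i % 3 == 0 else 'r'}{i}" for i in range(20)]

def fetch_page(cursor: Optional[str], limit: int) -> Sequence[str]:
    start = rows.index(cursor) + 1 if cursor else 0
    return rows[start : start + limit]

print(fetch_filtered(fetch_page, lambda row: row.startswith("b"), limit=5))
# ['r1', 'r2', 'r4', 'r5', 'r7']
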
@@ -497,30 +492,30 @@ def _fetch_backfills_created_before_timestamp(

 def get_runs_feed_entries(
     graphene_info: "ResolveInfo",
+    limit: int,
     cursor: Optional[str] = None,
-    limit: Optional[int] = None,
 ) -> "GrapheneRunsFeedConnection":
     """Returns a GrapheneRunsFeedConnection, which contains a merged list of backfills and
     single runs (runs that are not part of a backfill), the cursor to fetch the next page,
     and a boolean indicating if there are more results to fetch.

     Args:
+        limit (int): max number of results to return
         cursor (Optional[str]): String that can be deserialized into a RunsFeedCursor. If None, indicates
             that querying should start at the beginning of the table for both runs and backfills.
-        limit (Optional[int]): max number of results to return. If None, will return all results.
     """
     from ..schema.backfill import GraphenePartitionBackfill
     from ..schema.pipelines.pipeline import GrapheneRun
     from ..schema.runs_feed import GrapheneRunsFeedConnection

     check.opt_str_param(cursor, "cursor")
-    check.opt_int_param(limit, "limit")
+    check.int_param(limit, "limit")

     instance = graphene_info.context.instance
     runs_feed_cursor = RunsFeedCursor.from_string(cursor)

     # fetch limit+1 of each type to know if there are more than limit remaining
-    fetch_limit = limit + 1 if limit else None
+    fetch_limit = limit + 1
     # filter out any backfills/runs that are newer than the cursor timestamp. See RunsFeedCursor docstring
     # for the case when this is necessary
     backfills = [
@@ -542,35 +537,31 @@ def get_runs_feed_entries(
         )
     ]

-    if (
-        fetch_limit and limit
-    ):  # if fetch_limit is non-None then limit must also be non-None, but check for pyright
-        # if we fetched limit+1 of either runs or backfills, we know there must be more results
-        # to fetch on the next call since we will return limit results for this call. Additionally,
-        # if we fetched more than limit of runs and backfills combined, we know there are more results
-        has_more = (
-            len(backfills) == fetch_limit
-            or len(runs) == fetch_limit
-            or len(backfills) + len(runs) > limit
-        )
-    else:
-        # case when limit was not passed, so we fetched all of the results
-        has_more = False
+    # if we fetched limit+1 of either runs or backfills, we know there must be more results
+    # to fetch on the next call since we will return limit results for this call. Additionally,
+    # if we fetched more than limit of runs and backfills combined, we know there are more results
+    has_more = (
+        len(backfills) == fetch_limit
+        or len(runs) == fetch_limit
+        or len(backfills) + len(runs) > limit
+    )

-    all_runs = backfills + runs
+    all_entries = backfills + runs

     # order runs and backfills by create_time. typically we sort by storage id but that won't work here since
     # they are different tables
-    all_runs = sorted(
-        all_runs,
-        key=lambda x: x.resolve_creationTime(graphene_info),  # ideally could just do .creationTime
+    all_entries = sorted(
+        all_entries,
+        key=lambda x: x.resolve_creationTimestamp(
+            graphene_info
+        ),  # ideally could just do .creationTime
         reverse=True,
     )

     if limit:
-        to_return = all_runs[:limit]
+        to_return = all_entries[:limit]
     else:
-        to_return = all_runs
+        to_return = all_entries

     new_run_cursor = None
     new_backfill_cursor = None
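
The over-fetch bookkeeping above is compact, so a worked example helps. This standalone sketch (toy counts, not Dagster code) exercises the three ways has_more can be true even though only limit entries are returned:

def has_more(num_backfills: int, num_runs: int, limit: int) -> bool:
    """Mirrors the check above: each source was queried with fetch_limit = limit + 1,
    so hitting fetch_limit in either source, or exceeding limit combined, proves
    there are rows left for the next page."""
    fetch_limit = limit + 1
    return (
        num_backfills == fetch_limit
        or num_runs == fetch_limit
        or num_backfills + num_runs > limit
    )

assert has_more(11, 0, limit=10)  # backfills alone hit fetch_limit
assert has_more(0, 11, limit=10)  # runs alone hit fetch_limit
assert has_more(6, 6, limit=10)  # neither hit fetch_limit, but 12 > 10 combined
assert not has_more(4, 6, limit=10)  # exactly limit results, nothing left over
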
@@ -582,7 +573,7 @@ def get_runs_feed_entries(
         if new_run_cursor is None and isinstance(run, GrapheneRun):
             new_run_cursor = run.runId

-    new_timestamp = to_return[-1].resolve_creationTime(graphene_info) if to_return else None
+    new_timestamp = to_return[-1].resolve_creationTimestamp(graphene_info) if to_return else None

     # if either of the new cursors is None, replace it with the cursor passed in so the next call doesn't
     # restart at the top of the table.
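
The carry-forward comment above matters most on pages that contain only one entry type. Only RunsFeedCursor.from_string appears in this diff, so the sketch below assumes a hypothetical composite cursor with run_cursor, backfill_cursor, and timestamp fields; it shows why a missing component must fall back to the incoming cursor rather than reset to the top of that table.

from dataclasses import dataclass
from typing import Optional

@dataclass
class FeedCursor:  # hypothetical stand-in for RunsFeedCursor
    run_cursor: Optional[str] = None
    backfill_cursor: Optional[str] = None
    timestamp: Optional[float] = None

def advance(
    old: FeedCursor,
    new_run: Optional[str],
    new_backfill: Optional[str],
    ts: Optional[float],
) -> FeedCursor:
    # a page holding only backfills yields new_run=None; keeping old.run_cursor means
    # the next call resumes runs where the last page left off instead of at the top
    return FeedCursor(
        run_cursor=new_run or old.run_cursor,
        backfill_cursor=new_backfill or old.backfill_cursor,
        timestamp=ts or old.timestamp,
    )

prev = FeedCursor(run_cursor="run_42", backfill_cursor="bf_7", timestamp=1723200000.0)
nxt = advance(prev, new_run=None, new_backfill="bf_9", ts=1723100000.0)
assert nxt.run_cursor == "run_42"  # carried forward, not reset
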
@@ -48,7 +48,7 @@
     create_execution_params_error_types,
 )
 from .pipelines.config import GrapheneRunConfigValidationInvalid
-from .runs_feed import GrapheneRunsFeedEntry, GrapheneRunsFeedEntryType
+from .runs_feed import GrapheneRunsFeedEntry
 from .util import ResolveInfo, non_null_list

 if TYPE_CHECKING:
@@ -311,7 +311,7 @@ class Meta:
     assetSelection = graphene.List(graphene.NonNull(GrapheneAssetKey))
     partitionSetName = graphene.Field(graphene.String)
     timestamp = graphene.NonNull(graphene.Float)
-    creationTime = graphene.NonNull(
+    creationTimestamp = graphene.NonNull(
         graphene.Float
     )  # for RunsFeedEntry interface - dupe of timestamp
     startTime = graphene.Float()  # for RunsFeedEntry interface - dupe of timestamp
@@ -357,7 +357,6 @@ class Meta:
     assetCheckSelection = graphene.List(
         graphene.NonNull("dagster_graphql.schema.asset_checks.GrapheneAssetCheckHandle")
     )
-    runType = graphene.NonNull(GrapheneRunsFeedEntryType)

     def __init__(self, backfill_job: PartitionBackfill):
         self._backfill_job = check.inst_param(backfill_job, "backfill_job", PartitionBackfill)
@@ -376,7 +375,6 @@ def __init__(self, backfill_job: PartitionBackfill):
             timestamp=backfill_job.backfill_timestamp,
             startTime=backfill_job.backfill_timestamp,
             assetSelection=backfill_job.asset_selection,
-            runType=GrapheneRunsFeedEntryType.BACKFILL,
             assetCheckSelection=[],
         )

@@ -463,10 +461,10 @@ def _get_partition_run_data_for_ranged_job_backfill(
             )
         ]

-    def resolve_creationTime(self, graphene_info: ResolveInfo):
-        # Needed to have this as a resolver, rather than pass on __init__ because creationTime is
+    def resolve_creationTimestamp(self, graphene_info: ResolveInfo):
+        # Needed to have this as a resolver, rather than passing it on __init__, because creationTimestamp is
         # fulfilled via a resolver for GrapheneRun and we need to use the same method of getting
-        # creationTime in get_runs_feed_entries
+        # creationTimestamp in get_runs_feed_entries
         return self.timestamp

     def resolve_unfinishedRuns(self, graphene_info: ResolveInfo) -> Sequence["GrapheneRun"]:
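
The comment explains why creationTimestamp must be a resolver on both entry types: get_runs_feed_entries sorts a mixed list of GrapheneRun and GraphenePartitionBackfill objects, so both classes need a method with the same name. A minimal sketch of that duck typing, with hypothetical classes:

class Run:
    def resolve_creationTimestamp(self, info) -> float:  # resolver, like GrapheneRun
        return 100.0

class Backfill:
    timestamp = 200.0

    def resolve_creationTimestamp(self, info) -> float:  # same name, so sorted() treats both alike
        return self.timestamp

entries = [Run(), Backfill()]
newest_first = sorted(entries, key=lambda e: e.resolve_creationTimestamp(None), reverse=True)
assert isinstance(newest_first[0], Backfill)  # 200.0 sorts ahead of 100.0
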
@@ -46,7 +46,7 @@
 )
 from ..repository_origin import GrapheneRepositoryOrigin
 from ..runs import GrapheneRunConfigData
-from ..runs_feed import GrapheneRunsFeedEntry, GrapheneRunsFeedEntryType
+from ..runs_feed import GrapheneRunsFeedEntry
 from ..schedules.schedules import GrapheneSchedule
 from ..sensors import GrapheneSensor
 from ..solids import (
@@ -355,6 +355,9 @@ class GrapheneRun(graphene.ObjectType):
         limit=graphene.Argument(graphene.Int),
     )
     creationTime = graphene.NonNull(graphene.Float)
+    creationTimestamp = graphene.NonNull(
+        graphene.Float
+    )  # for RunsFeedEntry interface - dupe of creationTime
     startTime = graphene.Float()
     endTime = graphene.Float()
     updateTime = graphene.Float()
@@ -364,7 +367,6 @@ class Meta:
     hasConcurrencyKeySlots = graphene.NonNull(graphene.Boolean)
     rootConcurrencyKeys = graphene.List(graphene.NonNull(graphene.String))
     hasUnconstrainedRootNodes = graphene.NonNull(graphene.Boolean)
-    runType = graphene.NonNull(GrapheneRunsFeedEntryType)

     class Meta:
         interfaces = (GraphenePipelineRun, GrapheneRunsFeedEntry)
@@ -378,7 +380,6 @@ def __init__(self, record: RunRecord):
             status=dagster_run.status.value,
             runStatus=dagster_run.status.value,
             mode=DEFAULT_MODE_NAME,
-            runType=GrapheneRunsFeedEntryType.RUN,
         )
         self.dagster_run = dagster_run
         self._run_record = record
@@ -573,6 +574,9 @@ def resolve_updateTime(self, graphene_info: ResolveInfo):
     def resolve_creationTime(self, graphene_info: ResolveInfo):
         return self._run_record.create_timestamp.timestamp()

+    def resolve_creationTimestamp(self, graphene_info: ResolveInfo):
+        return self.resolve_creationTime(graphene_info)
+
     def resolve_hasConcurrencyKeySlots(self, graphene_info: ResolveInfo):
         instance = graphene_info.context.instance
         if not instance.event_log_storage.supports_global_concurrency_limits:
@@ -350,8 +350,8 @@ class Meta:
     )
     runsFeedOrError = graphene.Field(
         graphene.NonNull(GrapheneRunsFeedConnectionOrError),
+        limit=graphene.NonNull(graphene.Int),
         cursor=graphene.String(),
-        limit=graphene.Int(),
         description="Retrieve entries for the Runs Feed after applying cursor and limit.",
     )
     runTagKeysOrError = graphene.Field(
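
Swapping limit=graphene.Int() for limit=graphene.NonNull(graphene.Int) is what makes the argument required at the GraphQL layer. A minimal graphene sketch (assuming graphene 3.x, not the Dagster schema) showing that validation now rejects queries that omit the argument:

import graphene

class Query(graphene.ObjectType):
    feed = graphene.Field(graphene.String, limit=graphene.NonNull(graphene.Int))

    def resolve_feed(self, info, limit):
        return f"first {limit} entries"

schema = graphene.Schema(query=Query)
assert schema.execute("{ feed(limit: 5) }").data == {"feed": "first 5 entries"}
assert schema.execute("{ feed }").errors  # required argument omitted: validation error
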
@@ -811,7 +811,10 @@ async def resolve_runOrError(self, graphene_info: ResolveInfo, runId):
         return await gen_run_by_id(graphene_info, runId)

     def resolve_runsFeedOrError(
-        self, graphene_info: ResolveInfo, cursor: Optional[str] = None, limit: Optional[int] = None
+        self,
+        graphene_info: ResolveInfo,
+        limit: int,
+        cursor: Optional[str] = None,
     ):
         return get_runs_feed_entries(graphene_info=graphene_info, cursor=cursor, limit=limit)

@@ -5,18 +5,10 @@
 from .util import non_null_list


-class GrapheneRunsFeedEntryType(graphene.Enum):
-    BACKFILL = "BACKFILL"
-    RUN = "RUN"
-
-    class Meta:
-        name = "RunsFeedType"
-
-
 class GrapheneRunsFeedEntry(graphene.Interface):
     runId = graphene.NonNull(graphene.String)
     runStatus = graphene.Field("dagster_graphql.schema.pipelines.pipeline.GrapheneRunStatus")
-    creationTime = graphene.NonNull(graphene.Float)
+    creationTimestamp = graphene.NonNull(graphene.Float)
     startTime = graphene.Float()
     endTime = graphene.Float()
     tags = non_null_list("dagster_graphql.schema.tags.GraphenePipelineTag")
@@ -25,7 +17,6 @@ class GrapheneRunsFeedEntry(graphene.Interface):
     assetCheckSelection = graphene.List(
         graphene.NonNull("dagster_graphql.schema.asset_checks.GrapheneAssetCheckHandle")
     )
-    runType = graphene.NonNull(GrapheneRunsFeedEntryType)

     class Meta:
         name = "RunsFeedEntry"
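
With the RunsFeedType enum gone, the tests below distinguish entry kinds via GraphQL's built-in __typename field on the interface instead. A minimal graphene sketch (hypothetical types, not Dagster's schema) of how interface implementors resolve __typename:

import graphene

class Entry(graphene.Interface):
    name = graphene.NonNull(graphene.String)

class RunEntry(graphene.ObjectType):
    class Meta:
        interfaces = (Entry,)

class BackfillEntry(graphene.ObjectType):
    class Meta:
        interfaces = (Entry,)

class Query(graphene.ObjectType):
    entries = graphene.List(Entry)

    def resolve_entries(self, info):
        return [RunEntry(name="run-ish"), BackfillEntry(name="backfill-ish")]

# implementors must be registered explicitly since no field references them directly
schema = graphene.Schema(query=Query, types=[RunEntry, BackfillEntry])
result = schema.execute("{ entries { __typename name } }")
assert [e["__typename"] for e in result.data["entries"]] == ["RunEntry", "BackfillEntry"]
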
@@ -17,6 +17,7 @@
   runsFeedOrError(cursor: $cursor, limit: $limit) {
     ... on RunsFeedConnection {
       results {
+        __typename
         runId
         runStatus
         creationTime
@@ -33,7 +34,6 @@
           key
           value
         }
-        runType
       }
       cursor
       hasMore
@@ -153,7 +153,7 @@ def test_get_runs_feed_ignores_backfill_runs(self, graphql_context):
             if prev_run_time:
                 assert res["creationTime"] <= prev_run_time
             prev_run_time = res["creationTime"]
-            assert res["runType"] == "BACKFILL"
+            assert res["__typename"] == "GraphenePartitionBackfill"

         assert not result.data["runsFeedOrError"]["hasMore"]

@@ -299,7 +299,7 @@ def test_get_runs_feed_one_backfill_long_ago(self, graphql_context):
             prev_run_time = res["creationTime"]

             # first 10 results should all be runs
-            assert res["runType"] == "RUN"
+            assert res["__typename"] == "GrapheneRun"

         assert result.data["runsFeedOrError"]["hasMore"]
         assert result.data["runsFeedOrError"]["cursor"] is not None
@@ -380,7 +380,7 @@ def test_get_runs_feed_one_new_backfill(self, graphql_context):
             prev_run_time = res["creationTime"]

             # all remaining results should be runs
-            assert res["runType"] == "RUN"
+            assert res["__typename"] == "GrapheneRun"

         assert not result.data["runsFeedOrError"]["hasMore"]
         assert result.data["runsFeedOrError"]["cursor"] is not None
@@ -441,7 +441,7 @@ def test_get_runs_feed_backfill_created_between_calls(self, graphql_context):
             assert res["creationTime"] <= prev_run_time
             prev_run_time = res["creationTime"]

-            assert res["runType"] == "RUN"
+            assert res["__typename"] == "GrapheneRun"

         assert not result.data["runsFeedOrError"]["hasMore"]
         assert result.data["runsFeedOrError"]["cursor"] is not None