Graphene layer for serving the Runs Feed #23375
# order runs and backfills by create_time. typically we sort by storage id but that won't work here since
# they are different tables
all_mega_runs = sorted(
technically the problem can occur but I wouldn't worry about it in this context
@@ -120,6 +122,18 @@ class GrapheneBulkActionStatus(graphene.Enum):
    class Meta:
        name = "BulkActionStatus"

    def to_dagster_run_status(self) -> GrapheneRunStatus:
In a stacked PR I want to explore adding more BulkActionStatuses so that we can map more cleanly to DagsterRunStatuses. For example, the COMPLETED status could mean some jobs failed, so always displaying as SUCCESS could hide issues from the user
name = "RunType"


class GrapheneMegaRun(graphene.ObjectType):
Naming thread! what do we want to call this?
- MegaRun
- BackfillOrRun
- LeadRun
- RunSet
- ???
I'm not particularly happy with any of these so far
instead of a new concrete type, it may be better to use an interface or a union and return the existing underlying types for run and backfill
A union is easiest from the server, but then the client has to fork all rendering logic based on type; an interface allows for a common set of fields to handle in one place. Talking with @bengotow and looking at which fields are common is probably the best way to navigate. We can always add new fields to the two concrete types to support the interface as well.
updated to use an interface
instance = graphene_info.context.instance

run_cursor, backfill_cursor = cursor.split(";") if cursor else (None, None)
cursor format: For querying runs, the cursor is the ID of the last run in the returned list. I started with something similar here, where the cursor is the last run id and the last backfill id separated by a ";". The difficulty is that users would need to iterate through the returned list to find the last run/backfill. We could add a resolver on GrapheneMegaRuns that returns the cursor for them (I think that's reasonable), but I also want to think about cursor formats that might be easier to generate in the first place.
here's what I did for a similar problem recently: https://github.com/dagster-io/dagster/blob/master/python_modules/dagster/dagster/_core/remote_representation/external.py#L85-L112
the cursor should be opaque to the client, it just gets echoed back to the server to fetch more
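A minimal sketch of one way to make the two-part cursor opaque, assuming the server only needs to carry a run cursor and a backfill cursor between pages. `FeedCursor` and its method names are hypothetical and are not taken from the linked code; the point is only that the client echoes the string back without parsing it.

```python
import base64
import json
from typing import NamedTuple, Optional


class FeedCursor(NamedTuple):
    """Hypothetical cursor holding the position in each underlying table."""

    run_cursor: Optional[str]
    backfill_cursor: Optional[str]

    def to_string(self) -> str:
        # Serialize to JSON, then base64 so clients treat it as an opaque token.
        raw = json.dumps(
            {"run_cursor": self.run_cursor, "backfill_cursor": self.backfill_cursor}
        )
        return base64.urlsafe_b64encode(raw.encode("utf-8")).decode("ascii")

    @staticmethod
    def from_string(value: Optional[str]) -> "FeedCursor":
        # A missing cursor means "start from the beginning" for both tables.
        if not value:
            return FeedCursor(None, None)
        raw = base64.urlsafe_b64decode(value.encode("ascii")).decode("utf-8")
        data = json.loads(raw)
        return FeedCursor(data["run_cursor"], data["backfill_cursor"])
```

With this shape the server can change what it stores in the cursor later without changing the client contract, since the client never splits on ";" or inspects ids.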
especially if we move away from a new concrete type, we can focus the naming more on the capability we are adding, i.e. mergedRuns, GrapheneMergedRunsConnection, GrapheneMergedRunInterface/Union
- merged runs
- grouped runs
- collapsed runs
- run feed (run feed entry/unit)
class GrapheneMegaRuns(graphene.ObjectType):
    results = non_null_list("dagster_graphql.schema.pipelines.pipeline.GrapheneMegaRun")

    class Meta:
        name = "MegaRuns"

    def __init__(self, cursor, limit):
        super().__init__()

        self._cursor = cursor
        self._limit = limit

    def resolve_results(self, graphene_info: ResolveInfo):
        return get_mega_runs(graphene_info, self._cursor, self._limit)
Since we have an intermediate object here, we should model this more as a "connection" than just a plural of results. Basically this just means returning a cursor and maybe "has more" here. It also might inform a slightly different name. We are not consistent about this, but if you grep for "connection" you can see some examples.
https://graphql.org/learn/pagination/#complete-connection-model
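A rough plain-Python sketch of the connection shape described above: the page of results plus a cursor and a "has more" flag. `FeedConnection` and `paginate` are hypothetical names, entries are plain strings for brevity, and the cursor is simply the last entry of the previous page; none of this is the PR's actual resolver.

```python
from dataclasses import dataclass
from typing import List, Optional, Sequence


@dataclass(frozen=True)
class FeedConnection:
    """Hypothetical connection object: one page plus paging metadata."""

    results: List[str]
    cursor: Optional[str]
    has_more: bool


def paginate(entries: Sequence[str], cursor: Optional[str], limit: int) -> FeedConnection:
    # Resume right after the entry named by the cursor, or from the start.
    start = entries.index(cursor) + 1 if cursor is not None else 0
    page = list(entries[start : start + limit])
    # More entries exist if the page did not reach the end of the sequence.
    has_more = start + limit < len(entries)
    # Keep the incoming cursor when the page is empty so the client can retry.
    next_cursor = page[-1] if page else cursor
    return FeedConnection(results=page, cursor=next_cursor, has_more=has_more)
```

Returning the cursor alongside the results means the client does not have to walk the list to work out where the next page starts.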
@alangenfeld I like the capability-focused naming option. I like RunsFeed(Connection/Entry/...). Maybe it's a bit too narrow on the use case of displaying the runs feed, but I kind of like how clear and precise it is about its purpose.
This makes a lot of sense to me.
FWIW I had been imagining that "mega run" would be the new name for what used to be called "backfill", and we'd use something like "outer run" to refer to what's called "mega run" in this PR, which is a run that's not a "sub-run". Regular ol' runs don't seem particularly "mega" to me.
@alangenfeld @sryza I did a renaming pass to align everything on |
nice this is close, worth a roundtrip i think tho
while len(backfills) < limit:
    new_backfills = instance.get_backfills(cursor=cursor, limit=limit)
    if len(new_backfills) == 0:
        return backfills
    cursor = new_backfills[-1].backfill_id
    backfills.extend(
        [
            backfill
            for backfill in new_backfills
            if backfill.backfill_timestamp <= created_before
        ]
    )
we're looping here just to support the created-before filter?
yep, it's not ideal. there isn't support for filtering backfills on anything other than status. I want to add more filter support for backfills, but didn't want to expand this PR. Planning to stack a branch that adds more filtering capabilities
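To make the workaround concrete, here is a self-contained sketch of the same over-fetch-and-filter loop against an in-memory stand-in. `FakeInstance` and `BackfillStub` are hypothetical; only the `get_backfills(cursor=..., limit=...)` call shape and the `backfill_id` / `backfill_timestamp` attributes come from the snippet above. Trimming the result to `limit` is a choice made in this sketch, not something the PR code is known to do.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class BackfillStub:
    """Hypothetical stand-in for a stored backfill."""

    backfill_id: str
    backfill_timestamp: float


class FakeInstance:
    """Hypothetical store that only supports cursor + limit, newest first."""

    def __init__(self, backfills: List[BackfillStub]):
        self._backfills = backfills

    def get_backfills(self, cursor: Optional[str], limit: int) -> List[BackfillStub]:
        ids = [b.backfill_id for b in self._backfills]
        start = ids.index(cursor) + 1 if cursor is not None else 0
        return self._backfills[start : start + limit]


def fetch_backfills_created_before(
    instance: FakeInstance, created_before: float, cursor: Optional[str], limit: int
) -> List[BackfillStub]:
    matching: List[BackfillStub] = []
    while len(matching) < limit:
        batch = instance.get_backfills(cursor=cursor, limit=limit)
        if not batch:
            break  # storage is exhausted
        cursor = batch[-1].backfill_id  # advance past everything just fetched
        # Filter in Python because the store cannot filter on creation time.
        matching.extend(b for b in batch if b.backfill_timestamp <= created_before)
    return matching[:limit]
```

The cost is extra round trips whenever many recent backfills fall outside the filter, which is why pushing the filter down into storage is the cleaner long-term fix.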
# they are different tables
all_runs = sorted(
    all_runs,
    key=lambda x: x.resolve_creationTime(graphene_info),  # ideally could just do .creationTime
maybe add a method on the two python objects we can call instead ?
the two objects being GrapheneRun and GraphenePartitionBackfill, or RunRecord and PartitionBackfill?
ah right these are the graphene objects, yea i think it would still feel a bit cleaner to add a @property creation_time to call here and then have their resolve_ methods call in to as well, but that's more of a nit pick so leave to your call
other option would be to hold off on converting to the Graphene... objects until the very end and then iterate through the final list of objects to return and do the conversion
and add a shared prop to both RunRecord and PartitionBackfill
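A small sketch of the shared-property idea, using hypothetical stand-ins rather than the real RunRecord and PartitionBackfill classes. The underlying field names (`create_timestamp`, `backfill_timestamp`) are assumptions for illustration; the point is that both expose one `creation_time` property that the sort key can use before any conversion to Graphene objects.

```python
from dataclasses import dataclass
from typing import List, Protocol


class HasCreationTime(Protocol):
    """Anything exposing a creation_time property can be sorted into the feed."""

    @property
    def creation_time(self) -> float: ...


@dataclass(frozen=True)
class RunRecordStub:
    """Hypothetical stand-in for a run record."""

    run_id: str
    create_timestamp: float

    @property
    def creation_time(self) -> float:
        return self.create_timestamp


@dataclass(frozen=True)
class PartitionBackfillStub:
    """Hypothetical stand-in for a backfill."""

    backfill_id: str
    backfill_timestamp: float

    @property
    def creation_time(self) -> float:
        return self.backfill_timestamp


def newest_first(entries: List[HasCreationTime]) -> List[HasCreationTime]:
    # Sort on the shared property; no resolver or GraphQL context is needed.
    return sorted(entries, key=lambda entry: entry.creation_time, reverse=True)
```

Sorting the storage-level objects first and converting only the final page keeps the resolver methods out of the ordering logic.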
creationTime = graphene.NonNull(
    graphene.Float
)  # for RunsFeedEntry interface - dupe of timestamp
startTime = graphene.Float()  # for RunsFeedEntry interface - dupe of timestamp
nit: maybe we should consolidate on to timestamp instead since it's a float? I feel like time is a better name for returning like a datetime object
i like that
my thinking with keeping creationTime, startTime, endTime was that in a future where everything can be a single run, we won't have these duplicate fields on GrapheneRun
@@ -325,6 +353,11 @@ class Meta:
graphene.NonNull("dagster_graphql.schema.instigation.GrapheneInstigationEventConnection"),
cursor=graphene.String(),
)
jobName = graphene.String()  # for RunsFeedEntry interface - dupe of partitionSetName
hmm not sure about this part of the interface
yeah i don't know what to do here either. i was thinking about naming it something more generic like target, but then it seems like it should encompass assetSelection and assetCheckSelection too. maybe that would be ok?
in the end the question is how this field would be used on the client - so the best path is to get @bengotow's input either in this initial PR or maybe defer it to a second iteration
"""Placeholder for this PR. Will do a more thorough pass to accurately convert backfill status
to DagsterRunStatus in a stacked branch.
"""
probably want to edit comment if this is going to land and not be just PR time placeholder
    return GrapheneRunStatus.FAILURE
if self.args[0] == GrapheneBulkActionStatus.CANCELED.value:
    return GrapheneRunStatus.CANCELED
return GrapheneRunStatus.CANCELING
maybe map canceling -> canceling and default to something else
# CURRENT_TIMESTAMP only has second precision for sqlite, so if we create runs and backfills without any delay
# the resulting list is a chunk of runs and then a chunk of backfills when ordered by time. Adding a small
# delay between creating a run and a backfill makes the resulting list more interwoven
CREATE_DELAY = 0.5
how slow does this make the test? is there a way to create the runs/backfills in a setup function / fixture so we only have to do it once for all the test cases? Should we constify how many things we create?
…ill/run is made between pages
Will defer to others on the Python bits but the new schema looks good to me! Just left one inline comment
runId: String!
runStatus: RunStatus!
This schema looks great but giving backfills a runId feels a bit icky -- I wonder if we could call the common interface's id and status just id and status, or potentially make new entryId and entryStatus?
Alternatively it might be nice to put a Graphene description on these fields to clarify that they're just meant to implement this common interface
id and status are already taken (by the backfill id and bulk action status respectively). My hesitation with adding entryId and entryStatus is that we'd have to add those to GrapheneRun as well. I was hoping to keep GrapheneRun as unchanged as possible so that in the future, when we've eliminated the need for backfills entirely, GrapheneRun doesn't have attributes that are there because of an interface that is no longer necessary
I'll definitely add descriptions though. I'm also open to pushback on not doing entryId/Status. If it adds significant confusion/code smell it might not be worth it
you might be able to get by with id since it's String (or ID) in both cases
the status one i think is useful since it flattens to just one enum
@alangenfeld were you thinking we'd switch GrapheneRun.id or GraphenePartitionBackfill.id to either String or ID so that the types match?
> the status one i think is useful since it flattens to just one enum
can you elaborate on this a bit?
> were you thinking we'd switch GrapheneRun.id or GraphenePartitionBackfill.id to either String or ID so that the types match?
I assumed they were the same. I think it's fine to change backfill.id to ID to align them; pretty sure that's a safe change since ID is String
> can you elaborate on this a bit?
as opposed to the id change above, we can't change the return type of either existing status field, so we need to add a new field to return one enum type on both objects. That assumes that we want that, which @bengotow can speak to
Summary & Motivation
Introduces a way to query for the Runs Feed (backfills and runs that are not part of backfills).
The main entrypoint is the GrapheneRunsFeedConnection, which returns a list of GrapheneRunsFeedEntry objects, a cursor, and whether more entries exist for future calls. GrapheneRun and GraphenePartitionBackfill both implement the GrapheneRunsFeedEntry interface, which should provide all of the attributes necessary to render the Runs Feed UI (@bengotow if you find while implementing the UI that something is missing, lmk).
How I Tested These Changes
New unit tests