Skip to content

Commit

Permalink
[3/n] Use a consistent workspace throughout a single invocation of su…
Browse files Browse the repository at this point in the history
…bmit_asset_run (#24070)

Summary:
Ensures that the workspace and asset graph going into a given invocation
of submit_asset_run are always in sync, preventing an issue that we saw
while switching over to single implicit asset jobs where the collapsing
of many jobs down into a single job caused issues. To make this
performant and not generate a brand new asset graph for every single
run, depends on the changes earlier in the stack in
#24069 that caches the asset
graph for a given CodeLocationEntriesSnapshot object.

Test Plan: BK, do an asset backfill that logs on asset graph creation
and verify that despite being property-d on each run, the asset graph is
not regenerated each run

## Summary & Motivation

## How I Tested These Changes

## Changelog [New | Bug | Docs]

> Replace this message with a changelog entry, or `NOCHANGELOG`
  • Loading branch information
gibsondan authored Sep 4, 2024
1 parent 32e3f8c commit 1722779
Show file tree
Hide file tree
Showing 3 changed files with 1 addition and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,6 @@ def _submit_runs_and_update_backfill_in_chunks(
run_request_idx,
instance,
workspace_process_context,
asset_graph,
run_request_execution_data_cache,
{},
logger,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ def _create_asset_run(
run_request_index: int,
instance: DagsterInstance,
run_request_execution_data_cache: Dict[int, RunRequestExecutionData],
asset_graph: RemoteAssetGraph,
workspace_process_context: IWorkspaceProcessContext,
debug_crash_flags: SingleInstigatorDebugCrashFlags,
logger: logging.Logger,
Expand All @@ -133,6 +132,7 @@ def _create_asset_run(
# create a new request context for each run in case the code location server
# is swapped out in the middle of the submission process
workspace = workspace_process_context.create_request_context()
asset_graph = workspace.asset_graph
execution_data = _get_job_execution_data_from_run_request(
asset_graph,
run_request,
Expand Down Expand Up @@ -225,7 +225,6 @@ def submit_asset_run(
run_request_index: int,
instance: DagsterInstance,
workspace_process_context: IWorkspaceProcessContext,
asset_graph: RemoteAssetGraph,
run_request_execution_data_cache: Dict[int, RunRequestExecutionData],
debug_crash_flags: SingleInstigatorDebugCrashFlags,
logger: logging.Logger,
Expand Down Expand Up @@ -263,7 +262,6 @@ def submit_asset_run(
run_request_index,
instance,
run_request_execution_data_cache,
asset_graph,
workspace_process_context,
debug_crash_flags,
logger,
Expand Down
1 change: 0 additions & 1 deletion python_modules/dagster/dagster/_daemon/asset_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -1060,7 +1060,6 @@ def _evaluate_auto_materialize_tick(
instance=instance,
workspace_process_context=workspace_process_context,
run_request_execution_data_cache=run_request_execution_data_cache,
asset_graph=asset_graph,
debug_crash_flags=debug_crash_flags,
logger=self._logger,
)
Expand Down

0 comments on commit 1722779

Please sign in to comment.