From 17227797349042caa05b5c2f4b2a546ee010d314 Mon Sep 17 00:00:00 2001 From: gibsondan Date: Wed, 4 Sep 2024 11:15:47 -0500 Subject: [PATCH] [3/n] Use a consistent workspace throughout a single invocation of submit_asset_run (#24070) Summary: Ensures that the workspace and asset graph going into a given invocation of submit_asset_run are always in sync, preventing an issue that we saw while switching over to single implicit asset jobs where the collapsing of many jobs down into a single job caused issues. To make this performant and not generate a brand new asset graph for every single run, depends on the changes earlier in the stack in https://github.com/dagster-io/dagster/pull/24069 that caches the asset graph for a given CodeLocationEntriesSnapshot object. Test Plan: BK, do an asset backfill that logs on asset graph creation and verify that despite being property-d on each run, the asset graph is not regenerated each run ## Summary & Motivation ## How I Tested These Changes ## Changelog [New | Bug | Docs] > Replace this message with a changelog entry, or `NOCHANGELOG` --- .../dagster/dagster/_core/execution/asset_backfill.py | 1 - .../dagster/dagster/_core/execution/submit_asset_runs.py | 4 +--- python_modules/dagster/dagster/_daemon/asset_daemon.py | 1 - 3 files changed, 1 insertion(+), 5 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index 558e8fe2226e5..cacde7b2ee4e4 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -756,7 +756,6 @@ def _submit_runs_and_update_backfill_in_chunks( run_request_idx, instance, workspace_process_context, - asset_graph, run_request_execution_data_cache, {}, logger, diff --git a/python_modules/dagster/dagster/_core/execution/submit_asset_runs.py b/python_modules/dagster/dagster/_core/execution/submit_asset_runs.py index b81897b03892b..ee8ef79783ebb 100644 --- a/python_modules/dagster/dagster/_core/execution/submit_asset_runs.py +++ b/python_modules/dagster/dagster/_core/execution/submit_asset_runs.py @@ -110,7 +110,6 @@ def _create_asset_run( run_request_index: int, instance: DagsterInstance, run_request_execution_data_cache: Dict[int, RunRequestExecutionData], - asset_graph: RemoteAssetGraph, workspace_process_context: IWorkspaceProcessContext, debug_crash_flags: SingleInstigatorDebugCrashFlags, logger: logging.Logger, @@ -133,6 +132,7 @@ def _create_asset_run( # create a new request context for each run in case the code location server # is swapped out in the middle of the submission process workspace = workspace_process_context.create_request_context() + asset_graph = workspace.asset_graph execution_data = _get_job_execution_data_from_run_request( asset_graph, run_request, @@ -225,7 +225,6 @@ def submit_asset_run( run_request_index: int, instance: DagsterInstance, workspace_process_context: IWorkspaceProcessContext, - asset_graph: RemoteAssetGraph, run_request_execution_data_cache: Dict[int, RunRequestExecutionData], debug_crash_flags: SingleInstigatorDebugCrashFlags, logger: logging.Logger, @@ -263,7 +262,6 @@ def submit_asset_run( run_request_index, instance, run_request_execution_data_cache, - asset_graph, workspace_process_context, debug_crash_flags, logger, diff --git a/python_modules/dagster/dagster/_daemon/asset_daemon.py b/python_modules/dagster/dagster/_daemon/asset_daemon.py index 6ab531504075e..663794f32658c 100644 --- a/python_modules/dagster/dagster/_daemon/asset_daemon.py +++ b/python_modules/dagster/dagster/_daemon/asset_daemon.py @@ -1060,7 +1060,6 @@ def _evaluate_auto_materialize_tick( instance=instance, workspace_process_context=workspace_process_context, run_request_execution_data_cache=run_request_execution_data_cache, - asset_graph=asset_graph, debug_crash_flags=debug_crash_flags, logger=self._logger, )