diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py index db27a3290f78c..8e5c3f03e81c2 100644 --- a/python_modules/dagster/dagster/_core/instance/__init__.py +++ b/python_modules/dagster/dagster/_core/instance/__init__.py @@ -1290,13 +1290,13 @@ def _ensure_persisted_job_snapshot( job_snapshot: "JobSnap", parent_job_snapshot: "Optional[JobSnap]", ) -> str: - from dagster._core.snap import JobSnap, create_job_snapshot_id + from dagster._core.snap import JobSnap check.inst_param(job_snapshot, "job_snapshot", JobSnap) check.opt_inst_param(parent_job_snapshot, "parent_job_snapshot", JobSnap) if job_snapshot.lineage_snapshot: - parent_snapshot_id = create_job_snapshot_id(check.not_none(parent_job_snapshot)) + parent_snapshot_id = check.not_none(parent_job_snapshot).snapshot_id if job_snapshot.lineage_snapshot.parent_snapshot_id != parent_snapshot_id: warnings.warn( @@ -1308,7 +1308,7 @@ def _ensure_persisted_job_snapshot( check.not_none(parent_job_snapshot), parent_snapshot_id ) - job_snapshot_id = create_job_snapshot_id(job_snapshot) + job_snapshot_id = job_snapshot.snapshot_id if not self._run_storage.has_job_snapshot(job_snapshot_id): returned_job_snapshot_id = self._run_storage.add_job_snapshot(job_snapshot) check.invariant(job_snapshot_id == returned_job_snapshot_id) diff --git a/python_modules/dagster/dagster/_core/remote_representation/job_index.py b/python_modules/dagster/dagster/_core/remote_representation/job_index.py index 8279514908f7f..0cc1964996e5a 100644 --- a/python_modules/dagster/dagster/_core/remote_representation/job_index.py +++ b/python_modules/dagster/dagster/_core/remote_representation/job_index.py @@ -1,9 +1,8 @@ -from threading import Lock from typing import Any, Mapping, Optional, Sequence, Union import dagster._check as check from dagster._config import ConfigSchemaSnapshot -from dagster._core.snap import DependencyStructureIndex, JobSnap, create_job_snapshot_id +from dagster._core.snap import DependencyStructureIndex, JobSnap from dagster._core.snap.dagster_types import DagsterTypeSnap from dagster._core.snap.mode import ModeDefSnap from dagster._core.snap.node import GraphDefSnap, OpDefSnap @@ -47,9 +46,6 @@ def __init__( for comp_snap in job_snapshot.node_defs_snapshot.graph_def_snaps } - self._memo_lock = Lock() - self._job_snapshot_id = None - @property def name(self) -> str: return self.job_snapshot.name @@ -68,10 +64,7 @@ def metadata(self): @property def job_snapshot_id(self) -> str: - with self._memo_lock: - if not self._job_snapshot_id: - self._job_snapshot_id = create_job_snapshot_id(self.job_snapshot) - return self._job_snapshot_id + return self.job_snapshot.snapshot_id def has_dagster_type_name(self, type_name: str) -> bool: return type_name in self._dagster_type_snaps_by_name_index diff --git a/python_modules/dagster/dagster/_core/snap/__init__.py b/python_modules/dagster/dagster/_core/snap/__init__.py index 0703e304bc392..02fa0a0486dd6 100644 --- a/python_modules/dagster/dagster/_core/snap/__init__.py +++ b/python_modules/dagster/dagster/_core/snap/__init__.py @@ -54,10 +54,7 @@ create_execution_plan_snapshot_id as create_execution_plan_snapshot_id, snapshot_from_execution_plan as snapshot_from_execution_plan, ) -from dagster._core.snap.job_snapshot import ( - JobSnap as JobSnap, - create_job_snapshot_id as create_job_snapshot_id, -) +from dagster._core.snap.job_snapshot import JobSnap as JobSnap from dagster._core.snap.mode import ( LoggerDefSnap as LoggerDefSnap, ModeDefSnap as ModeDefSnap, diff --git a/python_modules/dagster/dagster/_core/snap/job_snapshot.py b/python_modules/dagster/dagster/_core/snap/job_snapshot.py index e8dd6e0c5edd6..a6751ca0e8106 100644 --- a/python_modules/dagster/dagster/_core/snap/job_snapshot.py +++ b/python_modules/dagster/dagster/_core/snap/job_snapshot.py @@ -1,3 +1,4 @@ +from functools import cached_property from typing import AbstractSet, Any, Dict, Mapping, Optional, Sequence, Union, cast from dagster import _check as check @@ -52,9 +53,8 @@ from dagster._serdes.serdes import RecordSerializer -def create_job_snapshot_id(snapshot: "JobSnap") -> str: - check.inst_param(snapshot, "snapshot", JobSnap) - return create_snapshot_id(snapshot) +def _create_job_snapshot_id(job_snap: "JobSnap"): + return create_snapshot_id(job_snap) class JobSnapSerializer(RecordSerializer["JobSnap"]): @@ -157,17 +157,13 @@ def from_job_def(cls, job_def: JobDefinition) -> "JobSnap": lineage = None if job_def.op_selection_data: lineage = JobLineageSnap( - parent_snapshot_id=create_job_snapshot_id( - cls.from_job_def(job_def.op_selection_data.parent_job_def) - ), + parent_snapshot_id=job_def.op_selection_data.parent_job_def.get_job_snapshot_id(), op_selection=sorted(job_def.op_selection_data.op_selection), resolved_op_selection=job_def.op_selection_data.resolved_op_selection, ) if job_def.asset_selection_data: lineage = JobLineageSnap( - parent_snapshot_id=create_job_snapshot_id( - cls.from_job_def(job_def.asset_selection_data.parent_job_def) - ), + parent_snapshot_id=job_def.asset_selection_data.parent_job_def.get_job_snapshot_id(), asset_selection=job_def.asset_selection_data.asset_selection, asset_check_selection=job_def.asset_selection_data.asset_check_selection, ) @@ -187,6 +183,10 @@ def from_job_def(cls, job_def: JobDefinition) -> "JobSnap": graph_def_name=job_def.graph.name, ) + @cached_property + def snapshot_id(self) -> str: + return _create_job_snapshot_id(self) + def get_node_def_snap(self, node_def_name: str) -> Union[OpDefSnap, GraphDefSnap]: check.str_param(node_def_name, "node_def_name") for node_def_snap in self.node_defs_snapshot.op_def_snaps: diff --git a/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py b/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py index cbdd76922c3e0..d3ff11eadf561 100644 --- a/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py +++ b/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py @@ -40,12 +40,7 @@ ) from dagster._core.execution.backfill import BulkActionsFilter, BulkActionStatus, PartitionBackfill from dagster._core.remote_representation.origin import RemoteJobOrigin -from dagster._core.snap import ( - ExecutionPlanSnapshot, - JobSnap, - create_execution_plan_snapshot_id, - create_job_snapshot_id, -) +from dagster._core.snap import ExecutionPlanSnapshot, JobSnap, create_execution_plan_snapshot_id from dagster._core.storage.dagster_run import ( DagsterRun, DagsterRunStatus, @@ -580,7 +575,7 @@ def add_job_snapshot(self, job_snapshot: JobSnap, snapshot_id: Optional[str] = N check.opt_str_param(snapshot_id, "snapshot_id") if not snapshot_id: - snapshot_id = create_job_snapshot_id(job_snapshot) + snapshot_id = job_snapshot.snapshot_id return self._add_snapshot( snapshot_id=snapshot_id, diff --git a/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_instance.py b/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_instance.py index c599260aab6d5..01a05aa141e95 100644 --- a/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_instance.py +++ b/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_instance.py @@ -36,11 +36,7 @@ from dagster._core.instance.config import DEFAULT_LOCAL_CODE_SERVER_STARTUP_TIMEOUT from dagster._core.launcher import LaunchRunContext, RunLauncher from dagster._core.run_coordinator.queued_run_coordinator import QueuedRunCoordinator -from dagster._core.snap import ( - create_execution_plan_snapshot_id, - create_job_snapshot_id, - snapshot_from_execution_plan, -) +from dagster._core.snap import create_execution_plan_snapshot_id, snapshot_from_execution_plan from dagster._core.storage.partition_status_cache import AssetPartitionStatus, AssetStatusCacheValue from dagster._core.storage.sqlite_storage import ( _event_logs_directory, @@ -266,7 +262,7 @@ def test_create_job_snapshot(): run = instance.get_run_by_id(result.run_id) - assert run.job_snapshot_id == create_job_snapshot_id(noop_job.get_job_snapshot()) + assert run.job_snapshot_id == noop_job.get_job_snapshot().snapshot_id def test_create_execution_plan_snapshot(): diff --git a/python_modules/dagster/dagster_tests/core_tests/snap_tests/test_active_data.py b/python_modules/dagster/dagster_tests/core_tests/snap_tests/test_active_data.py index 9130492538d05..26bb5d2a09572 100644 --- a/python_modules/dagster/dagster_tests/core_tests/snap_tests/test_active_data.py +++ b/python_modules/dagster/dagster_tests/core_tests/snap_tests/test_active_data.py @@ -9,9 +9,8 @@ RepositorySnap, TimeWindowPartitionsSnap, ) -from dagster._core.snap.job_snapshot import create_job_snapshot_id -from dagster._core.test_utils import in_process_test_workspace, instance_for_test -from dagster._core.types.loadable_target_origin import LoadableTargetOrigin +from dagster._core.snap.job_snapshot import _create_job_snapshot_id +from dagster._core.test_utils import create_test_daemon_workspace_context, instance_for_test from dagster._serdes import serialize_pp from dagster._time import get_current_datetime @@ -73,44 +72,60 @@ def test_remote_job_data(snapshot): ) -@mock.patch("dagster._core.remote_representation.job_index.create_job_snapshot_id") -def test_remote_repo_shared_index(snapshot_mock): - # ensure we don't rebuild indexes / snapshot ids repeatedly +import os - snapshot_mock.side_effect = create_job_snapshot_id - with instance_for_test() as instance: - with in_process_test_workspace( - instance, LoadableTargetOrigin(python_file=__file__) - ) as workspace: +from dagster._core.workspace.load_target import ModuleTarget - def _fetch_snap_id(): - location = workspace.code_locations[0] - ex_repo = next(iter(location.get_repositories().values())) - return ex_repo.get_all_jobs()[0].identifying_job_snapshot_id - _fetch_snap_id() - assert snapshot_mock.call_count == 1 +def workspace_load_target(): + return ModuleTarget( + module_name="dagster_tests.core_tests.snap_tests.test_active_data", + attribute="a_repo", + working_directory=os.path.join(os.path.dirname(__file__), "..", "..", ".."), + location_name="test_location", + ) - _fetch_snap_id() - assert snapshot_mock.call_count == 1 +def test_remote_repo_shared_index_single_threaded(): + # ensure we don't rebuild indexes / snapshot ids repeatedly + with mock.patch("dagster._core.snap.job_snapshot._create_job_snapshot_id") as snapshot_mock: + snapshot_mock.side_effect = _create_job_snapshot_id + with instance_for_test() as instance: + with create_test_daemon_workspace_context( + workspace_load_target(), + instance, + ) as workspace_process_context: + workspace = workspace_process_context.create_request_context() -@mock.patch("dagster._core.remote_representation.job_index.create_job_snapshot_id") -def test_remote_repo_shared_index_threaded(snapshot_mock): - # ensure we don't rebuild indexes / snapshot ids repeatedly across threads + def _fetch_snap_id(): + location = workspace.code_locations[0] + ex_repo = next(iter(location.get_repositories().values())) + return ex_repo.get_all_jobs()[0].identifying_job_snapshot_id - snapshot_mock.side_effect = create_job_snapshot_id - with instance_for_test() as instance: - with in_process_test_workspace( - instance, LoadableTargetOrigin(python_file=__file__) - ) as workspace: + _fetch_snap_id() + assert snapshot_mock.call_count == 1 - def _fetch_snap_id(): - location = workspace.code_locations[0] - ex_repo = next(iter(location.get_repositories().values())) - return ex_repo.get_all_jobs()[0].identifying_job_snapshot_id + _fetch_snap_id() + assert snapshot_mock.call_count == 1 - with ThreadPoolExecutor() as executor: - wait([executor.submit(_fetch_snap_id) for _ in range(100)]) - assert snapshot_mock.call_count == 1 +def test_remote_repo_shared_index_multi_threaded(): + # ensure we don't rebuild indexes / snapshot ids repeatedly across threads + with mock.patch("dagster._core.snap.job_snapshot._create_job_snapshot_id") as snapshot_mock: + snapshot_mock.side_effect = _create_job_snapshot_id + with instance_for_test() as instance: + with create_test_daemon_workspace_context( + workspace_load_target(), + instance, + ) as workspace_process_context: + workspace = workspace_process_context.create_request_context() + + def _fetch_snap_id(): + location = workspace.code_locations[0] + ex_repo = next(iter(location.get_repositories().values())) + return ex_repo.get_all_jobs()[0].identifying_job_snapshot_id + + with ThreadPoolExecutor() as executor: + wait([executor.submit(_fetch_snap_id) for _ in range(100)]) + + assert snapshot_mock.call_count == 1 diff --git a/python_modules/dagster/dagster_tests/core_tests/snap_tests/test_execution_plan.py b/python_modules/dagster/dagster_tests/core_tests/snap_tests/test_execution_plan.py index 6fb6b387c0126..58b7928a284fe 100644 --- a/python_modules/dagster/dagster_tests/core_tests/snap_tests/test_execution_plan.py +++ b/python_modules/dagster/dagster_tests/core_tests/snap_tests/test_execution_plan.py @@ -1,6 +1,6 @@ from dagster import GraphOut, In, Out, graph, job, op from dagster._core.execution.api import create_execution_plan -from dagster._core.snap import create_job_snapshot_id, snapshot_from_execution_plan +from dagster._core.snap import snapshot_from_execution_plan from dagster._serdes import serialize_pp @@ -17,10 +17,7 @@ def noop_job(): snapshot.assert_match( serialize_pp( - snapshot_from_execution_plan( - execution_plan, - create_job_snapshot_id(noop_job.get_job_snapshot()), - ) + snapshot_from_execution_plan(execution_plan, noop_job.get_job_snapshot().snapshot_id) ) ) @@ -44,7 +41,7 @@ def noop_job(): serialize_pp( snapshot_from_execution_plan( execution_plan, - create_job_snapshot_id(noop_job.get_job_snapshot()), + noop_job.get_job_snapshot().snapshot_id, ) ) ) @@ -84,7 +81,7 @@ def do_comps(): serialize_pp( snapshot_from_execution_plan( execution_plan, - create_job_snapshot_id(do_comps.get_job_snapshot()), + do_comps.get_job_snapshot().snapshot_id, ) ) ) @@ -105,7 +102,7 @@ def noop_job(): serialize_pp( snapshot_from_execution_plan( execution_plan, - create_job_snapshot_id(noop_job.get_job_snapshot()), + noop_job.get_job_snapshot().snapshot_id, ) ) ) diff --git a/python_modules/dagster/dagster_tests/core_tests/snap_tests/test_job_snap.py b/python_modules/dagster/dagster_tests/core_tests/snap_tests/test_job_snap.py index e3c7c719ab209..797b4ee90605f 100644 --- a/python_modules/dagster/dagster_tests/core_tests/snap_tests/test_job_snap.py +++ b/python_modules/dagster/dagster_tests/core_tests/snap_tests/test_job_snap.py @@ -7,7 +7,6 @@ DependencyStructureIndex, JobSnap, NodeInvocationSnap, - create_job_snapshot_id, snap_from_config_type, ) from dagster._core.snap.dep_snapshot import ( @@ -51,7 +50,7 @@ def test_empty_job_snap_props(snapshot): assert job_snapshot == serialize_rt(job_snapshot) snapshot.assert_match(serialize_pp(job_snapshot)) - snapshot.assert_match(create_job_snapshot_id(job_snapshot)) + snapshot.assert_match(job_snapshot.snapshot_id) def test_job_snap_all_props(snapshot): @@ -72,7 +71,7 @@ def noop_job(): assert job_snapshot == serialize_rt(job_snapshot) snapshot.assert_match(serialize_pp(job_snapshot)) - snapshot.assert_match(create_job_snapshot_id(job_snapshot)) + snapshot.assert_match(job_snapshot.snapshot_id) def test_noop_deps_snap(): @@ -107,7 +106,7 @@ def two_op_job(): assert job_snapshot == serialize_rt(job_snapshot) snapshot.assert_match(serialize_pp(job_snapshot)) - snapshot.assert_match(create_job_snapshot_id(job_snapshot)) + snapshot.assert_match(job_snapshot.snapshot_id) def test_basic_dep(): @@ -177,7 +176,7 @@ def single_dep_job(): assert job_snapshot == serialize_rt(job_snapshot) snapshot.assert_match(serialize_pp(job_snapshot)) - snapshot.assert_match(create_job_snapshot_id(job_snapshot)) + snapshot.assert_match(job_snapshot.snapshot_id) def test_basic_fan_in(snapshot): @@ -218,7 +217,7 @@ def fan_in_test(): assert job_snapshot == serialize_rt(job_snapshot) snapshot.assert_match(serialize_pp(job_snapshot)) - snapshot.assert_match(create_job_snapshot_id(job_snapshot)) + snapshot.assert_match(job_snapshot.snapshot_id) def _dict_has_stable_hashes(hydrated_map, snapshot_config_snap_map): diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_reconstructable.py b/python_modules/dagster/dagster_tests/definitions_tests/test_reconstructable.py index 873f5819137d6..51718347fff62 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_reconstructable.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_reconstructable.py @@ -20,7 +20,7 @@ JobPythonOrigin, RepositoryPythonOrigin, ) -from dagster._core.snap import JobSnap, create_job_snapshot_id +from dagster._core.snap import JobSnap from dagster._core.test_utils import instance_for_test from dagster._utils import file_relative_path from dagster._utils.hosted_user_process import recon_job_from_origin @@ -52,7 +52,7 @@ def get_with_args(_x): def pid(pipeline_def): - return create_job_snapshot_id(JobSnap.from_job_def(pipeline_def)) + return JobSnap.from_job_def(pipeline_def).snapshot_id @job diff --git a/python_modules/dagster/dagster_tests/general_tests/compat_tests/test_change_snapshot_structure.py b/python_modules/dagster/dagster_tests/general_tests/compat_tests/test_change_snapshot_structure.py index c22550e8582ac..84ecbaf9ec0de 100644 --- a/python_modules/dagster/dagster_tests/general_tests/compat_tests/test_change_snapshot_structure.py +++ b/python_modules/dagster/dagster_tests/general_tests/compat_tests/test_change_snapshot_structure.py @@ -1,6 +1,6 @@ from dagster._core.instance import DagsterInstance, InstanceRef from dagster._core.remote_representation import RemoteExecutionPlan -from dagster._core.snap import create_execution_plan_snapshot_id, create_job_snapshot_id +from dagster._core.snap import create_execution_plan_snapshot_id from dagster._utils import file_relative_path from dagster._utils.test import copy_directory @@ -22,7 +22,7 @@ def test_run_created_in_0_7_9_snapshot_id_change(): # It is the pipeline snapshot that changed # Verify that snapshot ids are not equal. This changed in 0.7.10 - created_snapshot_id = create_job_snapshot_id(job_snapshot) + created_snapshot_id = job_snapshot.snapshot_id assert created_snapshot_id != old_job_snapshot_id # verify that both are accessible off of the historical pipeline diff --git a/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py b/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py index 9072ac74ec235..1cd9824521160 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py +++ b/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py @@ -24,7 +24,6 @@ RemoteRepositoryOrigin, ) from dagster._core.run_coordinator import DefaultRunCoordinator -from dagster._core.snap import create_job_snapshot_id from dagster._core.storage.dagster_run import DagsterRun, DagsterRunStatus, RunsFilter from dagster._core.storage.event_log import InMemoryEventLogStorage from dagster._core.storage.noop_compute_log_manager import NoOpComputeLogManager @@ -237,8 +236,8 @@ def test_fetch_by_snapshot_id(self, storage): job_def_b = GraphDefinition(name="some_other_pipeline", node_defs=[]).to_job() job_snapshot_a = job_def_a.get_job_snapshot() job_snapshot_b = job_def_b.get_job_snapshot() - job_snapshot_a_id = create_job_snapshot_id(job_snapshot_a) - job_snapshot_b_id = create_job_snapshot_id(job_snapshot_b) + job_snapshot_a_id = job_snapshot_a.snapshot_id + job_snapshot_b_id = job_snapshot_b.snapshot_id assert storage.add_job_snapshot(job_snapshot_a) == job_snapshot_a_id assert storage.add_job_snapshot(job_snapshot_b) == job_snapshot_b_id @@ -1080,7 +1079,7 @@ def test_write_conflicting_run_id(self, storage: RunStorage): def test_add_get_snapshot(self, storage): job_def = GraphDefinition(name="some_pipeline", node_defs=[]).to_job() job_snapshot = job_def.get_job_snapshot() - job_snapshot_id = create_job_snapshot_id(job_snapshot) + job_snapshot_id = job_snapshot.snapshot_id assert storage.add_job_snapshot(job_snapshot) == job_snapshot_id fetch_job_snapshot = storage.get_job_snapshot(job_snapshot_id) @@ -1100,7 +1099,7 @@ def test_single_write_read_with_snapshot(self, storage: RunStorage): job_snapshot = job_def.get_job_snapshot() - job_snapshot_id = create_job_snapshot_id(job_snapshot) + job_snapshot_id = job_snapshot.snapshot_id run_with_snapshot = DagsterRun( run_id=run_with_snapshot_id, @@ -1811,7 +1810,7 @@ def test_debug_snapshot_import(self, storage): job_def = GraphDefinition(name="some_pipeline", node_defs=[]).to_job() job_snapshot = job_def.get_job_snapshot() - job_snapshot_id = create_job_snapshot_id(job_snapshot) + job_snapshot_id = job_snapshot.snapshot_id new_job_snapshot_id = f"{job_snapshot_id}-new-snapshot" storage.add_snapshot(job_snapshot, snapshot_id=new_job_snapshot_id)