diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py index b33b4422a0c6d..02d95a25ba4a1 100644 --- a/python_modules/dagster/dagster/_core/instance/__init__.py +++ b/python_modules/dagster/dagster/_core/instance/__init__.py @@ -1277,14 +1277,16 @@ def _ensure_persisted_job_snapshot( check.opt_inst_param(parent_job_snapshot, "parent_job_snapshot", JobSnapshot) if job_snapshot.lineage_snapshot: - if not self._run_storage.has_job_snapshot( - job_snapshot.lineage_snapshot.parent_snapshot_id - ): - returned_job_snapshot_id = self._run_storage.add_job_snapshot( - parent_job_snapshot # type: ignore # (possible none) + parent_snapshot_id = create_job_snapshot_id(check.not_none(parent_job_snapshot)) + + if job_snapshot.lineage_snapshot.parent_snapshot_id != parent_snapshot_id: + warnings.warn( + f"Stored parent snapshot ID {parent_snapshot_id} did not match the parent snapshot ID {job_snapshot.lineage_snapshot.parent_snapshot_id} on the subsetted job" ) - check.invariant( - job_snapshot.lineage_snapshot.parent_snapshot_id == returned_job_snapshot_id + + if not self._run_storage.has_job_snapshot(parent_snapshot_id): + self._run_storage.add_job_snapshot( + check.not_none(parent_job_snapshot), parent_snapshot_id ) job_snapshot_id = create_job_snapshot_id(job_snapshot)