diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py index fad34cfc4b247..dcd0e7d48185a 100644 --- a/python_modules/dagster/dagster/_core/instance/__init__.py +++ b/python_modules/dagster/dagster/_core/instance/__init__.py @@ -2433,7 +2433,11 @@ def handle_new_event( for event in events: run_id = event.run_id - if event.is_dagster_event and event.get_dagster_event().is_job_event: + if ( + not self._event_storage.handles_run_events_in_store_event + and event.is_dagster_event + and event.get_dagster_event().is_job_event + ): self._run_storage.handle_run_event(run_id, event.get_dagster_event()) for sub in self._subscribers[run_id]: diff --git a/python_modules/dagster/dagster/_core/storage/event_log/base.py b/python_modules/dagster/dagster/_core/storage/event_log/base.py index 0e7694f6d42a4..17b58b7439c8d 100644 --- a/python_modules/dagster/dagster/_core/storage/event_log/base.py +++ b/python_modules/dagster/dagster/_core/storage/event_log/base.py @@ -616,3 +616,7 @@ def get_updated_data_version_partitions( self, asset_key: AssetKey, partitions: Iterable[str], since_storage_id: int ) -> Set[str]: raise NotImplementedError() + + @property + def handles_run_events_in_store_event(self) -> bool: + return False diff --git a/python_modules/dagster/dagster_tests/storage_tests/test_run_storage.py b/python_modules/dagster/dagster_tests/storage_tests/test_run_storage.py index 701e2131bc836..765feee5cf6a0 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/test_run_storage.py +++ b/python_modules/dagster/dagster_tests/storage_tests/test_run_storage.py @@ -1,10 +1,13 @@ +import os import tempfile from contextlib import contextmanager import pytest +from dagster import DagsterInstance from dagster._core.storage.legacy_storage import LegacyRunStorage from dagster._core.storage.runs import InMemoryRunStorage, SqliteRunStorage from dagster._core.storage.sqlite_storage import DagsterSqliteStorage +from dagster._core.test_utils import instance_for_test from dagster_tests.storage_tests.utils.run_storage import TestRunStorage @@ -37,34 +40,57 @@ def create_legacy_run_storage(): storage.dispose() -class TestSqliteImplementation(TestRunStorage): +class TestSqliteRunStorage(TestRunStorage): __test__ = True - @pytest.fixture(name="storage", params=[create_sqlite_run_storage]) - def run_storage(self, request): - with request.param() as s: - yield s + @pytest.fixture(name="instance", scope="function") + def instance(self): + with tempfile.TemporaryDirectory(dir=os.getcwd()) as tmpdir_path: + with instance_for_test(temp_dir=tmpdir_path) as instance: + yield instance + @pytest.fixture(name="storage", scope="function") + def run_storage(self, instance): + run_storage = instance.run_storage + assert isinstance(run_storage, SqliteRunStorage) + yield run_storage -class TestInMemoryImplementation(TestRunStorage): + +class TestInMemoryRunStorage(TestRunStorage): __test__ = True - @pytest.fixture(name="storage", params=[create_in_memory_storage]) - def run_storage(self, request): - with request.param() as s: - yield s + @pytest.fixture(name="instance", scope="function") + def instance(self): + with DagsterInstance.ephemeral() as the_instance: + yield the_instance + + @pytest.fixture(name="storage") + def run_storage(self, instance): + yield instance.run_storage def test_storage_telemetry(self, storage): pass -class TestLegacyStorage(TestRunStorage): +class TestLegacyRunStorage(TestRunStorage): __test__ = True - @pytest.fixture(name="storage", params=[create_legacy_run_storage]) - def run_storage(self, request): - with request.param() as s: - yield s + @pytest.fixture(name="instance", scope="function") + def instance(self): + with tempfile.TemporaryDirectory(dir=os.getcwd()) as tmpdir_path: + with instance_for_test(temp_dir=tmpdir_path) as instance: + yield instance + + @pytest.fixture(name="storage", scope="function") + def run_storage(self, instance): + storage = instance.get_ref().storage + assert isinstance(storage, DagsterSqliteStorage) + legacy_storage = LegacyRunStorage(storage) + legacy_storage.register_instance(instance) + try: + yield legacy_storage + finally: + legacy_storage.dispose() def test_storage_telemetry(self, storage): pass diff --git a/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py b/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py index 8876c97eb1b33..9f98fa076cb6e 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py +++ b/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py @@ -14,6 +14,7 @@ DagsterSnapshotDoesNotExist, ) from dagster._core.events import DagsterEvent, DagsterEventType, JobFailureData, RunFailureReason +from dagster._core.events.log import EventLogEntry from dagster._core.execution.backfill import BulkActionsFilter, BulkActionStatus, PartitionBackfill from dagster._core.instance import DagsterInstance, InstanceType from dagster._core.launcher.sync_in_memory_run_launcher import SyncInMemoryRunLauncher @@ -80,6 +81,10 @@ def run_storage(self, request): with request.param() as s: yield s + @pytest.fixture(name="instance") + def instance(self, request) -> Optional[DagsterInstance]: + return None + # Override for storages that are not allowed to delete runs def can_delete_runs(self): return True @@ -714,7 +719,7 @@ def test_fetch_by_status(self, storage): run.run_id for run in storage.get_runs(RunsFilter(statuses=[DagsterRunStatus.SUCCESS])) } == set() - def test_failure_event_updates_tags(self, storage): + def test_failure_event_updates_tags(self, storage, instance): assert storage one = make_new_run_id() storage.add_run( @@ -722,22 +727,34 @@ def test_failure_event_updates_tags(self, storage): run_id=one, job_name="some_pipeline", status=DagsterRunStatus.STARTED ) ) - storage.handle_run_event( - one, # fail one after two has fails and three has succeeded - DagsterEvent( - message="a message", - event_type_value=DagsterEventType.PIPELINE_FAILURE.value, - job_name="some_pipeline", - event_specific_data=JobFailureData( - error=None, failure_reason=RunFailureReason.RUN_EXCEPTION + instance.handle_new_event( + self._get_run_event_entry( + DagsterEvent( + message="a message", + event_type_value=DagsterEventType.PIPELINE_FAILURE.value, + job_name="some_pipeline", + event_specific_data=JobFailureData( + error=None, failure_reason=RunFailureReason.RUN_EXCEPTION + ), ), - ), + one, # fail one after two has fails and three has succeeded + ) ) run = _get_run_by_id(storage, one) assert run.tags[RUN_FAILURE_REASON_TAG] == RunFailureReason.RUN_EXCEPTION.value - def test_get_run_records(self, storage): + def _get_run_event_entry(self, dagster_event: DagsterEvent, run_id: str): + return EventLogEntry( + error_info=None, + level="debug", + user_message="", + run_id=run_id, + timestamp=time.time(), + dagster_event=dagster_event, + ) + + def test_get_run_records(self, storage, instance): assert storage [one, two, three] = [make_new_run_id() for _ in range(3)] storage.add_run( @@ -755,29 +772,35 @@ def test_get_run_records(self, storage): run_id=three, job_name="some_pipeline", status=DagsterRunStatus.STARTED ) ) - storage.handle_run_event( - three, - DagsterEvent( - message="a message", - event_type_value=DagsterEventType.PIPELINE_SUCCESS.value, - job_name="some_pipeline", - ), + instance.handle_new_event( + self._get_run_event_entry( + DagsterEvent( + message="a message", + event_type_value=DagsterEventType.PIPELINE_SUCCESS.value, + job_name="some_pipeline", + ), + three, + ) ) - storage.handle_run_event( - two, - DagsterEvent( - message="a message", - event_type_value=DagsterEventType.PIPELINE_SUCCESS.value, - job_name="some_pipeline", - ), + instance.handle_new_event( + self._get_run_event_entry( + DagsterEvent( + message="a message", + event_type_value=DagsterEventType.PIPELINE_SUCCESS.value, + job_name="some_pipeline", + ), + two, + ) ) - storage.handle_run_event( - one, - DagsterEvent( - message="a message", - event_type_value=DagsterEventType.PIPELINE_SUCCESS.value, - job_name="some_pipeline", - ), + instance.handle_new_event( + self._get_run_event_entry( + DagsterEvent( + message="a message", + event_type_value=DagsterEventType.PIPELINE_SUCCESS.value, + job_name="some_pipeline", + ), + one, + ) ) def _run_ids(records): @@ -791,7 +814,7 @@ def _run_ids(records): assert _run_ids(storage.get_run_records(cursor=three, limit=1)) == [two] assert _run_ids(storage.get_run_records(cursor=one, limit=1, ascending=True)) == [two] - def test_fetch_records_by_update_timestamp(self, storage): + def test_fetch_records_by_update_timestamp(self, storage, instance): assert storage self._skip_in_memory(storage) @@ -813,21 +836,25 @@ def test_fetch_records_by_update_timestamp(self, storage): run_id=three, job_name="some_pipeline", status=DagsterRunStatus.STARTED ) ) - storage.handle_run_event( - three, # three succeeds - DagsterEvent( - message="a message", - event_type_value=DagsterEventType.PIPELINE_SUCCESS.value, - job_name="some_pipeline", + instance.handle_new_event( + self._get_run_event_entry( + DagsterEvent( + message="a message", + event_type_value=DagsterEventType.PIPELINE_SUCCESS.value, + job_name="some_pipeline", + ), + three, # three succeeds ), ) - storage.handle_run_event( - one, # fail one after two has fails and three has succeeded - DagsterEvent( - message="a message", - event_type_value=DagsterEventType.PIPELINE_FAILURE.value, - job_name="some_pipeline", - ), + instance.handle_new_event( + self._get_run_event_entry( + DagsterEvent( + message="a message", + event_type_value=DagsterEventType.PIPELINE_FAILURE.value, + job_name="some_pipeline", + ), + one, # fail one after two has fails and three has succeeded + ) ) record_two = storage.get_run_records( @@ -1469,7 +1496,7 @@ def test_secondary_index(self, storage): for name in REQUIRED_DATA_MIGRATIONS.keys(): assert storage.has_built_index(name) - def test_handle_run_event_job_success_test(self, storage): + def test_handle_run_event_job_success_test(self, storage, instance): run_id = make_new_run_id() run_to_add = TestRunStorage.build_run(job_name="pipeline_name", run_id=run_id) storage.add_run(run_to_add) @@ -1484,36 +1511,40 @@ def test_handle_run_event_job_success_test(self, storage): logging_tags=None, ) - storage.handle_run_event(run_id, dagster_job_start_event) + instance.handle_new_event(self._get_run_event_entry(dagster_job_start_event, run_id)) assert _get_run_by_id(storage, run_id).status == DagsterRunStatus.STARTED - storage.handle_run_event( - make_new_run_id(), # diff run - DagsterEvent( - message="a message", - event_type_value=DagsterEventType.PIPELINE_SUCCESS.value, - job_name="pipeline_name", - step_key=None, - node_handle=None, - step_kind_value=None, - logging_tags=None, - ), + instance.handle_new_event( + self._get_run_event_entry( + DagsterEvent( + message="a message", + event_type_value=DagsterEventType.PIPELINE_SUCCESS.value, + job_name="pipeline_name", + step_key=None, + node_handle=None, + step_kind_value=None, + logging_tags=None, + ), + make_new_run_id(), # diff run + ) ) assert _get_run_by_id(storage, run_id).status == DagsterRunStatus.STARTED - storage.handle_run_event( - run_id, # correct run - DagsterEvent( - message="a message", - event_type_value=DagsterEventType.PIPELINE_SUCCESS.value, - job_name="pipeline_name", - step_key=None, - node_handle=None, - step_kind_value=None, - logging_tags=None, - ), + instance.handle_new_event( + self._get_run_event_entry( + DagsterEvent( + message="a message", + event_type_value=DagsterEventType.PIPELINE_SUCCESS.value, + job_name="pipeline_name", + step_key=None, + node_handle=None, + step_kind_value=None, + logging_tags=None, + ), + run_id, # correct run + ) ) assert _get_run_by_id(storage, run_id).status == DagsterRunStatus.SUCCESS @@ -1548,7 +1579,7 @@ def test_debug_snapshot_import(self, storage): assert not storage.has_snapshot(ep_snapshot_id) assert storage.has_snapshot(new_ep_snapshot_id) - def test_run_record_stats(self, storage): + def test_run_record_stats(self, storage, instance): assert storage self._skip_in_memory(storage) @@ -1563,13 +1594,15 @@ def test_run_record_stats(self, storage): assert run_record.start_time is None assert run_record.end_time is None - storage.handle_run_event( - run_id, - DagsterEvent( - message="a message", - event_type_value=DagsterEventType.PIPELINE_START.value, - job_name="pipeline_name", - ), + instance.handle_new_event( + self._get_run_event_entry( + DagsterEvent( + message="a message", + event_type_value=DagsterEventType.PIPELINE_START.value, + job_name="pipeline_name", + ), + run_id, + ) ) run_record = storage.get_run_records(RunsFilter(run_ids=[run_id]))[0] @@ -1577,13 +1610,15 @@ def test_run_record_stats(self, storage): assert run_record.start_time is not None assert run_record.end_time is None - storage.handle_run_event( - run_id, - DagsterEvent( - message="a message", - event_type_value=DagsterEventType.PIPELINE_SUCCESS.value, - job_name="pipeline_name", - ), + instance.handle_new_event( + self._get_run_event_entry( + DagsterEvent( + message="a message", + event_type_value=DagsterEventType.PIPELINE_SUCCESS.value, + job_name="pipeline_name", + ), + run_id, + ) ) run_record = storage.get_run_records(RunsFilter(run_ids=[run_id]))[0] diff --git a/python_modules/libraries/dagster-mysql/dagster_mysql_tests/test_run_storage.py b/python_modules/libraries/dagster-mysql/dagster_mysql_tests/test_run_storage.py index 63d355c4d37b2..a51ae343ca764 100644 --- a/python_modules/libraries/dagster-mysql/dagster_mysql_tests/test_run_storage.py +++ b/python_modules/libraries/dagster-mysql/dagster_mysql_tests/test_run_storage.py @@ -14,11 +14,20 @@ class TestMySQLRunStorage(TestRunStorage): __test__ = True + @pytest.fixture(name="instance", scope="function") + def instance(self, conn_string): + MySQLRunStorage.create_clean_storage(conn_string) + + with instance_for_test( + overrides={"storage": {"mysql": {"mysql_url": conn_string}}} + ) as instance: + yield instance + @pytest.fixture(scope="function", name="storage") - def run_storage(self, conn_string): - storage = MySQLRunStorage.create_clean_storage(conn_string) - assert storage - return storage + def run_storage(self, instance): + run_storage = instance.run_storage + assert isinstance(run_storage, MySQLRunStorage) + return run_storage def test_load_from_config(self, conn_string): parse_result = urlparse(conn_string) diff --git a/python_modules/libraries/dagster-postgres/dagster_postgres_tests/test_run_storage.py b/python_modules/libraries/dagster-postgres/dagster_postgres_tests/test_run_storage.py index 153a17faeab64..d5bb961c21cf7 100644 --- a/python_modules/libraries/dagster-postgres/dagster_postgres_tests/test_run_storage.py +++ b/python_modules/libraries/dagster-postgres/dagster_postgres_tests/test_run_storage.py @@ -10,11 +10,20 @@ class TestPostgresRunStorage(TestRunStorage): __test__ = True + @pytest.fixture(name="instance", scope="function") + def instance(self, conn_string): + PostgresRunStorage.create_clean_storage(conn_string) + + with instance_for_test( + overrides={"storage": {"postgres": {"postgres_url": conn_string}}} + ) as instance: + yield instance + @pytest.fixture(scope="function", name="storage") - def run_storage(self, conn_string): - storage = PostgresRunStorage.create_clean_storage(conn_string) - assert storage - return storage + def run_storage(self, instance): + storage = instance.run_storage + assert isinstance(storage, PostgresRunStorage) + yield storage def test_load_from_config(self, hostname): url_cfg = f"""