Skip to content

Commit

Permalink
Add a "handles_run_events_in_store_event" property to EventLogStorage…
Browse files Browse the repository at this point in the history
…, check it when handling new events (#24229)

Summary:
For storages that do the run updating in the same transaction as storing
the event, this prevents handle_run_event from being called twice - once
in store_event

Test Plan: BK, currently a no-op since it always returns False

## Summary & Motivation

## How I Tested These Changes

## Changelog [New | Bug | Docs]

> Replace this message with a changelog entry, or `NOCHANGELOG`
  • Loading branch information
gibsondan authored Sep 5, 2024
1 parent c2ccdeb commit 0a164d1
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 109 deletions.
6 changes: 5 additions & 1 deletion python_modules/dagster/dagster/_core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2433,7 +2433,11 @@ def handle_new_event(

for event in events:
run_id = event.run_id
if event.is_dagster_event and event.get_dagster_event().is_job_event:
if (
not self._event_storage.handles_run_events_in_store_event
and event.is_dagster_event
and event.get_dagster_event().is_job_event
):
self._run_storage.handle_run_event(run_id, event.get_dagster_event())

for sub in self._subscribers[run_id]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -616,3 +616,7 @@ def get_updated_data_version_partitions(
self, asset_key: AssetKey, partitions: Iterable[str], since_storage_id: int
) -> Set[str]:
raise NotImplementedError()

@property
def handles_run_events_in_store_event(self) -> bool:
return False
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import os
import tempfile
from contextlib import contextmanager

import pytest
from dagster import DagsterInstance
from dagster._core.storage.legacy_storage import LegacyRunStorage
from dagster._core.storage.runs import InMemoryRunStorage, SqliteRunStorage
from dagster._core.storage.sqlite_storage import DagsterSqliteStorage
from dagster._core.test_utils import instance_for_test

from dagster_tests.storage_tests.utils.run_storage import TestRunStorage

Expand Down Expand Up @@ -37,34 +40,57 @@ def create_legacy_run_storage():
storage.dispose()


class TestSqliteImplementation(TestRunStorage):
class TestSqliteRunStorage(TestRunStorage):
__test__ = True

@pytest.fixture(name="storage", params=[create_sqlite_run_storage])
def run_storage(self, request):
with request.param() as s:
yield s
@pytest.fixture(name="instance", scope="function")
def instance(self):
with tempfile.TemporaryDirectory(dir=os.getcwd()) as tmpdir_path:
with instance_for_test(temp_dir=tmpdir_path) as instance:
yield instance

@pytest.fixture(name="storage", scope="function")
def run_storage(self, instance):
run_storage = instance.run_storage
assert isinstance(run_storage, SqliteRunStorage)
yield run_storage

class TestInMemoryImplementation(TestRunStorage):

class TestInMemoryRunStorage(TestRunStorage):
__test__ = True

@pytest.fixture(name="storage", params=[create_in_memory_storage])
def run_storage(self, request):
with request.param() as s:
yield s
@pytest.fixture(name="instance", scope="function")
def instance(self):
with DagsterInstance.ephemeral() as the_instance:
yield the_instance

@pytest.fixture(name="storage")
def run_storage(self, instance):
yield instance.run_storage

def test_storage_telemetry(self, storage):
pass


class TestLegacyStorage(TestRunStorage):
class TestLegacyRunStorage(TestRunStorage):
__test__ = True

@pytest.fixture(name="storage", params=[create_legacy_run_storage])
def run_storage(self, request):
with request.param() as s:
yield s
@pytest.fixture(name="instance", scope="function")
def instance(self):
with tempfile.TemporaryDirectory(dir=os.getcwd()) as tmpdir_path:
with instance_for_test(temp_dir=tmpdir_path) as instance:
yield instance

@pytest.fixture(name="storage", scope="function")
def run_storage(self, instance):
storage = instance.get_ref().storage
assert isinstance(storage, DagsterSqliteStorage)
legacy_storage = LegacyRunStorage(storage)
legacy_storage.register_instance(instance)
try:
yield legacy_storage
finally:
legacy_storage.dispose()

def test_storage_telemetry(self, storage):
pass
Loading

0 comments on commit 0a164d1

Please sign in to comment.