diff --git a/python_modules/dagster/dagster/_core/storage/event_log/sqlite/consolidated_sqlite_event_log.py b/python_modules/dagster/dagster/_core/storage/event_log/sqlite/consolidated_sqlite_event_log.py index a7f33aca4e2b8..24fe196036003 100644 --- a/python_modules/dagster/dagster/_core/storage/event_log/sqlite/consolidated_sqlite_event_log.py +++ b/python_modules/dagster/dagster/_core/storage/event_log/sqlite/consolidated_sqlite_event_log.py @@ -2,6 +2,7 @@ import os from collections import defaultdict from contextlib import contextmanager +from functools import cached_property from typing import Any, Mapping, Optional import sqlalchemy as db @@ -144,9 +145,9 @@ def watch(self, run_id, cursor, callback): self._watchers[run_id][callback] = cursor - @property + @cached_property def supports_global_concurrency_limits(self) -> bool: - return False + return self.has_table("concurrency_limits") def on_modified(self): keys = [ diff --git a/python_modules/dagster/dagster/_core/storage/event_log/sqlite/sqlite_event_log.py b/python_modules/dagster/dagster/_core/storage/event_log/sqlite/sqlite_event_log.py index e22eb71db3e0f..14559adc9b396 100644 --- a/python_modules/dagster/dagster/_core/storage/event_log/sqlite/sqlite_event_log.py +++ b/python_modules/dagster/dagster/_core/storage/event_log/sqlite/sqlite_event_log.py @@ -8,6 +8,7 @@ import time from collections import defaultdict from contextlib import contextmanager +from functools import cached_property from typing import TYPE_CHECKING, Any, ContextManager, Iterator, Optional, Sequence, Union import sqlalchemy as db @@ -436,14 +437,6 @@ def fetch_run_status_changes( def supports_event_consumer_queries(self) -> bool: return False - def delete_events(self, run_id: str) -> None: - with self.run_connection(run_id) as conn: - self.delete_events_for_run(conn, run_id) - - # delete the mirrored event in the cross-run index database - with self.index_connection() as conn: - self.delete_events_for_run(conn, run_id) - def wipe(self) -> None: # should delete all the run-sharded db files and drop the contents of the index for filename in ( @@ -508,9 +501,9 @@ def alembic_version(self) -> AlembicVersion: def is_run_sharded(self) -> bool: return True - @property + @cached_property def supports_global_concurrency_limits(self) -> bool: - return False + return self.has_table("concurrency_limits") class SqliteEventLogStorageWatchdog(PatternMatchingEventHandler): diff --git a/python_modules/dagster/dagster_tests/cli_tests/command_tests/test_concurrency_command.py b/python_modules/dagster/dagster_tests/cli_tests/command_tests/test_concurrency_command.py index 06a03b4236a76..47e45733afaa0 100644 --- a/python_modules/dagster/dagster_tests/cli_tests/command_tests/test_concurrency_command.py +++ b/python_modules/dagster/dagster_tests/cli_tests/command_tests/test_concurrency_command.py @@ -23,14 +23,6 @@ def mock_instance_runner(): yield instance, runner -@pytest.fixture(name="unsupported_instance_runner") -def mock_unsupported_instance_runner(): - with tempfile.TemporaryDirectory() as dagster_home_temp: - with instance_for_test(temp_dir=dagster_home_temp) as instance: - runner = CliRunner(env={"DAGSTER_HOME": dagster_home_temp}) - yield instance, runner - - def test_get_concurrency(instance_runner): instance, runner = instance_runner result = runner.invoke(get_concurrency) @@ -61,14 +53,3 @@ def test_set_concurrency(instance_runner): result = runner.invoke(set_concurrency, ["foo", "1"]) assert result.exit_code == 0 assert "Set concurrency limit for foo to 1" in result.output - - -def test_unsupported(unsupported_instance_runner): - _instance, runner = unsupported_instance_runner - result = runner.invoke(get_concurrency) - assert result.exit_code == 1 - assert "does not support global concurrency limits" in result.output - - result = runner.invoke(set_concurrency, ["foo", "1"]) - assert result.exit_code == 1 - assert "does not support global concurrency limits" in result.output