Enable global op concurrency on default sqlite storage (#26142)
## Summary & Motivation
With write-ahead logging (WAL) enabled on SQLite, global op concurrency can be
supported on the default SQLite storage.
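
For reference, WAL is the SQLite journal mode that allows readers to proceed while a single writer appends to the log, which is why the summary treats it as the prerequisite for global op concurrency. A minimal sketch of turning it on with the standard library, using a hypothetical database path (Dagster manages its own SQLite files, so this is illustrative only):

```python
import sqlite3

# Hypothetical path; not taken from this change.
db_path = "/tmp/dagster_home/example.db"

conn = sqlite3.connect(db_path)
# Switch the journal mode to write-ahead logging. The pragma returns the mode
# actually in effect, so check it rather than assuming the switch succeeded.
(mode,) = conn.execute("PRAGMA journal_mode=WAL;").fetchone()
assert mode == "wal"
conn.close()
```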

## How I Tested These Changes
BK (Buildkite CI)

## Changelog
Enables global op concurrency on the default SQLite storage. Deployments
that have not been migrated since `1.6.0` may need to run
`dagster instance migrate` to enable it.
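
One way to confirm that a storage picked up the capability after `dagster instance migrate` is to check the property this commit changes. A sketch using the `instance_for_test` helper that also appears in the test file below; the `event_log_storage` accessor on the instance is assumed here, not shown in this diff:

```python
from dagster._core.test_utils import instance_for_test

# Spin up a throwaway instance backed by the default SQLite storage and read
# the capability flag that this commit switches on.
with instance_for_test() as instance:
    storage = instance.event_log_storage  # assumed accessor; see note above
    print(storage.supports_global_concurrency_limits)
```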
prha authored Nov 27, 2024
1 parent de5a419 commit 5c07c70
Showing 3 changed files with 6 additions and 31 deletions.
[File 1 of 3]

@@ -2,6 +2,7 @@
 import os
 from collections import defaultdict
 from contextlib import contextmanager
+from functools import cached_property
 from typing import Any, Mapping, Optional
 
 import sqlalchemy as db
@@ -144,9 +145,9 @@ def watch(self, run_id, cursor, callback):
 
         self._watchers[run_id][callback] = cursor
 
-    @property
+    @cached_property
     def supports_global_concurrency_limits(self) -> bool:
-        return False
+        return self.has_table("concurrency_limits")
 
     def on_modified(self):
         keys = [
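
The substantive change in both storage classes is that `supports_global_concurrency_limits` now reports whether the `concurrency_limits` table actually exists, memoized with `cached_property` so the schema probe runs at most once per storage object. The project's own `has_table` helper is not shown in this diff; a generic sketch of such a check with SQLAlchemy's inspector, with an illustrative engine URL and helper name rather than the project's implementation:

```python
import sqlalchemy as db

def has_table(engine: db.engine.Engine, table_name: str) -> bool:
    # Ask the dialect's inspector whether the table exists in the database.
    return db.inspect(engine).has_table(table_name)

engine = db.create_engine("sqlite:///example.db")  # illustrative URL
print(has_table(engine, "concurrency_limits"))
```

One consequence of the caching is that a table created after the first access (for example by a migration run mid-process) is not observed until the storage object is recreated; the trade-off is avoiding a schema lookup on every access.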
[File 2 of 3]

@@ -8,6 +8,7 @@
 import time
 from collections import defaultdict
 from contextlib import contextmanager
+from functools import cached_property
 from typing import TYPE_CHECKING, Any, ContextManager, Iterator, Optional, Sequence, Union
 
 import sqlalchemy as db
@@ -436,14 +437,6 @@ def fetch_run_status_changes(
     def supports_event_consumer_queries(self) -> bool:
         return False
 
-    def delete_events(self, run_id: str) -> None:
-        with self.run_connection(run_id) as conn:
-            self.delete_events_for_run(conn, run_id)
-
-        # delete the mirrored event in the cross-run index database
-        with self.index_connection() as conn:
-            self.delete_events_for_run(conn, run_id)
-
     def wipe(self) -> None:
         # should delete all the run-sharded db files and drop the contents of the index
         for filename in (
@@ -508,9 +501,9 @@ def alembic_version(self) -> AlembicVersion:
     def is_run_sharded(self) -> bool:
         return True
 
-    @property
+    @cached_property
     def supports_global_concurrency_limits(self) -> bool:
-        return False
+        return self.has_table("concurrency_limits")
 
 
 class SqliteEventLogStorageWatchdog(PatternMatchingEventHandler):
[File 3 of 3]

@@ -23,14 +23,6 @@ def mock_instance_runner():
             yield instance, runner
 
 
-@pytest.fixture(name="unsupported_instance_runner")
-def mock_unsupported_instance_runner():
-    with tempfile.TemporaryDirectory() as dagster_home_temp:
-        with instance_for_test(temp_dir=dagster_home_temp) as instance:
-            runner = CliRunner(env={"DAGSTER_HOME": dagster_home_temp})
-            yield instance, runner
-
-
 def test_get_concurrency(instance_runner):
     instance, runner = instance_runner
     result = runner.invoke(get_concurrency)
@@ -61,14 +53,3 @@ def test_set_concurrency(instance_runner):
     result = runner.invoke(set_concurrency, ["foo", "1"])
     assert result.exit_code == 0
     assert "Set concurrency limit for foo to 1" in result.output
-
-
-def test_unsupported(unsupported_instance_runner):
-    _instance, runner = unsupported_instance_runner
-    result = runner.invoke(get_concurrency)
-    assert result.exit_code == 1
-    assert "does not support global concurrency limits" in result.output
-
-    result = runner.invoke(set_concurrency, ["foo", "1"])
-    assert result.exit_code == 1
-    assert "does not support global concurrency limits" in result.output