From 29a91fc41c1c8345dc995956e72ae1f0f5758bfe Mon Sep 17 00:00:00 2001
From: Deepyaman Datta
Date: Wed, 18 Dec 2024 14:15:26 -0700
Subject: [PATCH] Add a thread pool to process backfills in parallel (#26189)

## Summary & Motivation

Some users run into situations where they kick off a backfill that takes hours to process, and it holds up every other backfill until it finishes. Adding a thread pool alleviates this in most situations, where only one or a handful of backfills are long-running; in the unlikely case where every backfill being executed in parallel takes hours, this still won't help.

## How I Tested These Changes

Existing tests, plus changing the e2e tests to use the thread pool for backfills.

## Changelog

Added the option to use a thread pool to process backfills in parallel.
---
 .../dagster/_core/instance/__init__.py        |   3 +
 .../dagster/dagster/_core/instance/config.py  |  15 +
 .../dagster/dagster/_core/instance/ref.py     |   1 +
 .../dagster/dagster/_daemon/asset_daemon.py   |  22 +-
 .../dagster/dagster/_daemon/backfill.py       | 118 +++++++-
 .../dagster/dagster/_daemon/controller.py     |   2 +-
 .../dagster/dagster/_daemon/daemon.py         |  31 +-
 .../dagster/dagster/_daemon/sensor.py         |  37 +--
 .../dagster/dagster/_scheduler/scheduler.py   |   5 +-
 .../dagster/dagster/_utils/__init__.py        |  10 +
 .../daemon_sensor_tests/test_sensor_run.py    |   6 -
 .../daemon_tests/test_backfill.py             | 268 +++++++++++++++++-
 .../daemon_tests/test_dagster_daemon.py       |   7 +
 .../daemon_tests/test_e2e.py                  |   5 +
 .../dagster_tests/scheduler_tests/conftest.py |  15 +-
 15 files changed, 457 insertions(+), 88 deletions(-)

diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py
index 25d251bc30679..f781a49e602e2 100644
--- a/python_modules/dagster/dagster/_core/instance/__init__.py
+++ b/python_modules/dagster/dagster/_core/instance/__init__.py
@@ -831,6 +831,9 @@ def get_settings(self, settings_key: str) -> Any:
             return self._settings.get(settings_key)
         return {}
 
+    def get_backfill_settings(self) -> Mapping[str, Any]:
+        return self.get_settings("backfills")
+
     def get_scheduler_settings(self) -> Mapping[str, Any]:
         return self.get_settings("schedules")
 
diff --git a/python_modules/dagster/dagster/_core/instance/config.py b/python_modules/dagster/dagster/_core/instance/config.py
index 24dcd46c01ad1..2da173d909254 100644
--- a/python_modules/dagster/dagster/_core/instance/config.py
+++ b/python_modules/dagster/dagster/_core/instance/config.py
@@ -267,6 +267,20 @@ def get_tick_retention_settings(
     return default_retention_settings
 
 
+def backfills_daemon_config() -> Field:
+    return Field(
+        {
+            "use_threads": Field(Bool, is_required=False, default_value=False),
+            "num_workers": Field(
+                int,
+                is_required=False,
+                description="How many threads to use to process multiple backfills in parallel",
+            ),
+        },
+        is_required=False,
+    )
+
+
 def sensors_daemon_config() -> Field:
     return Field(
         {
@@ -389,6 +403,7 @@ def dagster_instance_config_schema() -> Mapping[str, Field]:
         ),
         "secrets": secrets_loader_config_schema(),
         "retention": retention_config_schema(),
+        "backfills": backfills_daemon_config(),
         "sensors": sensors_daemon_config(),
         "schedules": schedules_daemon_config(),
         "auto_materialize": Field(
diff --git a/python_modules/dagster/dagster/_core/instance/ref.py b/python_modules/dagster/dagster/_core/instance/ref.py
index 6a160b8d217a4..5c29d1d85b5de 100644
--- a/python_modules/dagster/dagster/_core/instance/ref.py
+++ b/python_modules/dagster/dagster/_core/instance/ref.py
@@ -451,6 
+451,7 @@ def from_dir( "run_retries", "code_servers", "retention", + "backfills", "sensors", "schedules", "nux", diff --git a/python_modules/dagster/dagster/_daemon/asset_daemon.py b/python_modules/dagster/dagster/_daemon/asset_daemon.py index 85bed317ee1bd..e4ac46140256c 100644 --- a/python_modules/dagster/dagster/_daemon/asset_daemon.py +++ b/python_modules/dagster/dagster/_daemon/asset_daemon.py @@ -72,7 +72,7 @@ from dagster._serdes import serialize_value from dagster._serdes.serdes import deserialize_value from dagster._time import get_current_datetime, get_current_timestamp -from dagster._utils import SingleInstigatorDebugCrashFlags, check_for_debug_crash +from dagster._utils import SingleInstigatorDebugCrashFlags, check_for_debug_crash, return_as_list _LEGACY_PRE_SENSOR_AUTO_MATERIALIZE_CURSOR_KEY = "ASSET_DAEMON_CURSOR" _PRE_SENSOR_AUTO_MATERIALIZE_CURSOR_KEY = "ASSET_DAEMON_CURSOR_NEW" @@ -655,24 +655,6 @@ def _copy_default_auto_materialize_sensor_states( return result - def _process_auto_materialize_tick( - self, - workspace_process_context: IWorkspaceProcessContext, - repository: Optional[RemoteRepository], - sensor: Optional[RemoteSensor], - debug_crash_flags: SingleInstigatorDebugCrashFlags, - submit_threadpool_executor: Optional[ThreadPoolExecutor], - ): - return list( - self._process_auto_materialize_tick_generator( - workspace_process_context, - repository, - sensor, - debug_crash_flags, - submit_threadpool_executor=submit_threadpool_executor, - ) - ) - def _process_auto_materialize_tick_generator( self, workspace_process_context: IWorkspaceProcessContext, @@ -878,6 +860,8 @@ def _process_auto_materialize_tick_generator( yield error_info + _process_auto_materialize_tick = return_as_list(_process_auto_materialize_tick_generator) + def _evaluate_auto_materialize_tick( self, tick_context: AutoMaterializeLaunchContext, diff --git a/python_modules/dagster/dagster/_daemon/backfill.py b/python_modules/dagster/dagster/_daemon/backfill.py index 1fcc0c24c4199..23b5b8ccfdc68 100644 --- a/python_modules/dagster/dagster/_daemon/backfill.py +++ b/python_modules/dagster/dagster/_daemon/backfill.py @@ -1,8 +1,10 @@ import logging import os import sys +import threading +from concurrent.futures import Future, ThreadPoolExecutor from contextlib import contextmanager -from typing import Iterable, Mapping, Optional, Sequence, cast +from typing import TYPE_CHECKING, Dict, Iterable, Mapping, Optional, Sequence, cast import dagster._check as check from dagster._core.definitions.instigation_logger import InstigationLogger @@ -16,9 +18,13 @@ from dagster._core.execution.job_backfill import execute_job_backfill_iteration from dagster._core.workspace.context import IWorkspaceProcessContext from dagster._daemon.utils import DaemonErrorCapture -from dagster._time import get_current_datetime +from dagster._time import get_current_datetime, get_current_timestamp +from dagster._utils import return_as_list from dagster._utils.error import SerializableErrorInfo +if TYPE_CHECKING: + from dagster._daemon.daemon import DaemonIterator + @contextmanager def _get_instigation_logger_if_log_storage_enabled( @@ -46,9 +52,55 @@ def _get_max_asset_backfill_retries(): return int(os.getenv("DAGSTER_MAX_ASSET_BACKFILL_RETRIES", "5")) +def execute_backfill_iteration_loop( + workspace_process_context: IWorkspaceProcessContext, + logger: logging.Logger, + shutdown_event: threading.Event, + until: Optional[float] = None, + threadpool_executor: Optional[ThreadPoolExecutor] = None, +) -> "DaemonIterator": + from 
dagster._daemon.controller import DEFAULT_DAEMON_INTERVAL_SECONDS + from dagster._daemon.daemon import SpanMarker + + backfill_futures: Dict[str, Future] = {} + while True: + start_time = get_current_timestamp() + if until and start_time >= until: + # provide a way of organically ending the loop to support test environment + break + + yield SpanMarker.START_SPAN + + try: + yield from execute_backfill_iteration( + workspace_process_context, + logger, + threadpool_executor=threadpool_executor, + backfill_futures=backfill_futures, + ) + except Exception: + error_info = DaemonErrorCapture.on_exception( + exc_info=sys.exc_info(), + logger=logger, + log_message="BackfillDaemon caught an error", + ) + yield error_info + + yield SpanMarker.END_SPAN + + end_time = get_current_timestamp() + loop_duration = end_time - start_time + sleep_time = max(0, DEFAULT_DAEMON_INTERVAL_SECONDS - loop_duration) + shutdown_event.wait(sleep_time) + + yield None + + def execute_backfill_iteration( workspace_process_context: IWorkspaceProcessContext, logger: logging.Logger, + threadpool_executor: Optional[ThreadPoolExecutor] = None, + backfill_futures: Optional[Dict[str, Future]] = None, debug_crash_flags: Optional[Mapping[str, int]] = None, ) -> Iterable[Optional[SerializableErrorInfo]]: instance = workspace_process_context.instance @@ -68,7 +120,12 @@ def execute_backfill_iteration( backfill_jobs = [*in_progress_backfills, *canceling_backfills] yield from execute_backfill_jobs( - workspace_process_context, logger, backfill_jobs, debug_crash_flags + workspace_process_context, + logger, + backfill_jobs, + threadpool_executor, + backfill_futures, + debug_crash_flags, ) @@ -86,6 +143,8 @@ def execute_backfill_jobs( workspace_process_context: IWorkspaceProcessContext, logger: logging.Logger, backfill_jobs: Sequence[PartitionBackfill], + threadpool_executor: Optional[ThreadPoolExecutor] = None, + backfill_futures: Optional[Dict[str, Future]] = None, debug_crash_flags: Optional[Mapping[str, int]] = None, ) -> Iterable[Optional[SerializableErrorInfo]]: instance = workspace_process_context.instance @@ -103,18 +162,49 @@ def execute_backfill_jobs( ) try: - if backfill.is_asset_backfill: - yield from execute_asset_backfill_iteration( - backfill, backfill_logger, workspace_process_context, instance - ) + if threadpool_executor: + if backfill_futures is None: + check.failed( + "backfill_futures dict must be passed with threadpool_executor" + ) + + # only allow one backfill per backfill job to be in flight + if backfill_id in backfill_futures and not backfill_futures[backfill_id].done(): + continue + + if backfill.is_asset_backfill: + future = threadpool_executor.submit( + return_as_list(execute_asset_backfill_iteration), + backfill, + backfill_logger, + workspace_process_context, + instance, + ) + else: + future = threadpool_executor.submit( + return_as_list(execute_job_backfill_iteration), + backfill, + backfill_logger, + workspace_process_context, + debug_crash_flags, + instance, + ) + backfill_futures[backfill_id] = future + yield + else: - yield from execute_job_backfill_iteration( - backfill, - backfill_logger, - workspace_process_context, - debug_crash_flags, - instance, - ) + if backfill.is_asset_backfill: + yield from execute_asset_backfill_iteration( + backfill, backfill_logger, workspace_process_context, instance + ) + else: + yield from execute_job_backfill_iteration( + backfill, + backfill_logger, + workspace_process_context, + debug_crash_flags, + instance, + ) except Exception as e: backfill = 
check.not_none(instance.get_backfill(backfill.backfill_id)) if ( diff --git a/python_modules/dagster/dagster/_daemon/controller.py b/python_modules/dagster/dagster/_daemon/controller.py index e0016460d5921..0371c07128180 100644 --- a/python_modules/dagster/dagster/_daemon/controller.py +++ b/python_modules/dagster/dagster/_daemon/controller.py @@ -362,7 +362,7 @@ def create_daemon_of_type(daemon_type: str, instance: DagsterInstance) -> Dagste interval_seconds=instance.run_coordinator.dequeue_interval_seconds # type: ignore # (??) ) elif daemon_type == BackfillDaemon.daemon_type(): - return BackfillDaemon(interval_seconds=DEFAULT_DAEMON_INTERVAL_SECONDS) + return BackfillDaemon(settings=instance.get_backfill_settings()) elif daemon_type == MonitoringDaemon.daemon_type(): return MonitoringDaemon(interval_seconds=instance.run_monitoring_poll_interval_seconds) elif daemon_type == EventLogConsumerDaemon.daemon_type(): diff --git a/python_modules/dagster/dagster/_daemon/daemon.py b/python_modules/dagster/dagster/_daemon/daemon.py index 6a461618c0a69..9ae368c903cf2 100644 --- a/python_modules/dagster/dagster/_daemon/daemon.py +++ b/python_modules/dagster/dagster/_daemon/daemon.py @@ -22,7 +22,7 @@ from dagster._core.telemetry import DAEMON_ALIVE, log_action from dagster._core.utils import InheritContextThreadPoolExecutor from dagster._core.workspace.context import IWorkspaceProcessContext -from dagster._daemon.backfill import execute_backfill_iteration +from dagster._daemon.backfill import execute_backfill_iteration_loop from dagster._daemon.monitoring import ( execute_concurrency_slots_iteration, execute_run_monitoring_iteration, @@ -333,16 +333,39 @@ def core_loop( ) -class BackfillDaemon(IntervalDaemon): +class BackfillDaemon(DagsterDaemon): + def __init__(self, settings: Mapping[str, Any]) -> None: + super().__init__() + self._exit_stack = ExitStack() + self._threadpool_executor: Optional[InheritContextThreadPoolExecutor] = None + + if settings.get("use_threads"): + self._threadpool_executor = self._exit_stack.enter_context( + InheritContextThreadPoolExecutor( + max_workers=settings.get("num_workers"), + thread_name_prefix="backfill_daemon_worker", + ) + ) + @classmethod def daemon_type(cls) -> str: return "BACKFILL" - def run_iteration( + def __exit__(self, _exception_type, _exception_value, _traceback): + self._exit_stack.close() + super().__exit__(_exception_type, _exception_value, _traceback) + + def core_loop( self, workspace_process_context: IWorkspaceProcessContext, + shutdown_event: Event, ) -> DaemonIterator: - yield from execute_backfill_iteration(workspace_process_context, self._logger) + yield from execute_backfill_iteration_loop( + workspace_process_context, + self._logger, + shutdown_event, + threadpool_executor=self._threadpool_executor, + ) class MonitoringDaemon(IntervalDaemon): diff --git a/python_modules/dagster/dagster/_daemon/sensor.py b/python_modules/dagster/dagster/_daemon/sensor.py index f9daf37f65a3b..1611f911cca73 100644 --- a/python_modules/dagster/dagster/_daemon/sensor.py +++ b/python_modules/dagster/dagster/_daemon/sensor.py @@ -66,7 +66,12 @@ from dagster._daemon.utils import DaemonErrorCapture from dagster._scheduler.stale import resolve_stale_or_missing_assets from dagster._time import get_current_datetime, get_current_timestamp -from dagster._utils import DebugCrashFlags, SingleInstigatorDebugCrashFlags, check_for_debug_crash +from dagster._utils import ( + DebugCrashFlags, + SingleInstigatorDebugCrashFlags, + check_for_debug_crash, + return_as_list, +) from 
dagster._utils.error import SerializableErrorInfo from dagster._utils.merger import merge_dicts @@ -344,7 +349,6 @@ def execute_sensor_iteration_loop( yield SpanMarker.END_SPAN end_time = get_current_timestamp() - loop_duration = end_time - start_time sleep_time = max(0, MIN_INTERVAL_LOOP_TIME - loop_duration) shutdown_event.wait(sleep_time) @@ -453,30 +457,6 @@ def execute_sensor_iteration( ) -def _process_tick( - workspace_process_context: IWorkspaceProcessContext, - logger: logging.Logger, - remote_sensor: RemoteSensor, - sensor_state: InstigatorState, - sensor_debug_crash_flags: Optional[SingleInstigatorDebugCrashFlags], - tick_retention_settings, - submit_threadpool_executor: Optional[ThreadPoolExecutor], -): - # evaluate the tick immediately, but from within a thread. The main thread should be able to - # heartbeat to keep the daemon alive - return list( - _process_tick_generator( - workspace_process_context, - logger, - remote_sensor, - sensor_state, - sensor_debug_crash_flags, - tick_retention_settings, - submit_threadpool_executor, - ) - ) - - def _get_evaluation_tick( instance: DagsterInstance, sensor: RemoteSensor, @@ -630,6 +610,11 @@ def _process_tick_generator( yield error_info +# evaluate the tick immediately, but from within a thread. The main thread should be able to +# heartbeat to keep the daemon alive +_process_tick = return_as_list(_process_tick_generator) + + def _sensor_instigator_data(state: InstigatorState) -> Optional[SensorInstigatorData]: instigator_data = state.instigator_data if instigator_data is None or isinstance(instigator_data, SensorInstigatorData): diff --git a/python_modules/dagster/dagster/_scheduler/scheduler.py b/python_modules/dagster/dagster/_scheduler/scheduler.py index 4c83de5acc5e0..22e3882bb3b0d 100644 --- a/python_modules/dagster/dagster/_scheduler/scheduler.py +++ b/python_modules/dagster/dagster/_scheduler/scheduler.py @@ -197,8 +197,8 @@ def execute_scheduler_iteration_loop( scheduler_run_futures: Dict[str, Future] = {} iteration_times: Dict[str, ScheduleIterationTimes] = {} - submit_threadpool_executor = None threadpool_executor = None + submit_threadpool_executor = None with ExitStack() as stack: settings = workspace_process_context.instance.get_scheduler_settings() @@ -225,6 +225,7 @@ def execute_scheduler_iteration_loop( next_interval_time = _get_next_scheduler_iteration_time(start_time) yield SpanMarker.START_SPAN + try: yield from launch_scheduled_runs( workspace_process_context, @@ -248,8 +249,8 @@ def execute_scheduler_iteration_loop( next_interval_time = min(start_time + ERROR_INTERVAL_TIME, next_interval_time) yield SpanMarker.END_SPAN - end_time = get_current_timestamp() + end_time = get_current_timestamp() if next_interval_time > end_time: # Sleep until the beginning of the next minute, plus a small epsilon to # be sure that we're past the start of the minute diff --git a/python_modules/dagster/dagster/_utils/__init__.py b/python_modules/dagster/dagster/_utils/__init__.py index fce1525d93856..39808e9453cfa 100644 --- a/python_modules/dagster/dagster/_utils/__init__.py +++ b/python_modules/dagster/dagster/_utils/__init__.py @@ -844,3 +844,13 @@ def run_with_concurrent_update_guard( return update_fn(**kwargs) return + + +def return_as_list(func: Callable[..., Iterable[T]]) -> Callable[..., List[T]]: + """A decorator that returns a list from the output of a function.""" + + @functools.wraps(func) + def inner(*args, **kwargs): + return list(func(*args, **kwargs)) + + return inner diff --git 
a/python_modules/dagster/dagster_tests/daemon_sensor_tests/test_sensor_run.py b/python_modules/dagster/dagster_tests/daemon_sensor_tests/test_sensor_run.py index 6db5285f05648..192d26c9bb9a1 100644 --- a/python_modules/dagster/dagster_tests/daemon_sensor_tests/test_sensor_run.py +++ b/python_modules/dagster/dagster_tests/daemon_sensor_tests/test_sensor_run.py @@ -2908,12 +2908,6 @@ def test_repository_namespacing(executor): assert len(ticks) == 2 -def test_settings(): - settings = {"use_threads": True, "num_workers": 4} - with instance_for_test(overrides={"sensors": settings}) as thread_inst: - assert thread_inst.get_settings("sensors") == settings - - @pytest.mark.parametrize("sensor_name", ["logging_sensor", "multi_asset_logging_sensor"]) def test_sensor_logging(executor, instance, workspace_context, remote_repo, sensor_name) -> None: sensor = remote_repo.get_sensor(sensor_name) diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py index 740e8bb9ae0a3..7b88aeecb6dd2 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py @@ -4,6 +4,9 @@ import string import sys import time +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path +from typing import cast from unittest import mock import dagster._check as check @@ -58,11 +61,11 @@ from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill from dagster._core.execution.plan.resume_retry import ReexecutionStrategy from dagster._core.remote_representation import ( + CodeLocation, InProcessCodeLocationOrigin, RemoteRepository, RemoteRepositoryOrigin, ) -from dagster._core.remote_representation.code_location import CodeLocation from dagster._core.storage.compute_log_manager import ComputeIOType from dagster._core.storage.dagster_run import ( IN_PROGRESS_RUN_STATUSES, @@ -81,13 +84,17 @@ ) from dagster._core.test_utils import ( create_run_for_test, + create_test_daemon_workspace_context, environ, + instance_for_test, step_did_not_run, step_failed, step_succeeded, + wait_for_futures, ) from dagster._core.types.loadable_target_origin import LoadableTargetOrigin from dagster._core.workspace.context import WorkspaceProcessContext +from dagster._core.workspace.load_target import ModuleTarget from dagster._daemon import get_default_daemon_logger from dagster._daemon.auto_run_reexecution.auto_run_reexecution import ( consume_new_runs_for_automatic_reexecution, @@ -580,10 +587,12 @@ def wait_for_all_runs_to_finish(instance, timeout=10): break +@pytest.mark.parametrize("parallel", [True, False]) def test_simple_backfill( instance: DagsterInstance, workspace_context: WorkspaceProcessContext, remote_repo: RemoteRepository, + parallel: bool, ): partition_set = remote_repo.get_partition_set("the_job_partition_set") instance.add_backfill( @@ -600,7 +609,24 @@ def test_simple_backfill( ) assert instance.get_runs_count() == 0 - list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + if parallel: + backfill_daemon_futures = {} + list( + execute_backfill_iteration( + workspace_context, + get_default_daemon_logger("BackfillDaemon"), + threadpool_executor=ThreadPoolExecutor(2), + backfill_futures=backfill_daemon_futures, + ) + ) + + wait_for_futures(backfill_daemon_futures) + else: + list( + execute_backfill_iteration( + workspace_context, get_default_daemon_logger("BackfillDaemon") + ) + ) 
assert instance.get_runs_count() == 3 runs = instance.get_runs() @@ -613,6 +639,105 @@ def test_simple_backfill( assert three.tags[PARTITION_NAME_TAG] == "three" +@pytest.mark.parametrize("parallel", [True, False]) +def test_two_backfills_at_the_same_time( + tmp_path: Path, + parallel: bool, +): + # In order to avoid deadlock, we need to ensure that the instance we + # are using will launch runs in separate subprocesses rather than in + # the same in-memory process. This is akin to the context created in + # https://github.com/dagster-io/dagster/blob/a116c44/python_modules/dagster/dagster_tests/scheduler_tests/conftest.py#L53-L71 + with instance_for_test( + overrides={ + "event_log_storage": { + "module": "dagster._core.storage.event_log", + "class": "ConsolidatedSqliteEventLogStorage", + "config": {"base_dir": str(tmp_path)}, + }, + "run_retries": {"enabled": True}, + } + ) as instance: + with create_test_daemon_workspace_context( + workspace_load_target=ModuleTarget( + module_name="dagster_tests.daemon_tests.test_backfill", + attribute="the_repo", + working_directory=os.path.join(os.path.dirname(__file__), "..", ".."), + location_name="test_location", + ), + instance=instance, + ) as workspace_context: + remote_repo = cast( + CodeLocation, + next( + iter( + workspace_context.create_request_context() + .get_code_location_entries() + .values() + ) + ).code_location, + ).get_repository("the_repo") + + first_partition_set = remote_repo.get_partition_set("the_job_partition_set") + second_partition_keys = my_config.partitions_def.get_partition_keys() + second_partition_set = remote_repo.get_partition_set( + "comp_always_succeed_partition_set" + ) + instance.add_backfill( + PartitionBackfill( + backfill_id="simple", + partition_set_origin=first_partition_set.get_remote_origin(), + status=BulkActionStatus.REQUESTED, + partition_names=["one", "two", "three"], + from_failure=False, + reexecution_steps=None, + tags=None, + backfill_timestamp=get_current_timestamp(), + ) + ) + instance.add_backfill( + PartitionBackfill( + backfill_id="partition_schedule_from_job", + partition_set_origin=second_partition_set.get_remote_origin(), + status=BulkActionStatus.REQUESTED, + partition_names=second_partition_keys[:3], + from_failure=False, + reexecution_steps=None, + tags=None, + backfill_timestamp=get_current_timestamp(), + ) + ) + assert instance.get_runs_count() == 0 + + if parallel: + threadpool_executor = ThreadPoolExecutor(4) + backfill_daemon_futures = {} + list( + execute_backfill_iteration( + workspace_context, + get_default_daemon_logger("BackfillDaemon"), + threadpool_executor=threadpool_executor, + backfill_futures=backfill_daemon_futures, + ) + ) + + wait_for_futures(backfill_daemon_futures) + else: + list( + execute_backfill_iteration( + workspace_context, get_default_daemon_logger("BackfillDaemon") + ) + ) + + assert instance.get_runs_count() == 6 + + runs = list(instance.get_runs()) + backfill_ids = sorted(run.tags[BACKFILL_ID_TAG] for run in runs) + partition_names = {run.tags[PARTITION_NAME_TAG] for run in runs} + assert backfill_ids == ["partition_schedule_from_job"] * 3 + ["simple"] * 3 + assert partition_names == {"one", "two", "three", *second_partition_keys[:3]} + + def test_canceled_backfill( instance: DagsterInstance, workspace_context: WorkspaceProcessContext, @@ -648,10 +773,12 @@ def test_canceled_backfill( assert instance.get_runs_count() == 1 +@pytest.mark.parametrize("parallel", [True, False]) def test_failure_backfill( instance: DagsterInstance, workspace_context: 
WorkspaceProcessContext, remote_repo: RemoteRepository, + parallel: bool, ): output_file = _failure_flag_file() partition_set = remote_repo.get_partition_set("conditional_failure_job_partition_set") @@ -671,11 +798,25 @@ def test_failure_backfill( try: touch_file(output_file) - list( - execute_backfill_iteration( - workspace_context, get_default_daemon_logger("BackfillDaemon") + if parallel: + backfill_daemon_futures = {} + list( + execute_backfill_iteration( + workspace_context, + get_default_daemon_logger("BackfillDaemon"), + threadpool_executor=ThreadPoolExecutor(2), + backfill_futures=backfill_daemon_futures, + ) ) - ) + + wait_for_futures(backfill_daemon_futures) + else: + list( + execute_backfill_iteration( + workspace_context, get_default_daemon_logger("BackfillDaemon") + ) + ) + wait_for_all_runs_to_start(instance) finally: os.remove(output_file) @@ -718,7 +859,25 @@ def test_failure_backfill( ) assert not os.path.isfile(_failure_flag_file()) - list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + if parallel: + backfill_daemon_futures = {} + list( + execute_backfill_iteration( + workspace_context, + get_default_daemon_logger("BackfillDaemon"), + threadpool_executor=ThreadPoolExecutor(2), + backfill_futures=backfill_daemon_futures, + ) + ) + + wait_for_futures(backfill_daemon_futures) + else: + list( + execute_backfill_iteration( + workspace_context, get_default_daemon_logger("BackfillDaemon") + ) + ) + wait_for_all_runs_to_start(instance) assert instance.get_runs_count() == 6 @@ -925,6 +1084,61 @@ def test_large_backfill( assert instance.get_runs_count() == 3 +def test_backfill_is_processed_only_once( + instance: DagsterInstance, + workspace_context: WorkspaceProcessContext, + remote_repo: RemoteRepository, +): + backfill_id = "simple" + partition_set = remote_repo.get_partition_set("config_job_partition_set") + instance.add_backfill( + PartitionBackfill( + backfill_id=backfill_id, + partition_set_origin=partition_set.get_remote_origin(), + status=BulkActionStatus.REQUESTED, + partition_names=["one", "two", "three"], + from_failure=False, + reexecution_steps=None, + tags=None, + backfill_timestamp=get_current_timestamp(), + ) + ) + assert instance.get_runs_count() == 0 + + threadpool_executor = ThreadPoolExecutor(2) + backfill_daemon_futures = {} + list( + execute_backfill_iteration( + workspace_context, + get_default_daemon_logger("BackfillDaemon"), + threadpool_executor=threadpool_executor, + backfill_futures=backfill_daemon_futures, + ) + ) + + assert instance.get_runs_count() == 0 + future = backfill_daemon_futures[backfill_id] + + with mock.patch.object( + threadpool_executor, "submit", side_effect=AssertionError("Should not be called") + ): + list( + execute_backfill_iteration( + workspace_context, + get_default_daemon_logger("BackfillDaemon"), + threadpool_executor=threadpool_executor, + backfill_futures=backfill_daemon_futures, + ) + ) + + assert instance.get_runs_count() == 0 + assert backfill_daemon_futures[backfill_id] is future + + wait_for_futures(backfill_daemon_futures) + + assert instance.get_runs_count() == 3 + + def test_unloadable_backfill(instance, workspace_context): unloadable_origin = _unloadable_partition_set_origin() instance.add_backfill( @@ -2828,10 +3042,12 @@ def test_asset_backfill_logs( assert record_dict.get("msg") +@pytest.mark.parametrize("parallel", [True, False]) def test_asset_backfill_from_asset_graph_subset( instance: DagsterInstance, workspace_context: WorkspaceProcessContext, remote_repo: 
RemoteRepository, + parallel: bool, ): del remote_repo @@ -2860,7 +3076,24 @@ def test_asset_backfill_from_asset_graph_subset( assert backfill assert backfill.status == BulkActionStatus.REQUESTED - list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + if parallel: + backfill_daemon_futures = {} + list( + execute_backfill_iteration( + workspace_context, + get_default_daemon_logger("BackfillDaemon"), + threadpool_executor=ThreadPoolExecutor(2), + backfill_futures=backfill_daemon_futures, + ) + ) + + wait_for_futures(backfill_daemon_futures) + else: + list( + execute_backfill_iteration( + workspace_context, get_default_daemon_logger("BackfillDaemon") + ) + ) assert instance.get_runs_count() == 3 wait_for_all_runs_to_start(instance, timeout=30) assert instance.get_runs_count() == 3 @@ -2875,7 +3108,24 @@ def test_asset_backfill_from_asset_graph_subset( assert step_succeeded(instance, run, "reusable") assert step_succeeded(instance, run, "bar") - list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + if parallel: + backfill_daemon_futures = {} + list( + execute_backfill_iteration( + workspace_context, + get_default_daemon_logger("BackfillDaemon"), + threadpool_executor=ThreadPoolExecutor(2), + backfill_futures=backfill_daemon_futures, + ) + ) + + wait_for_futures(backfill_daemon_futures) + else: + list( + execute_backfill_iteration( + workspace_context, get_default_daemon_logger("BackfillDaemon") + ) + ) backfill = instance.get_backfill("backfill_from_asset_graph_subset") assert backfill assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_dagster_daemon.py b/python_modules/dagster/dagster_tests/daemon_tests/test_dagster_daemon.py index 4c656c6077505..1b90b97fea1bc 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/test_dagster_daemon.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/test_dagster_daemon.py @@ -11,6 +11,13 @@ from dagster._utils.log import get_structlog_json_formatter +@pytest.mark.parametrize("daemon", ["backfills", "schedules", "sensors"]) +def test_settings(daemon): + settings = {"use_threads": True, "num_workers": 4} + with instance_for_test(overrides={daemon: settings}) as thread_inst: + assert thread_inst.get_settings(daemon) == settings + + def test_scheduler_instance(): with instance_for_test( overrides={ diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_e2e.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_e2e.py index 4a664fda6e6c4..402c0133cfcd4 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_e2e.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_e2e.py @@ -176,15 +176,20 @@ def _execute_ticks( ) ) + backfill_daemon_futures = {} list( execute_backfill_iteration( context, get_default_daemon_logger("BackfillDaemon"), + threadpool_executor=threadpool_executor, + backfill_futures=backfill_daemon_futures, + debug_crash_flags=debug_crash_flags or {}, ) ) wait_for_futures(asset_daemon_futures) wait_for_futures(sensor_daemon_futures) + wait_for_futures(backfill_daemon_futures) def _get_current_state(context: WorkspaceRequestContext) -> Mapping[str, InstigatorState]: diff --git a/python_modules/dagster/dagster_tests/scheduler_tests/conftest.py 
b/python_modules/dagster/dagster_tests/scheduler_tests/conftest.py index 80c87a0a281ea..0679622ec42f8 100644 --- a/python_modules/dagster/dagster_tests/scheduler_tests/conftest.py +++ b/python_modules/dagster/dagster_tests/scheduler_tests/conftest.py @@ -1,10 +1,10 @@ import os import sys -from typing import Iterator, Optional +from typing import Iterator, Optional, cast import pytest from dagster._core.instance import DagsterInstance -from dagster._core.remote_representation.external import RemoteRepository +from dagster._core.remote_representation import CodeLocation, RemoteRepository from dagster._core.test_utils import ( SingleThreadPoolExecutor, create_test_daemon_workspace_context, @@ -73,11 +73,12 @@ def workspace_fixture( @pytest.fixture(name="remote_repo", scope="session") def remote_repo_fixture(workspace_context: WorkspaceProcessContext) -> RemoteRepository: - return next( - iter(workspace_context.create_request_context().get_code_location_entries().values()) - ).code_location.get_repository( # type: ignore # (possible none) - "the_repo" - ) + return cast( + CodeLocation, + next( + iter(workspace_context.create_request_context().get_code_location_entries().values()) + ).code_location, + ).get_repository("the_repo") def loadable_target_origin() -> LoadableTargetOrigin:
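
Usage note (illustrative, not part of the patch): the new `backfills` settings block added to the instance config schema above accepts `use_threads` and `num_workers`, mirroring the existing `sensors` and `schedules` blocks. The sketch below follows the `test_settings` parametrization in `test_dagster_daemon.py` and the `get_backfill_settings` accessor added in this patch; the same keys would go under a top-level `backfills:` entry in `dagster.yaml`.

```python
# Illustrative sketch: enable the backfill daemon's thread pool via instance settings.
# Assumes the `backfills` config block and `get_backfill_settings()` added in this patch.
from dagster._core.test_utils import instance_for_test

settings = {"use_threads": True, "num_workers": 4}

with instance_for_test(overrides={"backfills": settings}) as instance:
    # The daemon controller constructs BackfillDaemon(settings=instance.get_backfill_settings()),
    # so iterations for separate backfills are submitted to a worker thread pool.
    assert instance.get_backfill_settings() == settings
```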