Add a thread pool to process backfills in parallel (#26189)
## Summary & Motivation

Some users run into situations where they kick off a backfill that takes hours to
process, and it holds up every other backfill until it finishes. Adding a thread
pool alleviates this in the common case where only one or a handful of backfills
are long-running; in the unlikely case where every backfill being executed in
parallel takes hours, it still won't help.

## How I Tested These Changes

Existing tests, plus changing the e2e tests to use a thread pool for backfills.

## Changelog

Added the option to use a thread pool to process backfills in parallel.
deepyaman authored Dec 18, 2024
1 parent 4e52c92 commit 29a91fc
Showing 15 changed files with 457 additions and 88 deletions.
3 changes: 3 additions & 0 deletions python_modules/dagster/dagster/_core/instance/__init__.py
@@ -831,6 +831,9 @@ def get_settings(self, settings_key: str) -> Any:
return self._settings.get(settings_key)
return {}

def get_backfill_settings(self) -> Mapping[str, Any]:
return self.get_settings("backfills")

def get_scheduler_settings(self) -> Mapping[str, Any]:
return self.get_settings("schedules")

15 changes: 15 additions & 0 deletions python_modules/dagster/dagster/_core/instance/config.py
@@ -267,6 +267,20 @@ def get_tick_retention_settings(
return default_retention_settings


def backfills_daemon_config() -> Field:
return Field(
{
"use_threads": Field(Bool, is_required=False, default_value=False),
"num_workers": Field(
int,
is_required=False,
description="How many threads to use to process multiple backfills in parallel",
),
},
is_required=False,
)


def sensors_daemon_config() -> Field:
return Field(
{
@@ -389,6 +403,7 @@ def dagster_instance_config_schema() -> Mapping[str, Field]:
),
"secrets": secrets_loader_config_schema(),
"retention": retention_config_schema(),
"backfills": backfills_daemon_config(),
"sensors": sensors_daemon_config(),
"schedules": schedules_daemon_config(),
"auto_materialize": Field(
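For context on how this schema is consumed: the new `backfills` block in `dagster.yaml` is read back through `DagsterInstance.get_backfill_settings()` (added above) and handed to the daemon controller below. A minimal sketch of opting in, with the key names taken from `backfills_daemon_config` and a purely illustrative worker count:

```python
# Sketch only: roughly equivalent to adding the following to dagster.yaml
#
#   backfills:
#     use_threads: true
#     num_workers: 4
#
# which DagsterInstance.get_backfill_settings() then surfaces as a plain mapping:
backfill_settings = {"use_threads": True, "num_workers": 4}

# The BackfillDaemon (see daemon.py below) only builds a thread pool when
# use_threads is set; num_workers is optional, so omitting it presumably
# falls back to the executor's default worker count.
```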
1 change: 1 addition & 0 deletions python_modules/dagster/dagster/_core/instance/ref.py
@@ -451,6 +451,7 @@ def from_dir(
"run_retries",
"code_servers",
"retention",
"backfills",
"sensors",
"schedules",
"nux",
22 changes: 3 additions & 19 deletions python_modules/dagster/dagster/_daemon/asset_daemon.py
@@ -72,7 +72,7 @@
from dagster._serdes import serialize_value
from dagster._serdes.serdes import deserialize_value
from dagster._time import get_current_datetime, get_current_timestamp
from dagster._utils import SingleInstigatorDebugCrashFlags, check_for_debug_crash
from dagster._utils import SingleInstigatorDebugCrashFlags, check_for_debug_crash, return_as_list

_LEGACY_PRE_SENSOR_AUTO_MATERIALIZE_CURSOR_KEY = "ASSET_DAEMON_CURSOR"
_PRE_SENSOR_AUTO_MATERIALIZE_CURSOR_KEY = "ASSET_DAEMON_CURSOR_NEW"
@@ -655,24 +655,6 @@ def _copy_default_auto_materialize_sensor_states(

return result

def _process_auto_materialize_tick(
self,
workspace_process_context: IWorkspaceProcessContext,
repository: Optional[RemoteRepository],
sensor: Optional[RemoteSensor],
debug_crash_flags: SingleInstigatorDebugCrashFlags,
submit_threadpool_executor: Optional[ThreadPoolExecutor],
):
return list(
self._process_auto_materialize_tick_generator(
workspace_process_context,
repository,
sensor,
debug_crash_flags,
submit_threadpool_executor=submit_threadpool_executor,
)
)

def _process_auto_materialize_tick_generator(
self,
workspace_process_context: IWorkspaceProcessContext,
@@ -878,6 +860,8 @@ def _process_auto_materialize_tick_generator(

yield error_info

_process_auto_materialize_tick = return_as_list(_process_auto_materialize_tick_generator)

def _evaluate_auto_materialize_tick(
self,
tick_context: AutoMaterializeLaunchContext,
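This file, and the backfill and sensor daemons below, lean on the small `return_as_list` helper from `dagster._utils`. Its implementation isn't part of this diff; the sketch below is a plausible reading, not the exact code. The idea is to turn a generator function into a plain function that drains the generator and returns the yielded values, so a tick can be submitted to a thread pool and run to completion inside the worker thread:

```python
from functools import wraps
from typing import Any, Callable, Iterator, List, TypeVar

T = TypeVar("T")


def return_as_list(gen_fn: Callable[..., Iterator[T]]) -> Callable[..., List[T]]:
    """Wrap a generator function so calling it runs the generator to completion
    and returns the yielded values as a list. (Sketch; the real helper may differ.)"""

    @wraps(gen_fn)
    def wrapper(*args: Any, **kwargs: Any) -> List[T]:
        return list(gen_fn(*args, **kwargs))

    return wrapper
```

With such a helper, `_process_auto_materialize_tick = return_as_list(_process_auto_materialize_tick_generator)` replaces the hand-written wrapper deleted above.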
118 changes: 104 additions & 14 deletions python_modules/dagster/dagster/_daemon/backfill.py
@@ -1,8 +1,10 @@
import logging
import os
import sys
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from contextlib import contextmanager
from typing import Iterable, Mapping, Optional, Sequence, cast
from typing import TYPE_CHECKING, Dict, Iterable, Mapping, Optional, Sequence, cast

import dagster._check as check
from dagster._core.definitions.instigation_logger import InstigationLogger
@@ -16,9 +18,13 @@
from dagster._core.execution.job_backfill import execute_job_backfill_iteration
from dagster._core.workspace.context import IWorkspaceProcessContext
from dagster._daemon.utils import DaemonErrorCapture
from dagster._time import get_current_datetime
from dagster._time import get_current_datetime, get_current_timestamp
from dagster._utils import return_as_list
from dagster._utils.error import SerializableErrorInfo

if TYPE_CHECKING:
from dagster._daemon.daemon import DaemonIterator


@contextmanager
def _get_instigation_logger_if_log_storage_enabled(
@@ -46,9 +52,55 @@ def _get_max_asset_backfill_retries():
return int(os.getenv("DAGSTER_MAX_ASSET_BACKFILL_RETRIES", "5"))


def execute_backfill_iteration_loop(
workspace_process_context: IWorkspaceProcessContext,
logger: logging.Logger,
shutdown_event: threading.Event,
until: Optional[float] = None,
threadpool_executor: Optional[ThreadPoolExecutor] = None,
) -> "DaemonIterator":
from dagster._daemon.controller import DEFAULT_DAEMON_INTERVAL_SECONDS
from dagster._daemon.daemon import SpanMarker

backfill_futures: Dict[str, Future] = {}
while True:
start_time = get_current_timestamp()
if until and start_time >= until:
# provide a way of organically ending the loop to support test environment
break

yield SpanMarker.START_SPAN

try:
yield from execute_backfill_iteration(
workspace_process_context,
logger,
threadpool_executor=threadpool_executor,
backfill_futures=backfill_futures,
)
except Exception:
error_info = DaemonErrorCapture.on_exception(
exc_info=sys.exc_info(),
logger=logger,
log_message="BackfillDaemon caught an error",
)
yield error_info

yield SpanMarker.END_SPAN

end_time = get_current_timestamp()
loop_duration = end_time - start_time
sleep_time = max(0, DEFAULT_DAEMON_INTERVAL_SECONDS - loop_duration)
shutdown_event.wait(sleep_time)

yield None


def execute_backfill_iteration(
workspace_process_context: IWorkspaceProcessContext,
logger: logging.Logger,
threadpool_executor: Optional[ThreadPoolExecutor] = None,
backfill_futures: Optional[Dict[str, Future]] = None,
debug_crash_flags: Optional[Mapping[str, int]] = None,
) -> Iterable[Optional[SerializableErrorInfo]]:
instance = workspace_process_context.instance
@@ -68,7 +120,12 @@ def execute_backfill_iteration(
backfill_jobs = [*in_progress_backfills, *canceling_backfills]

yield from execute_backfill_jobs(
workspace_process_context, logger, backfill_jobs, debug_crash_flags
workspace_process_context,
logger,
backfill_jobs,
threadpool_executor,
backfill_futures,
debug_crash_flags,
)


@@ -86,6 +143,8 @@ def execute_backfill_jobs(
workspace_process_context: IWorkspaceProcessContext,
logger: logging.Logger,
backfill_jobs: Sequence[PartitionBackfill],
threadpool_executor: Optional[ThreadPoolExecutor] = None,
backfill_futures: Optional[Dict[str, Future]] = None,
debug_crash_flags: Optional[Mapping[str, int]] = None,
) -> Iterable[Optional[SerializableErrorInfo]]:
instance = workspace_process_context.instance
@@ -103,18 +162,49 @@
)

try:
if backfill.is_asset_backfill:
yield from execute_asset_backfill_iteration(
backfill, backfill_logger, workspace_process_context, instance
)
if threadpool_executor:
if backfill_futures is None:
check.failed(
"backfill_futures dict must be passed with threadpool_executor"
)

# only allow one backfill per backfill job to be in flight
if backfill_id in backfill_futures and not backfill_futures[backfill_id].done():
continue

if backfill.is_asset_backfill:
future = threadpool_executor.submit(
return_as_list(execute_asset_backfill_iteration),
backfill,
backfill_logger,
workspace_process_context,
instance,
)
else:
future = threadpool_executor.submit(
return_as_list(execute_job_backfill_iteration),
backfill,
backfill_logger,
workspace_process_context,
debug_crash_flags,
instance,
)
backfill_futures[backfill_id] = future
yield

else:
yield from execute_job_backfill_iteration(
backfill,
backfill_logger,
workspace_process_context,
debug_crash_flags,
instance,
)
if backfill.is_asset_backfill:
yield from execute_asset_backfill_iteration(
backfill, backfill_logger, workspace_process_context, instance
)
else:
yield from execute_job_backfill_iteration(
backfill,
backfill_logger,
workspace_process_context,
debug_crash_flags,
instance,
)
except Exception as e:
backfill = check.not_none(instance.get_backfill(backfill.backfill_id))
if (
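The shared `backfill_futures` dict is the heart of the new parallelism: it is keyed by backfill ID and carried across loop iterations, so each backfill has at most one iteration in flight while distinct backfills run on separate worker threads. A standalone sketch of that bookkeeping pattern, using hypothetical names rather than Dagster APIs:

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Dict


def submit_unique(
    executor: ThreadPoolExecutor,
    futures: Dict[str, Future],
    key: str,
    work: Callable[[], None],
) -> bool:
    """Submit `work` only if no earlier submission for `key` is still running."""
    existing = futures.get(key)
    if existing is not None and not existing.done():
        return False  # skip: the previous iteration for this key hasn't finished
    futures[key] = executor.submit(work)
    return True


if __name__ == "__main__":
    # Two distinct backfills run in parallel; re-submitting a busy one is skipped.
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures: Dict[str, Future] = {}
        submit_unique(pool, futures, "backfill_a", lambda: time.sleep(0.5))
        submit_unique(pool, futures, "backfill_b", lambda: time.sleep(0.5))
        assert not submit_unique(pool, futures, "backfill_a", lambda: None)
```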
2 changes: 1 addition & 1 deletion python_modules/dagster/dagster/_daemon/controller.py
@@ -362,7 +362,7 @@ def create_daemon_of_type(daemon_type: str, instance: DagsterInstance) -> Dagste
interval_seconds=instance.run_coordinator.dequeue_interval_seconds # type: ignore # (??)
)
elif daemon_type == BackfillDaemon.daemon_type():
return BackfillDaemon(interval_seconds=DEFAULT_DAEMON_INTERVAL_SECONDS)
return BackfillDaemon(settings=instance.get_backfill_settings())
elif daemon_type == MonitoringDaemon.daemon_type():
return MonitoringDaemon(interval_seconds=instance.run_monitoring_poll_interval_seconds)
elif daemon_type == EventLogConsumerDaemon.daemon_type():
31 changes: 27 additions & 4 deletions python_modules/dagster/dagster/_daemon/daemon.py
@@ -22,7 +22,7 @@
from dagster._core.telemetry import DAEMON_ALIVE, log_action
from dagster._core.utils import InheritContextThreadPoolExecutor
from dagster._core.workspace.context import IWorkspaceProcessContext
from dagster._daemon.backfill import execute_backfill_iteration
from dagster._daemon.backfill import execute_backfill_iteration_loop
from dagster._daemon.monitoring import (
execute_concurrency_slots_iteration,
execute_run_monitoring_iteration,
@@ -333,16 +333,39 @@ def core_loop(
)


class BackfillDaemon(IntervalDaemon):
class BackfillDaemon(DagsterDaemon):
def __init__(self, settings: Mapping[str, Any]) -> None:
super().__init__()
self._exit_stack = ExitStack()
self._threadpool_executor: Optional[InheritContextThreadPoolExecutor] = None

if settings.get("use_threads"):
self._threadpool_executor = self._exit_stack.enter_context(
InheritContextThreadPoolExecutor(
max_workers=settings.get("num_workers"),
thread_name_prefix="backfill_daemon_worker",
)
)

@classmethod
def daemon_type(cls) -> str:
return "BACKFILL"

def run_iteration(
def __exit__(self, _exception_type, _exception_value, _traceback):
self._exit_stack.close()
super().__exit__(_exception_type, _exception_value, _traceback)

def core_loop(
self,
workspace_process_context: IWorkspaceProcessContext,
shutdown_event: Event,
) -> DaemonIterator:
yield from execute_backfill_iteration(workspace_process_context, self._logger)
yield from execute_backfill_iteration_loop(
workspace_process_context,
self._logger,
shutdown_event,
threadpool_executor=self._threadpool_executor,
)


class MonitoringDaemon(IntervalDaemon):
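The `ExitStack` in `BackfillDaemon.__init__` exists because the executor is optional: it is only entered when `use_threads` is set, and `__exit__` closes whatever was entered, which shuts the pool down when the daemon exits. A generic sketch of that pattern, using the stdlib `ThreadPoolExecutor` as a stand-in for Dagster's `InheritContextThreadPoolExecutor` (the class name below is hypothetical):

```python
from concurrent.futures import ThreadPoolExecutor
from contextlib import ExitStack
from typing import Any, Mapping, Optional


class OptionalPoolOwner:
    """Owns a thread pool only when the settings ask for one (sketch of the daemon's pattern)."""

    def __init__(self, settings: Mapping[str, Any]) -> None:
        self._exit_stack = ExitStack()
        self._executor: Optional[ThreadPoolExecutor] = None
        if settings.get("use_threads"):
            # enter_context registers the pool so that closing the stack shuts it down
            self._executor = self._exit_stack.enter_context(
                ThreadPoolExecutor(
                    max_workers=settings.get("num_workers"),
                    thread_name_prefix="backfill_daemon_worker",
                )
            )

    def __enter__(self) -> "OptionalPoolOwner":
        return self

    def __exit__(self, *exc_info: Any) -> None:
        # no-op if threads were never enabled; otherwise waits for workers and shuts down
        self._exit_stack.close()
```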
37 changes: 11 additions & 26 deletions python_modules/dagster/dagster/_daemon/sensor.py
@@ -66,7 +66,12 @@
from dagster._daemon.utils import DaemonErrorCapture
from dagster._scheduler.stale import resolve_stale_or_missing_assets
from dagster._time import get_current_datetime, get_current_timestamp
from dagster._utils import DebugCrashFlags, SingleInstigatorDebugCrashFlags, check_for_debug_crash
from dagster._utils import (
DebugCrashFlags,
SingleInstigatorDebugCrashFlags,
check_for_debug_crash,
return_as_list,
)
from dagster._utils.error import SerializableErrorInfo
from dagster._utils.merger import merge_dicts

@@ -344,7 +349,6 @@ def execute_sensor_iteration_loop(
yield SpanMarker.END_SPAN

end_time = get_current_timestamp()

loop_duration = end_time - start_time
sleep_time = max(0, MIN_INTERVAL_LOOP_TIME - loop_duration)
shutdown_event.wait(sleep_time)
@@ -453,30 +457,6 @@ def execute_sensor_iteration(
)


def _process_tick(
workspace_process_context: IWorkspaceProcessContext,
logger: logging.Logger,
remote_sensor: RemoteSensor,
sensor_state: InstigatorState,
sensor_debug_crash_flags: Optional[SingleInstigatorDebugCrashFlags],
tick_retention_settings,
submit_threadpool_executor: Optional[ThreadPoolExecutor],
):
# evaluate the tick immediately, but from within a thread. The main thread should be able to
# heartbeat to keep the daemon alive
return list(
_process_tick_generator(
workspace_process_context,
logger,
remote_sensor,
sensor_state,
sensor_debug_crash_flags,
tick_retention_settings,
submit_threadpool_executor,
)
)


def _get_evaluation_tick(
instance: DagsterInstance,
sensor: RemoteSensor,
@@ -630,6 +610,11 @@ def _process_tick_generator(
yield error_info


# evaluate the tick immediately, but from within a thread. The main thread should be able to
# heartbeat to keep the daemon alive
_process_tick = return_as_list(_process_tick_generator)


def _sensor_instigator_data(state: InstigatorState) -> Optional[SensorInstigatorData]:
instigator_data = state.instigator_data
if instigator_data is None or isinstance(instigator_data, SensorInstigatorData):