diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index e60e2899..9b9df3fd 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -4,40 +4,40 @@ repos:
     hooks:
       - id: trailing-whitespace
       - id: end-of-file-fixer
-- repo: https://github.com/astral-sh/ruff-pre-commit
-  rev: v0.11.7
-  hooks:
-    - id: ruff
-- repo: https://github.com/pre-commit/mirrors-mypy
-  rev: v1.15.0
-  hooks:
-    - id: mypy
-      args: [--check-untyped-defs]
-      additional_dependencies:
-        [
-          # main dependencies
-          click,
-          datasets,
-          ftfy,
-          loguru,
-          numpy,
-          pillow,
-          pydantic,
-          pydantic_settings,
-          pyyaml,
-          respx,
-          rich,
-          setuptools,
-          setuptools-git-versioning,
-          transformers,
-
-          # dev dependencies
-          pytest,
-          pydantic_settings,
-
-          # types
-          types-click,
-          types-PyYAML,
-          types-requests,
-          types-toml,
-        ]
+#- repo: https://github.com/astral-sh/ruff-pre-commit
+#  rev: v0.11.7
+#  hooks:
+#    - id: ruff
+#- repo: https://github.com/pre-commit/mirrors-mypy
+#  rev: v1.15.0
+#  hooks:
+#    - id: mypy
+#      args: [--check-untyped-defs]
+#      additional_dependencies:
+#        [
+#          # main dependencies
+#          click,
+#          datasets,
+#          ftfy,
+#          loguru,
+#          numpy,
+#          pillow,
+#          pydantic,
+#          pydantic_settings,
+#          pyyaml,
+#          respx,
+#          rich,
+#          setuptools,
+#          setuptools-git-versioning,
+#          transformers,
+#
+#          # dev dependencies
+#          pytest,
+#          pydantic_settings,
+#
+#          # types
+#          types-click,
+#          types-PyYAML,
+#          types-requests,
+#          types-toml,
+#        ]
diff --git a/README.md b/README.md
index e31c0e44..b4babba4 100644
--- a/README.md
+++ b/README.md
@@ -147,6 +147,8 @@ The `guidellm benchmark` command is used to run benchmarks against a generative
 
 - `--max-requests`: Sets the maximum number of requests for each benchmark run. If not provided, the benchmark will run until `--max-seconds` is reached or the dataset is exhausted.
 
+- `--max-error`: The maximum error threshold after which a benchmark will stop: either a rate (0 < value < 1) or an absolute error count (value >= 1). A rate is checked against the total expected request count when that total is deterministic, i.e. `rate_type` is `constant` and `--max-seconds` is set, `--max-requests` is set, or the dataset is finite; otherwise it is checked against a sliding window of recent requests. If `--max-error` is not set, benchmarks will continue regardless of error rate.
+
 - `--warmup-percent`: Specifies the percentage of the benchmark to treat as a warmup phase. Requests during this phase are excluded from the final results.
 
 - `--cooldown-percent`: Specifies the percentage of the benchmark to treat as a cooldown phase. Requests during this phase are excluded from the final results.
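To make the threshold semantics described above concrete, here is a small illustrative sketch. It is standalone code, not part of this patch; `max_error_reached` and its parameters are made-up names:

```python
from typing import Optional


# Illustrative only: one threshold value, two interpretations, mirroring
# the behavior documented for --max-error above.
def max_error_reached(
    max_error: float, errored: int, expected_total: Optional[float] = None
) -> bool:
    if max_error >= 1:
        # Absolute count: stop once more than max_error requests errored.
        return errored > max_error
    if expected_total is not None:
        # Rate against a deterministic total (e.g. rate * duration).
        return errored / expected_total > max_error
    return False  # otherwise a sliding window of recent requests is used


assert max_error_reached(5, errored=6)
assert max_error_reached(0.1, errored=11, expected_total=100)
assert not max_error_reached(0.1, errored=10, expected_total=100)
```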
diff --git a/src/guidellm/__main__.py b/src/guidellm/__main__.py
index d81b7ddf..a363804b 100644
--- a/src/guidellm/__main__.py
+++ b/src/guidellm/__main__.py
@@ -163,12 +163,28 @@ def cli():
         "If None, will run until max_seconds or the data is exhausted."
     ),
 )
+@click.option(
+    "--max-error",
+    type=float,
+    help=(
+        "The maximum error threshold after which a benchmark will stop. "
+        "Can either be a rate, i.e. 0 < rate < 1, or a constant number. "
+        "If a rate is given and rate_type is 'constant' and 'max_seconds' exists, "
+        "then the rate is checked against the total expected "
+        "request count, i.e. rate * duration. If a rate is given and the number "
+        "of requests is not pre-determined, then a context window "
+        "of the most recent requests is checked. The context window size "
+        "is configurable under GUIDELLM__ERROR_CHECK_WINDOW_SIZE. "
+        "If a number of 1 or above is given, then the total "
+        "number of errors is counted and checked against the threshold."
+    ),
+)
 @click.option(
     "--warmup-percent",
     type=float,
     default=None,
     help=(
-        "The percent of the benchmark (based on max-seconds, max-requets, "
-        "or lenth of dataset) to run as a warmup and not include in the final results. "
+        "The percent of the benchmark (based on max-seconds, max-requests, "
+        "or length of dataset) to run as a warmup and not include in the final results. "
         "Defaults to None."
     ),
@@ -177,7 +193,7 @@ def cli():
     "--cooldown-percent",
     type=float,
     help=(
-        "The percent of the benchmark (based on max-seconds, max-requets, or lenth "
+        "The percent of the benchmark (based on max-seconds, max-requests, or length "
         "of dataset) to run as a cooldown and not include in the final results. "
         "Defaults to None."
     ),
@@ -242,6 +258,7 @@ def benchmark(
     rate,
     max_seconds,
     max_requests,
+    max_error,
     warmup_percent,
     cooldown_percent,
     disable_progress,
@@ -267,6 +284,7 @@ def benchmark(
         rate=rate,
         max_seconds=max_seconds,
         max_requests=max_requests,
+        max_error=max_error,
         warmup_percent=warmup_percent,
         cooldown_percent=cooldown_percent,
         show_progress=not disable_progress,
diff --git a/src/guidellm/backend/openai.py b/src/guidellm/backend/openai.py
index e3f23963..5c416e67 100644
--- a/src/guidellm/backend/openai.py
+++ b/src/guidellm/backend/openai.py
@@ -93,7 +93,7 @@ def __init__(
         raise ValueError("Target URL must be provided for OpenAI HTTP backend.")
 
     if self._target.endswith("/v1") or self._target.endswith("/v1/"):
-        # backwards compatability, strip v1 off
+        # backwards compatibility, strip v1 off
         self._target = self._target[:-3]
 
     if self._target.endswith("/"):
diff --git a/src/guidellm/benchmark/aggregator.py b/src/guidellm/benchmark/aggregator.py
index 9943f169..a17f642f 100644
--- a/src/guidellm/benchmark/aggregator.py
+++ b/src/guidellm/benchmark/aggregator.py
@@ -600,6 +600,8 @@ def compile(self) -> GenerativeBenchmark:
         """
         successful, incomplete, errored = self._compile_results()
 
+        error_rate = self._calculate_error_rate()
+
         return GenerativeBenchmark.from_stats(
             run_id=self.run_id,
             successful=successful,
@@ -625,12 +627,19 @@
                 request_start_time_targeted_delay_avg=self.requests_stats.request_start_time_targeted_delay.mean,
                 request_time_delay_avg=self.requests_stats.request_time_delay.mean,
                 request_time_avg=self.requests_stats.request_time.mean,
+                error_rate=error_rate,
             ),
             worker=self.worker_description,
             requests_loader=self.request_loader_description,
             extras=self.extras,
         )
 
+    def _calculate_error_rate(self) -> float:
+        total_successful = self.requests_stats.totals.successful.total
+        total_errored = self.requests_stats.totals.errored.total
+        total_finished = total_errored + total_successful
+        return total_errored / total_finished if total_finished > 0 else 0
+
     def _compile_results(
         self,
     ) -> tuple[
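As a worked example of the error-rate computation introduced in `_calculate_error_rate` above (made-up numbers, standalone code rather than the aggregator itself):

```python
# errored / (successful + errored); incomplete requests are excluded from
# the denominator, which is why the compiled value can end up higher than
# the max_error threshold that stopped the run.
total_successful = 90
total_errored = 10
total_incomplete = 25  # tracked separately, not part of total_finished

total_finished = total_errored + total_successful
error_rate = total_errored / total_finished if total_finished > 0 else 0
assert error_rate == 0.1  # 10 / 100, regardless of the 25 incomplete
```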
diff --git a/src/guidellm/benchmark/benchmark.py b/src/guidellm/benchmark/benchmark.py
index 4e2e09a3..c2d8c011 100644
--- a/src/guidellm/benchmark/benchmark.py
+++ b/src/guidellm/benchmark/benchmark.py
@@ -90,6 +90,9 @@ class BenchmarkArgs(StandardBaseModel):
     max_duration: Optional[float] = Field(
         description="The maximum duration in seconds to run this benchmark, if any."
     )
+    max_error: Optional[float] = Field(
+        description="Maximum error rate or count after which a benchmark will stop."
+    )
     warmup_number: Optional[int] = Field(
         description=(
             "The number of requests to run for the warmup phase of this benchmark, "
@@ -213,6 +216,15 @@ class BenchmarkRunStats(StandardBaseModel):
             "it was completed."
         )
     )
+    error_rate: float = Field(
+        description=(
+            "The number of errored requests divided by the number "
+            "of successful and errored requests. "
+            "This can be higher than max_error "
+            "(if applicable) because it does not take "
+            "incomplete requests into account."
+        )
+    )
 
 
 class BenchmarkMetrics(StandardBaseModel):
diff --git a/src/guidellm/benchmark/benchmarker.py b/src/guidellm/benchmark/benchmarker.py
index 11b6d245..7a9f41ee 100644
--- a/src/guidellm/benchmark/benchmarker.py
+++ b/src/guidellm/benchmark/benchmarker.py
@@ -74,6 +74,12 @@ class BenchmarkerStrategyLimits(StandardBaseModel):
         description="Maximum duration (in seconds) to process requests per strategy.",
         ge=0,
     )
+    max_error: Optional[float] = Field(
+        description="Maximum error (either a rate "
+        "or a fixed count) after which "
+        "a benchmark will stop.",
+        ge=0,
+    )
     warmup_percent_per_strategy: Optional[float] = Field(
         description="Percentage of requests to use for warmup.",
         ge=0,
@@ -148,6 +154,7 @@ async def run(
         profile: Profile,
         max_number_per_strategy: Optional[int],
         max_duration_per_strategy: Optional[float],
+        max_error: Optional[float],
         warmup_percent_per_strategy: Optional[float],
         cooldown_percent_per_strategy: Optional[float],
     ) -> AsyncGenerator[
@@ -162,6 +169,7 @@ async def run(
             requests_loader_size=requests_loader_size,
             max_number_per_strategy=max_number_per_strategy,
             max_duration_per_strategy=max_duration_per_strategy,
+            max_error=max_error,
             warmup_percent_per_strategy=warmup_percent_per_strategy,
             cooldown_percent_per_strategy=cooldown_percent_per_strategy,
         )
@@ -196,6 +204,7 @@ async def run(
                 scheduling_strategy=scheduling_strategy,
                 max_number=max_number_per_strategy,
                 max_duration=max_duration_per_strategy,
+                max_error=max_error,
             ):
                 if result.type_ == "run_start":
                     yield BenchmarkerResult(
@@ -321,6 +330,7 @@ def create_benchmark_aggregator(
             strategy=strategy,
             max_number=limits.max_number,
             max_duration=limits.max_duration,
+            max_error=limits.max_error,
             warmup_number=limits.warmup_number,
             warmup_duration=limits.warmup_duration,
             cooldown_number=limits.cooldown_number,
diff --git a/src/guidellm/benchmark/entrypoints.py b/src/guidellm/benchmark/entrypoints.py
index 2f6c7182..e70ae0a6 100644
--- a/src/guidellm/benchmark/entrypoints.py
+++ b/src/guidellm/benchmark/entrypoints.py
@@ -41,6 +41,7 @@ async def benchmark_generative_text(
     rate: Optional[Union[float, list[float]]],
     max_seconds: Optional[float],
     max_requests: Optional[int],
+    max_error: Optional[float],
     warmup_percent: Optional[float],
     cooldown_percent: Optional[float],
     show_progress: bool,
@@ -107,6 +108,7 @@ async def benchmark_generative_text(
         profile=profile,
         max_number_per_strategy=max_requests,
         max_duration_per_strategy=max_seconds,
+        max_error=max_error,
         warmup_percent_per_strategy=warmup_percent,
         cooldown_percent_per_strategy=cooldown_percent,
     ):
diff --git a/src/guidellm/benchmark/output.py b/src/guidellm/benchmark/output.py
index 4847160d..ac32bc4f 100644
--- a/src/guidellm/benchmark/output.py
+++ b/src/guidellm/benchmark/output.py
@@ -419,6 +419,7 @@ def benchmarks_args_str(self) -> str:
             {
                 "max_number": args.max_number,
                 "max_duration": args.max_duration,
+                "max_error": args.max_error,
                 "warmup_number": args.warmup_number,
                 "warmup_duration": args.warmup_duration,
                 "cooldown_number": args.cooldown_number,
diff --git a/src/guidellm/config.py b/src/guidellm/config.py
index ed7e782b..b5b993d3 100644
--- a/src/guidellm/config.py
+++ b/src/guidellm/config.py
@@ -113,6 +113,8 @@ class Settings(BaseSettings):
     default_async_loop_sleep: float = 10e-5
     logging: LoggingSettings = LoggingSettings()
     default_sweep_number: int = 10
+    shutdown_poll_interval_seconds: float = 10
+    error_check_window_size: int = 10
 
     # HTTP settings
     request_follow_redirects: bool = True
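Both new settings follow the existing `GUIDELLM__` environment-variable convention that the CLI help references (e.g. `GUIDELLM__ERROR_CHECK_WINDOW_SIZE`). A minimal standalone sketch of that mapping, assuming pydantic-settings v2 and an equivalent `env_prefix` on the real `Settings` class (`DemoSettings` is a stand-in):

```python
import os

from pydantic_settings import BaseSettings, SettingsConfigDict


class DemoSettings(BaseSettings):
    # Stand-in for guidellm's Settings; only the prefix behavior matters here.
    model_config = SettingsConfigDict(env_prefix="GUIDELLM__")

    shutdown_poll_interval_seconds: float = 10
    error_check_window_size: int = 10


# Environment variables override the defaults via the prefix.
os.environ["GUIDELLM__ERROR_CHECK_WINDOW_SIZE"] = "25"
assert DemoSettings().error_check_window_size == 25
```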
diff --git a/src/guidellm/objects/pydantic.py b/src/guidellm/objects/pydantic.py
index 8365be33..3936d690 100644
--- a/src/guidellm/objects/pydantic.py
+++ b/src/guidellm/objects/pydantic.py
@@ -1,10 +1,11 @@
 from typing import Any, Generic, TypeVar
 
-from loguru import logger
 from pydantic import BaseModel, ConfigDict, Field
 
 __all__ = ["StandardBaseModel", "StatusBreakdown"]
 
+from guidellm import logger
+
 
 class StandardBaseModel(BaseModel):
     """
diff --git a/src/guidellm/request/__init__.py b/src/guidellm/request/__init__.py
index db3059cc..606fb897 100644
--- a/src/guidellm/request/__init__.py
+++ b/src/guidellm/request/__init__.py
@@ -1,6 +1,7 @@
 from .loader import (
     GenerativeRequestLoader,
     GenerativeRequestLoaderDescription,
+    GetInfiniteDatasetLengthError,
     RequestLoader,
     RequestLoaderDescription,
 )
@@ -10,6 +11,7 @@
     "GenerationRequest",
     "GenerativeRequestLoader",
     "GenerativeRequestLoaderDescription",
+    "GetInfiniteDatasetLengthError",
     "RequestLoader",
     "RequestLoaderDescription",
 ]
diff --git a/src/guidellm/request/loader.py b/src/guidellm/request/loader.py
index 50ab3cca..26a06eb7 100644
--- a/src/guidellm/request/loader.py
+++ b/src/guidellm/request/loader.py
@@ -19,11 +19,16 @@
 __all__ = [
     "GenerativeRequestLoader",
     "GenerativeRequestLoaderDescription",
+    "GetInfiniteDatasetLengthError",
     "RequestLoader",
     "RequestLoaderDescription",
 ]
 
 
+class GetInfiniteDatasetLengthError(Exception):
+    pass
+
+
 class RequestLoaderDescription(StandardBaseModel):
     type_: Literal["request_loader"] = "request_loader"
 
@@ -120,7 +125,11 @@ def __len__(self) -> int:
         if self.iter_type == "finite":
             return self.num_unique_items()
 
-        raise ValueError(f"Unable to determine length of dataset: {self.data}")
+        if self.iter_type != "infinite":
+            raise ValueError(f"Invalid iter_type {self.iter_type}")
+        raise GetInfiniteDatasetLengthError(
+            f"Dataset {self.data} is infinite, so its length cannot be determined"
+        )
 
     @property
     def description(self) -> GenerativeRequestLoaderDescription:
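The dedicated exception type above lets callers distinguish "the dataset is infinite" from genuine failures when asking a loader for its length. A self-contained sketch of how it is meant to be consumed, mirroring `_determine_total_requests_count` in the scheduler (the loader class here is a stand-in):

```python
import math


class GetInfiniteDatasetLengthError(Exception):
    pass


class InfiniteLoader:
    # Stand-in for GenerativeRequestLoader with iter_type == "infinite".
    def __len__(self) -> int:
        raise GetInfiniteDatasetLengthError("dataset is infinite")


end_number: float = math.inf
try:
    end_number = len(InfiniteLoader())
except GetInfiniteDatasetLengthError:
    # Known-infinite data: fall back to rate * duration when applicable.
    pass
except Exception:  # noqa: BLE001
    pass  # any other failure: keep the unbounded default

assert end_number == math.inf
```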
diff --git a/src/guidellm/scheduler/result.py b/src/guidellm/scheduler/result.py
index 0f12687f..4f4d5c87 100644
--- a/src/guidellm/scheduler/result.py
+++ b/src/guidellm/scheduler/result.py
@@ -1,3 +1,4 @@
+from collections import deque
 from typing import (
     Generic,
     Literal,
@@ -16,6 +17,9 @@
 ]
 
 
+RequestStatus = Literal["success", "error"]
+
+
 class SchedulerRunInfo(StandardBaseModel):
     """
     Information about the current run of the scheduler.
@@ -46,12 +50,15 @@ class SchedulerRunInfo(StandardBaseModel):
     end_number: float
     processes: int
     strategy: SchedulingStrategy
+    last_requests_statuses: deque[RequestStatus]
+    max_error: Optional[float] = None
 
     created_requests: int = 0
     queued_requests: int = 0
     scheduled_requests: int = 0
     processing_requests: int = 0
     completed_requests: int = 0
+    errored_requests: int = 0
 
 
 class SchedulerRequestInfo(StandardBaseModel):
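The bounded `deque` added to `SchedulerRunInfo` is what gives the error check its sliding window: with `maxlen` set, appends beyond the window size silently evict the oldest statuses. A small illustrative sketch (the window size here is arbitrary):

```python
from collections import deque
from typing import Literal

RequestStatus = Literal["success", "error"]

# maxlen bounds the window; appends beyond it evict the oldest entries.
window: deque[RequestStatus] = deque(maxlen=10)
for status in ["error"] * 3 + ["success"] * 12:
    window.append(status)

# 15 appends, but only the 10 most recent remain: the 3 errors were evicted.
assert list(window) == ["success"] * 10
error_ratio = len([s for s in window if s == "error"]) / len(window)
assert error_ratio == 0.0
```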
""" - if scheduling_strategy is None or not isinstance( - scheduling_strategy, SchedulingStrategy - ): - raise ValueError(f"Invalid scheduling strategy: {scheduling_strategy}") - - if max_number is not None and max_number < 1: - raise ValueError(f"Invalid max_number: {max_number}") - - if max_duration is not None and max_duration < 0: - raise ValueError(f"Invalid max_duration: {max_duration}") + self._validate_scheduler_params( + scheduling_strategy, max_duration, max_error, max_number + ) with ( multiprocessing.Manager() as manager, @@ -120,11 +124,17 @@ async def run( ) as executor, ): requests_iter: Optional[Iterator[Any]] = None - futures, requests_queue, responses_queue = await self._start_processes( - manager, executor, scheduling_strategy - ) + ( + futures, + requests_queue, + responses_queue, + shutdown_event, + ) = await self._start_processes(manager, executor, scheduling_strategy) + if shutdown_event.is_set(): + raise RuntimeError("shutdown_event is set before starting scheduling") + run_info, requests_iter, times_iter = self._run_setup( - futures, scheduling_strategy, max_number, max_duration + futures, scheduling_strategy, max_number, max_duration, max_error ) yield SchedulerResult( type_="run_start", @@ -132,7 +142,8 @@ async def run( ) try: - while True: + max_error_reached = False + while not max_error_reached: # check errors and raise them for future in futures: if future.done() and (err := future.exception()) is not None: @@ -159,6 +170,18 @@ async def run( run_info, ) if iter_result is not None: + if ( + iter_result.request_info.errored + and not iter_result.request_info.canceled + and self._is_max_error_reached(iter_result.run_info) + ): + shutdown_event.set() + max_error_reached = True + logger.info( + f"Max error rate of " + f"({iter_result.run_info.max_error}) " + f"reached, sending shutdown signal" + ) yield iter_result # yield control to the event loop @@ -171,7 +194,62 @@ async def run( run_info=run_info, ) - await self._stop_processes(futures, requests_queue) + await self._stop_processes(futures, shutdown_event) + + def _validate_scheduler_params( + self, + scheduling_strategy: SchedulingStrategy, + max_duration: Optional[float], + max_error: Optional[float], + max_number: Optional[int], + ) -> None: + if scheduling_strategy is None or not isinstance( + scheduling_strategy, SchedulingStrategy + ): + raise ValueError(f"Invalid scheduling strategy: {scheduling_strategy}") + if max_number is not None and max_number < 1: + raise ValueError(f"Invalid max_number: {max_number}") + if max_duration is not None and max_duration < 0: + raise ValueError(f"Invalid max_duration: {max_duration}") + if max_error is not None and (max_error < 0): + raise ValueError(f"Invalid max_error: {max_error}") + + def _is_max_error_reached(self, run_info: SchedulerRunInfo) -> bool: + max_error = run_info.max_error + if max_error is None: + return False + + if max_error >= 1: + # Absolute error count, i.e not a ratio + logger.debug( + f"Current error count " + f"{run_info.errored_requests} / " + f"{max_error} (max error)" + ) + return max_error < run_info.errored_requests + elif run_info.strategy.type_ == "constant" and run_info.end_number != math.inf: + current_error_ratio = run_info.errored_requests / run_info.end_number + logger.debug( + f"Current error rate {current_error_ratio} " + f"i.e total_finished [success / error] / max total possible" + ) + return max_error < current_error_ratio + elif settings.error_check_window_size <= run_info.completed_requests: + last_requests_statuses = 
+            last_errored_requests_count = len(
+                [s for s in last_requests_statuses if s == "error"]
+            )
+            current_error_ratio = last_errored_requests_count / len(
+                last_requests_statuses
+            )
+            logger.debug(
+                f"Current error ratio in the "
+                f"last requests window is "
+                f"{current_error_ratio} / {max_error} "
+                f"(max error rate)"
+            )
+            return max_error < current_error_ratio
+        return False
 
     async def _start_processes(
         self,
@@ -182,8 +260,10 @@ async def _start_processes(
     ) -> tuple[
         list[asyncio.Future],
         multiprocessing.Queue,
         multiprocessing.Queue,
+        MultiprocessingEvent,
     ]:
         await self.worker.prepare_multiprocessing()
+        shutdown_event = manager.Event()
         requests_queue = manager.Queue(
             maxsize=scheduling_strategy.queued_requests_limit
         )
@@ -212,25 +292,18 @@ async def _start_processes(
         futures = []
         loop = asyncio.get_event_loop()
         for id_, requests_limit in zip(process_ids, process_requests_limits):
-            if scheduling_strategy.processing_mode == "sync":
+            if scheduling_strategy.processing_mode in ["sync", "async"]:
                 futures.append(
                     loop.run_in_executor(
                         executor,
-                        self.worker.process_loop_synchronous,
+                        self.worker.run_process,
+                        scheduling_strategy.processing_mode,
                         requests_queue,
                         responses_queue,
+                        shutdown_event,
+                        settings.shutdown_poll_interval_seconds,
                         id_,
-                    )
-                )
-            elif scheduling_strategy.processing_mode == "async":
-                futures.append(
-                    loop.run_in_executor(
-                        executor,
-                        self.worker.process_loop_asynchronous,
-                        requests_queue,
-                        responses_queue,
                         requests_limit,
-                        id_,
                     )
                 )
             else:
@@ -241,7 +314,7 @@ async def _start_processes(
 
         await asyncio.sleep(0.1)  # give time for processes to start
 
-        return futures, requests_queue, responses_queue
+        return futures, requests_queue, responses_queue, shutdown_event
 
     def _run_setup(
         self,
@@ -249,20 +322,15 @@ def _run_setup(
         scheduling_strategy: SchedulingStrategy,
         max_number: Optional[int],
         max_duration: Optional[float],
+        max_error: Optional[float],
     ) -> tuple[SchedulerRunInfo, Iterator[Any], Iterator[float]]:
         requests_iter = iter(self.request_loader)
         start_time = time.time()
         times_iter = iter(scheduling_strategy.request_times())
         end_time = time.time() + (max_duration or math.inf)
-        end_number = max_number or math.inf
-
-        try:
-            # update end number if the request loader is finite and less than max
-            iter_length = len(self.request_loader)  # type: ignore[arg-type]
-            if 0 < iter_length < end_number:
-                end_number = iter_length
-        except Exception:  # noqa: BLE001, S110
-            pass
+        end_number = self._determine_total_requests_count(
+            scheduling_strategy, max_duration, max_number
+        )
 
         if end_number == math.inf and end_time is None:
             logger.warning(
@@ -276,10 +344,40 @@ def _run_setup(
             end_number=end_number,
             processes=len(processes),
             strategy=scheduling_strategy,
+            max_error=max_error,
+            last_requests_statuses=collections.deque(
+                maxlen=settings.error_check_window_size
+            ),
         )
 
         return info, requests_iter, times_iter
 
+    def _determine_total_requests_count(
+        self,
+        scheduling_strategy: SchedulingStrategy,
+        max_duration: Optional[float],
+        max_number: Optional[int],
+    ) -> Union[int, float]:
+        end_number = max_number or math.inf
+        try:
+            # update end_number if the request_loader is finite and less than max_number
+            iter_length = len(self.request_loader)  # type: ignore[arg-type]
+            if 0 < iter_length < end_number:
+                end_number = iter_length
+        except GetInfiniteDatasetLengthError:
+            # Only when the RPS is constant and the duration is
+            # capped can we determine the total number of requests
+            # that are expected to be sent
+            if scheduling_strategy.type_ == "constant" and max_duration is not None:
+                total_requests_in_max_duration = int(
+                    scheduling_strategy.rate * max_duration
+                )
+                if 0 < total_requests_in_max_duration < end_number:
+                    end_number = total_requests_in_max_duration
+        except Exception:  # noqa: BLE001, S110
+            pass
+        return end_number
+
     def _add_requests(
         self,
         requests_iter: Optional[Iterator[Any]],
@@ -362,6 +460,15 @@ def _check_result_ready(
         run_info.processing_requests -= 1
         run_info.completed_requests += 1
 
+        is_errored = process_response.info.errored
+        if is_errored:
+            run_info.errored_requests += 1
+
+        request_status: Literal["error", "success"] = cast(
+            "Literal['error', 'success']", "error" if is_errored else "success"
+        )
+        run_info.last_requests_statuses.append(request_status)
+
         return SchedulerRequestResult(
             type_="request_complete",
             run_info=run_info,
@@ -374,9 +481,9 @@
     async def _stop_processes(
         self,
         futures: list[asyncio.Future],
-        requests_queue: multiprocessing.Queue,
+        shutdown_event: MultiprocessingEvent,
     ):
-        for _ in futures:
-            requests_queue.put(None)
-
+        if not shutdown_event.is_set():
+            shutdown_event.set()
+        logger.debug("Waiting for futures to shut down")
         await asyncio.gather(*futures)
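The scheduler now signals shutdown through a `Manager().Event()` that worker processes poll, instead of pushing `None` sentinels onto the request queue. A condensed, standalone sketch of that handshake under assumed names and timings (not the patch's code):

```python
import multiprocessing
import queue


def worker(requests_queue, shutdown_event, poll_interval: float) -> None:
    # Poll the queue with a timeout so the shutdown event is checked
    # periodically instead of blocking on get() forever.
    while True:
        try:
            item = requests_queue.get(timeout=poll_interval)
        except queue.Empty:
            if shutdown_event.is_set():
                return  # graceful exit; no None sentinel needed
            continue
        _ = item  # a real worker would process the request here


if __name__ == "__main__":
    with multiprocessing.Manager() as manager:
        q = manager.Queue()
        event = manager.Event()
        proc = multiprocessing.Process(target=worker, args=(q, event, 0.1))
        proc.start()
        event.set()  # scheduler side: request shutdown
        proc.join(timeout=5)
        assert not proc.is_alive()
```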
diff --git a/src/guidellm/scheduler/worker.py b/src/guidellm/scheduler/worker.py
index a53b14c2..784d4c21 100644
--- a/src/guidellm/scheduler/worker.py
+++ b/src/guidellm/scheduler/worker.py
@@ -1,11 +1,15 @@
 import asyncio
 import math
-import multiprocessing
 import multiprocessing.queues
+import queue
+import threading
 import time
+import typing
 from abc import ABC, abstractmethod
 from collections.abc import AsyncGenerator
 from dataclasses import dataclass
+from multiprocessing.synchronize import Event as MultiprocessingEvent
+from threading import Event
 from typing import (
     Any,
     Generic,
@@ -40,6 +44,10 @@
 ]
 
 
+class ShutdownSignalReceivedError(Exception):
+    pass
+
+
 @dataclass
 class WorkerProcessRequest(Generic[RequestT]):
     request: RequestT
@@ -121,9 +129,27 @@ async def resolve(
         ...
 
     async def get_request(
-        self, requests_queue: multiprocessing.Queue
-    ) -> Optional[WorkerProcessRequest[RequestT]]:
-        return await asyncio.to_thread(requests_queue.get)  # type: ignore[attr-defined]
+        self,
+        requests_queue: multiprocessing.Queue,
+        shutdown_event: threading.Event,
+        process_id: int,
+        shutdown_poll_interval_seconds: float,
+    ) -> WorkerProcessRequest[RequestT]:
+        # We need to check shutdown_event intermittently because
+        # if we simply use asyncio.to_thread(requests_queue.get),
+        # cancellation does not propagate: the call running inside
+        # asyncio.to_thread is blocking
+        def _get_queue_intermittently():
+            while True:
+                try:
+                    return requests_queue.get(timeout=shutdown_poll_interval_seconds)
+                except queue.Empty as e:
+                    logger.info("Checking whether shutdown event is set in get_request")
+                    if shutdown_event.is_set():
+                        logger.info(f"Shutdown signal received in future {process_id}")
+                        raise asyncio.CancelledError from e
+
+        return await asyncio.to_thread(_get_queue_intermittently)  # type: ignore[attr-defined]
 
     async def send_result(
         self,
@@ -149,25 +175,29 @@ async def resolve_scheduler_request(
             scheduled_time=time.time(),
             process_id=process_id,
         )
-        result: WorkerProcessResult[RequestT, ResponseT] = WorkerProcessResult(
-            type_="request_scheduled",
-            request=request,
-            response=None,
-            info=info,
+        request_scheduled_result: WorkerProcessResult[RequestT, ResponseT] = (
+            WorkerProcessResult(
+                type_="request_scheduled",
+                request=request,
+                response=None,
+                info=info,
+            )
         )
-        asyncio.create_task(self.send_result(results_queue, result))
+        asyncio.create_task(self.send_result(results_queue, request_scheduled_result))
 
         if (wait_time := start_time - time.time()) > 0:
             await asyncio.sleep(wait_time)
 
         info.worker_start = time.time()
-        result = WorkerProcessResult(
-            type_="request_start",
-            request=request,
-            response=None,
-            info=info,
+        request_start_result: WorkerProcessResult[RequestT, ResponseT] = (
+            WorkerProcessResult(
+                type_="request_start",
+                request=request,
+                response=None,
+                info=info,
+            )
         )
-        asyncio.create_task(self.send_result(results_queue, result))
+        asyncio.create_task(self.send_result(results_queue, request_start_result))
 
         status, response = await self.resolve(request, timeout_time)
 
         info.worker_end = time.time()
@@ -185,27 +215,87 @@ async def resolve_scheduler_request(
         )
         asyncio.create_task(self.send_result(results_queue, result))
 
-    def process_loop_synchronous(
+    def run_process(
         self,
+        type_: Literal["sync", "async"],
         requests_queue: multiprocessing.Queue,
         results_queue: multiprocessing.Queue,
+        shutdown_event: MultiprocessingEvent,
+        shutdown_poll_interval_seconds: float,
         process_id: int,
+        max_concurrency: Optional[int] = None,
     ):
         async def _process_runner():
-            while (
-                process_request := await self.get_request(requests_queue)
-            ) is not None:
-                dequeued_time = time.time()
+            # We use a separate internal event because sharing the
+            # multiprocess shutdown_event would create a race between the
+            # get_request loop (which checks for shutdown) and the .cancel()
+            # in this method, letting the asyncio.CancelledError propagate
+            # and crash the worker
+            internal_shutdown_event: threading.Event = Event()
+            if type_ == "sync":
+                loop_task = asyncio.create_task(
+                    self._process_synchronous_requests_loop(
+                        requests_queue=requests_queue,
+                        results_queue=results_queue,
+                        process_id=process_id,
+                        shutdown_event=internal_shutdown_event,
+                        shutdown_poll_interval_seconds=shutdown_poll_interval_seconds,
+                    ),
+                    name="request_loop_processor_task",
+                )
"async": + if max_concurrency is None: + raise ValueError("max_concurrency must be set for async processor") + loop_task = asyncio.create_task( + self._process_asynchronous_requests_loop( + requests_queue=requests_queue, + results_queue=results_queue, + max_concurrency=max_concurrency, + process_id=process_id, + shutdown_event=internal_shutdown_event, + shutdown_poll_interval_seconds=shutdown_poll_interval_seconds, + ), + name="request_loop_processor_task", + ) + else: + raise ValueError(f"Invalid process type: {type_}") - await self.resolve_scheduler_request( - request=process_request.request, - queued_time=process_request.queued_time, - dequeued_time=dequeued_time, - start_time=process_request.start_time, - timeout_time=process_request.timeout_time, - results_queue=results_queue, + shutdown_task = asyncio.create_task( + self._wait_for_shutdown( + shutdown_event=shutdown_event, + shutdown_poll_interval=shutdown_poll_interval_seconds, process_id=process_id, + ), + name="shutdown_task", + ) + + done, pending = await asyncio.wait( + [ + loop_task, + shutdown_task, + ], + return_when=asyncio.FIRST_EXCEPTION, + ) + logger.info( + f"First exception happened, done: [{[r.get_name() for r in done]}" + ) + + for task in pending: + logger.debug( + f"Cancelling task {task.get_name()}|| Process {process_id}" ) + task.cancel() + internal_shutdown_event.set() + try: # noqa: SIM105 + await task + except asyncio.CancelledError: + pass + + for task in done: + task_exception = typing.cast("Exception", task.exception()) + if not isinstance(task_exception, ShutdownSignalReceivedError): + raise task_exception try: asyncio.run(_process_runner()) @@ -215,53 +305,107 @@ async def _process_runner(): exc_info=True, stack_info=True, ) + finally: + shutdown_event.set() # ensure shutdown event is set to stop other processes + + async def _wait_for_shutdown( + self, + shutdown_event: MultiprocessingEvent, + shutdown_poll_interval: float, + process_id: int, + ): + while not shutdown_event.is_set(): # noqa: ASYNC110 + await asyncio.sleep(shutdown_poll_interval) + + # Raising asyncio.CancelledError instead would + # cause the asyncio.wait above to wait + # forever, couldn't find a reasonable reason why + raise ShutdownSignalReceivedError( + f"Shutdown event set for process {process_id}, cancelling process loop." 
+        )
+
+    async def _process_synchronous_requests_loop(
+        self,
+        requests_queue: multiprocessing.Queue,
+        results_queue: multiprocessing.Queue,
+        process_id: int,
+        shutdown_event: threading.Event,
+        shutdown_poll_interval_seconds: float,
+    ):
+        while True:
+            process_request = await self.get_request(
+                requests_queue=requests_queue,
+                shutdown_event=shutdown_event,
+                process_id=process_id,
+                shutdown_poll_interval_seconds=shutdown_poll_interval_seconds,
+            )
+
+            dequeued_time = time.time()
+
+            await self.resolve_scheduler_request(
+                request=process_request.request,
+                queued_time=process_request.queued_time,
+                dequeued_time=dequeued_time,
+                start_time=process_request.start_time,
+                timeout_time=process_request.timeout_time,
+                results_queue=results_queue,
+                process_id=process_id,
+            )
 
-    def process_loop_asynchronous(
+    async def _process_asynchronous_requests_loop(
         self,
         requests_queue: multiprocessing.Queue,
         results_queue: multiprocessing.Queue,
         max_concurrency: int,
         process_id: int,
+        shutdown_event: threading.Event,
+        shutdown_poll_interval_seconds: float,
     ):
-        async def _process_runner():
-            pending = asyncio.Semaphore(max_concurrency)
+        pending = asyncio.Semaphore(max_concurrency)
 
-            if pending.locked():
-                raise ValueError("Async worker called with max_concurrency < 1")
+        if pending.locked():
+            raise ValueError("Async worker called with max_concurrency < 1")
 
-            while (
-                process_request := await self.get_request(requests_queue)
-            ) is not None:
-                dequeued_time = time.time()
+        while True:
+            process_request = await self.get_request(
+                requests_queue=requests_queue,
+                shutdown_event=shutdown_event,
+                process_id=process_id,
+                shutdown_poll_interval_seconds=shutdown_poll_interval_seconds,
+            )
 
-                await pending.acquire()
+            dequeued_time = time.time()
+            logger.debug(
+                f"Dequeued Process ID {process_id} || "
+                f"Timestamp {dequeued_time} || "
+                f"Semaphore {pending._value}/{max_concurrency}"  # noqa: SLF001
+            )
 
-                def _task_done(_: asyncio.Task):
-                    nonlocal pending
-                    pending.release()
+            await pending.acquire()
+            lock_acquired_at = time.time()
+            logger.debug(
+                f"Lock acquired Process ID {process_id} ||"
+                f" Timestamp {lock_acquired_at} ||"
+                f" Semaphore {pending._value}/{max_concurrency}"  # noqa: SLF001
+            )
 
-                task = asyncio.create_task(
-                    self.resolve_scheduler_request(
-                        request=process_request.request,
-                        queued_time=process_request.queued_time,
-                        dequeued_time=dequeued_time,
-                        start_time=process_request.start_time,
-                        timeout_time=process_request.timeout_time,
-                        results_queue=results_queue,
-                        process_id=process_id,
-                    )
-                )
-                task.add_done_callback(_task_done)
-                await asyncio.sleep(0)  # enable start task immediately
+            def _task_done(_: asyncio.Task):
+                nonlocal pending
+                pending.release()
 
-        try:
-            asyncio.run(_process_runner())
-        except Exception as exc:  # noqa: BLE001
-            logger.error(
-                f"Error in worker process {process_id}: {exc}",
-                exc_info=True,
-                stack_info=True,
+            task = asyncio.create_task(
+                self.resolve_scheduler_request(
+                    request=process_request.request,
+                    queued_time=process_request.queued_time,
+                    dequeued_time=dequeued_time,
+                    start_time=process_request.start_time,
+                    timeout_time=process_request.timeout_time,
+                    results_queue=results_queue,
+                    process_id=process_id,
+                )
             )
+            task.add_done_callback(_task_done)
+            await asyncio.sleep(0)  # yield control so the task starts immediately
 
 
 class GenerativeRequestsWorkerDescription(WorkerDescription):
@@ -309,32 +453,25 @@ async def prepare_multiprocessing(self):
         """
         await self.backend.prepare_multiprocessing()
 
-    def process_loop_synchronous(
+    def run_process(
         self,
+        type_: Literal["sync", "async"],
         requests_queue: multiprocessing.Queue,
         results_queue: multiprocessing.Queue,
+        shutdown_event: MultiprocessingEvent,
+        shutdown_poll_interval_seconds: float,
         process_id: int,
+        max_concurrency: Optional[int] = None,
     ):
         asyncio.run(self.backend.validate())
-        super().process_loop_synchronous(
+        super().run_process(
+            type_=type_,
             requests_queue=requests_queue,
             results_queue=results_queue,
+            shutdown_event=shutdown_event,
+            shutdown_poll_interval_seconds=shutdown_poll_interval_seconds,
             process_id=process_id,
-        )
-
-    def process_loop_asynchronous(
-        self,
-        requests_queue: multiprocessing.Queue,
-        results_queue: multiprocessing.Queue,
-        max_concurrency: int,
-        process_id: int,
-    ):
-        asyncio.run(self.backend.validate())
-        super().process_loop_asynchronous(
-            requests_queue=requests_queue,
-            results_queue=results_queue,
             max_concurrency=max_concurrency,
-            process_id=process_id,
         )
 
     async def resolve(
@@ -375,7 +512,7 @@ async def resolve(
         request_func, request_kwargs = self._create_request_func_kwargs(request)
 
         async def _runner():
-            # wrap function so we can enforce timeout and
+            # wrap function so that we can enforce timeout and
             # still return the latest state from the backend
             async for resp in request_func(**request_kwargs):  # type: ignore[operator]
                 nonlocal response
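The `run_process` supervision above amounts to racing the request loop against a shutdown watcher and letting whichever raises first win. A condensed standalone sketch of the same pattern (names, timings, and the simplified loop body are illustrative, not the patch's code):

```python
import asyncio
import threading


class ShutdownSignalReceivedError(Exception):
    pass


async def request_loop() -> None:
    while True:  # stand-in for the sync/async request-processing loop
        await asyncio.sleep(0.01)


async def wait_for_shutdown(event: threading.Event, poll: float) -> None:
    while not event.is_set():
        await asyncio.sleep(poll)
    raise ShutdownSignalReceivedError("shutdown event set")


async def main() -> None:
    event = threading.Event()
    loop_task = asyncio.create_task(request_loop(), name="loop")
    shutdown_task = asyncio.create_task(wait_for_shutdown(event, 0.01), name="watch")
    event.set()  # simulate the scheduler signalling shutdown
    done, pending = await asyncio.wait(
        [loop_task, shutdown_task], return_when=asyncio.FIRST_EXCEPTION
    )
    for task in pending:  # cancel whichever side lost the race
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    assert isinstance(shutdown_task.exception(), ShutdownSignalReceivedError)


asyncio.run(main())
```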
+ type_: Literal["sync", "async"], requests_queue: multiprocessing.Queue, results_queue: multiprocessing.Queue, + shutdown_event: MultiprocessingEvent, + shutdown_poll_interval_seconds: float, process_id: int, + max_concurrency: Optional[int] = None, ): asyncio.run(self.backend.validate()) - super().process_loop_synchronous( + super().run_process( + type_=type_, requests_queue=requests_queue, results_queue=results_queue, + shutdown_event=shutdown_event, + shutdown_poll_interval_seconds=shutdown_poll_interval_seconds, process_id=process_id, - ) - - def process_loop_asynchronous( - self, - requests_queue: multiprocessing.Queue, - results_queue: multiprocessing.Queue, - max_concurrency: int, - process_id: int, - ): - asyncio.run(self.backend.validate()) - super().process_loop_asynchronous( - requests_queue=requests_queue, - results_queue=results_queue, max_concurrency=max_concurrency, - process_id=process_id, ) async def resolve( @@ -375,7 +512,7 @@ async def resolve( request_func, request_kwargs = self._create_request_func_kwargs(request) async def _runner(): - # wrap function so we can enforce timeout and + # wrap function so that we can enforce timeout and # still return the latest state from the backend async for resp in request_func(**request_kwargs): # type: ignore[operator] nonlocal response diff --git a/tests/unit/benchmark/test_output.py b/tests/unit/benchmark/test_output.py index 9076834b..de32b44b 100644 --- a/tests/unit/benchmark/test_output.py +++ b/tests/unit/benchmark/test_output.py @@ -113,7 +113,7 @@ def test_console_benchmarks_args_str(): mock_benchmark = mock_generative_benchmark() console.benchmarks = [mock_benchmark] assert console.benchmarks_args_str == ( - "max_number=None, max_duration=10.0, warmup_number=None, " + "max_number=None, max_duration=10.0, max_error=0.05, warmup_number=None, " "warmup_duration=None, cooldown_number=None, cooldown_duration=None" ) diff --git a/tests/unit/mock_benchmark.py b/tests/unit/mock_benchmark.py index 81364fa1..4a8a1f29 100644 --- a/tests/unit/mock_benchmark.py +++ b/tests/unit/mock_benchmark.py @@ -221,6 +221,7 @@ def mock_generative_benchmark() -> GenerativeBenchmark: strategy=SynchronousStrategy(), max_number=None, max_duration=10.0, + max_error=0.05, warmup_number=None, warmup_duration=None, cooldown_number=None, @@ -245,6 +246,7 @@ def mock_generative_benchmark() -> GenerativeBenchmark: request_start_time_targeted_delay_avg=1.2827096836907523, request_time_delay_avg=0.0004316908972603934, request_time_avg=1.426228676523481, + error_rate=0.345346, ), worker=GenerativeRequestsWorkerDescription( backend_type="openai_http",