diff --git a/arroyo/processing/strategies/run_task_with_multiprocessing.py b/arroyo/processing/strategies/run_task_with_multiprocessing.py index 0d008ad0..2a70ed9c 100644 --- a/arroyo/processing/strategies/run_task_with_multiprocessing.py +++ b/arroyo/processing/strategies/run_task_with_multiprocessing.py @@ -572,6 +572,8 @@ def __init__( self.__closed = False + self.__last_batch_time = 0.0 + def handle_sigchld(signum: int, frame: Any) -> None: # Terminates the consumer if any child process of the # consumer is terminated. @@ -585,6 +587,7 @@ def handle_sigchld(signum: int, frame: Any) -> None: def __submit_batch(self, input_block_too_small: bool) -> None: assert self.__batch_builder is not None + batch = self.__batch_builder.build() logger.debug("Submitting %r to %r...", batch, self.__pool) self.__processes.append( @@ -608,6 +611,12 @@ def __submit_batch(self, input_block_too_small: bool) -> None: ) self.__batch_builder = None + if self.__last_batch_time == 0.0: + self.__last_batch_time = time.time() + else: + self.__metrics.timing("arroyo.strategies.run_task_with_multiprocessing.batch_build_and_submit_time", time.time() - self.__last_batch_time) + self.__last_batch_time = time.time() + def __forward_invalid_offsets(self) -> None: if len(self.__invalid_messages): self.__next_step.poll() diff --git a/arroyo/utils/metric_defs.py b/arroyo/utils/metric_defs.py index d6cdd33e..a197654b 100644 --- a/arroyo/utils/metric_defs.py +++ b/arroyo/utils/metric_defs.py @@ -98,4 +98,6 @@ "arroyo.processing.strategies.healthcheck.touch", # Counter: Number of messages dropped in the FilterStep strategy "arroyo.strategies.filter.dropped_messages", + # The amount of time it takes for the multiprocessing batch to be built and submitted to subprocesses + "arroyo.strategies.run_task_with_multiprocessing.batch_build_and_submit_time" ]