From 2816331445f1465cd9659463a7b2fb2e6cec58c4 Mon Sep 17 00:00:00 2001 From: Riya Chakraborty Date: Wed, 10 Apr 2024 11:00:00 -0700 Subject: [PATCH 1/4] add batch build time metric --- .../strategies/run_task_with_multiprocessing.py | 8 ++++++++ arroyo/utils/metric_defs.py | 2 ++ 2 files changed, 10 insertions(+) diff --git a/arroyo/processing/strategies/run_task_with_multiprocessing.py b/arroyo/processing/strategies/run_task_with_multiprocessing.py index 0d008ad0..efeda05e 100644 --- a/arroyo/processing/strategies/run_task_with_multiprocessing.py +++ b/arroyo/processing/strategies/run_task_with_multiprocessing.py @@ -572,6 +572,8 @@ def __init__( self.__closed = False + self.__last_batch_time = None + def handle_sigchld(signum: int, frame: Any) -> None: # Terminates the consumer if any child process of the # consumer is terminated. @@ -585,6 +587,7 @@ def handle_sigchld(signum: int, frame: Any) -> None: def __submit_batch(self, input_block_too_small: bool) -> None: assert self.__batch_builder is not None + batch = self.__batch_builder.build() logger.debug("Submitting %r to %r...", batch, self.__pool) self.__processes.append( @@ -608,6 +611,11 @@ def __submit_batch(self, input_block_too_small: bool) -> None: ) self.__batch_builder = None + if self.__last_batch_time is None: + self.__last_batch_time = time.time() + else: + self.__metrics.timing("arroyo.strategies.run_task_with_multiprocessing.batch.build_and_submit_time", time.time() - self.__last_batch_time) + def __forward_invalid_offsets(self) -> None: if len(self.__invalid_messages): self.__next_step.poll() diff --git a/arroyo/utils/metric_defs.py b/arroyo/utils/metric_defs.py index 05f47ac3..68e94301 100644 --- a/arroyo/utils/metric_defs.py +++ b/arroyo/utils/metric_defs.py @@ -98,4 +98,6 @@ "arroyo.processing.strategies.healthcheck.touch", # Number of messages dropped in the FilterStep strategy "arroyo.strategies.filter.dropped_messages", + # The amount of time it takes for the multiprocessing batch to be built and submitted to subprocesses + "arroyo.strategies.run_task_with_multiprocessing.batch.build_and_submit_time" ] From 0ae83ad06af30bd8025f016e99d499ea2c7dc4e8 Mon Sep 17 00:00:00 2001 From: Riya Chakraborty Date: Wed, 10 Apr 2024 11:16:51 -0700 Subject: [PATCH 2/4] change to float --- arroyo/processing/strategies/run_task_with_multiprocessing.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/arroyo/processing/strategies/run_task_with_multiprocessing.py b/arroyo/processing/strategies/run_task_with_multiprocessing.py index efeda05e..a14d9e67 100644 --- a/arroyo/processing/strategies/run_task_with_multiprocessing.py +++ b/arroyo/processing/strategies/run_task_with_multiprocessing.py @@ -572,7 +572,7 @@ def __init__( self.__closed = False - self.__last_batch_time = None + self.__last_batch_time = 0.0 def handle_sigchld(signum: int, frame: Any) -> None: # Terminates the consumer if any child process of the @@ -611,7 +611,7 @@ def __submit_batch(self, input_block_too_small: bool) -> None: ) self.__batch_builder = None - if self.__last_batch_time is None: + if self.__last_batch_time == 0.0: self.__last_batch_time = time.time() else: self.__metrics.timing("arroyo.strategies.run_task_with_multiprocessing.batch.build_and_submit_time", time.time() - self.__last_batch_time) From 27e38bf60174804e73f044a76c2626fc65b578d8 Mon Sep 17 00:00:00 2001 From: Riya Chakraborty Date: Wed, 10 Apr 2024 11:30:18 -0700 Subject: [PATCH 3/4] reset time --- arroyo/processing/strategies/run_task_with_multiprocessing.py | 1 + 1 file changed, 1 insertion(+) diff --git a/arroyo/processing/strategies/run_task_with_multiprocessing.py b/arroyo/processing/strategies/run_task_with_multiprocessing.py index a14d9e67..2b91152f 100644 --- a/arroyo/processing/strategies/run_task_with_multiprocessing.py +++ b/arroyo/processing/strategies/run_task_with_multiprocessing.py @@ -615,6 +615,7 @@ def __submit_batch(self, input_block_too_small: bool) -> None: self.__last_batch_time = time.time() else: self.__metrics.timing("arroyo.strategies.run_task_with_multiprocessing.batch.build_and_submit_time", time.time() - self.__last_batch_time) + self.__last_batch_time = time.time() def __forward_invalid_offsets(self) -> None: if len(self.__invalid_messages): From 0b10ff02fbc5017940d920013b3bbfeeb45da45c Mon Sep 17 00:00:00 2001 From: Riya Chakraborty Date: Fri, 12 Apr 2024 13:31:37 -0700 Subject: [PATCH 4/4] change metrc name --- arroyo/processing/strategies/run_task_with_multiprocessing.py | 2 +- arroyo/utils/metric_defs.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/arroyo/processing/strategies/run_task_with_multiprocessing.py b/arroyo/processing/strategies/run_task_with_multiprocessing.py index 2b91152f..2a70ed9c 100644 --- a/arroyo/processing/strategies/run_task_with_multiprocessing.py +++ b/arroyo/processing/strategies/run_task_with_multiprocessing.py @@ -614,7 +614,7 @@ def __submit_batch(self, input_block_too_small: bool) -> None: if self.__last_batch_time == 0.0: self.__last_batch_time = time.time() else: - self.__metrics.timing("arroyo.strategies.run_task_with_multiprocessing.batch.build_and_submit_time", time.time() - self.__last_batch_time) + self.__metrics.timing("arroyo.strategies.run_task_with_multiprocessing.batch_build_and_submit_time", time.time() - self.__last_batch_time) self.__last_batch_time = time.time() def __forward_invalid_offsets(self) -> None: diff --git a/arroyo/utils/metric_defs.py b/arroyo/utils/metric_defs.py index 68e94301..12874e1f 100644 --- a/arroyo/utils/metric_defs.py +++ b/arroyo/utils/metric_defs.py @@ -99,5 +99,5 @@ # Number of messages dropped in the FilterStep strategy "arroyo.strategies.filter.dropped_messages", # The amount of time it takes for the multiprocessing batch to be built and submitted to subprocesses - "arroyo.strategies.run_task_with_multiprocessing.batch.build_and_submit_time" + "arroyo.strategies.run_task_with_multiprocessing.batch_build_and_submit_time" ]