Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

metric(multiprocessing): Add metric to track time it takes to prepare and submit batch #352

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,8 @@ def __init__(

self.__closed = False

self.__last_batch_time = 0.0

def handle_sigchld(signum: int, frame: Any) -> None:
# Terminates the consumer if any child process of the
# consumer is terminated.
Expand All @@ -585,6 +587,7 @@ def handle_sigchld(signum: int, frame: Any) -> None:

def __submit_batch(self, input_block_too_small: bool) -> None:
assert self.__batch_builder is not None

batch = self.__batch_builder.build()
logger.debug("Submitting %r to %r...", batch, self.__pool)
self.__processes.append(
Expand All @@ -608,6 +611,11 @@ def __submit_batch(self, input_block_too_small: bool) -> None:
)
self.__batch_builder = None

if self.__last_batch_time == 0.0:
self.__last_batch_time = time.time()
else:
self.__metrics.timing("arroyo.strategies.run_task_with_multiprocessing.batch.build_and_submit_time", time.time() - self.__last_batch_time)
Copy link
Member

@nikhars nikhars Apr 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you also need to reset self.__last_batch_time to 0 here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops forgot that line. I think we need to reset it to the current time (so that we have the time that the previous batch was submitted)


def __forward_invalid_offsets(self) -> None:
if len(self.__invalid_messages):
self.__next_step.poll()
Expand Down
2 changes: 2 additions & 0 deletions arroyo/utils/metric_defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,6 @@
"arroyo.processing.strategies.healthcheck.touch",
# Number of messages dropped in the FilterStep strategy
"arroyo.strategies.filter.dropped_messages",
# The amount of time it takes for the multiprocessing batch to be built and submitted to subprocesses
"arroyo.strategies.run_task_with_multiprocessing.batch.build_and_submit_time"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"arroyo.strategies.run_task_with_multiprocessing.batch.build_and_submit_time"
"arroyo.strategies.run_task_with_multiprocessing.batch_build_and_submit_time"

]
Loading