[m3msg] Support parallel consumerWriter Flushes to pick next Write #4331
+707
−146
What this PR does / why we need it:
When m3msg writes to multiple replicas of a consumer service's shards, it does so serially, blocking when required. That blocking introduces unnecessary latency and queue build-up in the producer service. Writes are serial to avoid sending each message to all replicas, which would significantly increase network usage; the tradeoff is that the consumer writer for another replica might have had enough room in its send buffer to accommodate the write. This PR introduces support for forced flushes, which avoid the extra network usage while still choosing a consumerWriter that we know will not block.
This is done by invoking ForcedFlush() concurrently on all consumerWriter replicas and then using the consumerWriter that returns first. A flush does not write any new data to the connection; it only flushes the data buffered so far. This way no additional data is put on the wire that would not have been written anyway.
Details:
A consumerWriter corresponds to one instance of the downstream service. Since an instance owns multiple shards of the m3msg topic, messages from multiple shards are multiplexed onto the same consumerWriter.
This can quickly fill up the flush buffer of a consumerWriter. When the consuming service is slow (for example, while it is starting up and warming its internal caches), it might not ACK messages in time, causing a build-up of messages in the m3msg producer queue. That leads to elevated consume latencies, memory pressure, and possibly OOMs. The existing method of picking a consumerWriter was random, so it could pick the slower one.
With this change we pick the consumerWriter that has the most available capacity in its flush buffer. Moreover, we initiate a ForcedFlush on all replicas in parallel and then pick the one that completes first. Note that this could still select a consumerWriter that does not have enough capacity, since other goroutines are queueing onto the same consumerWriter, but at minimum it picks one that was able to clear its flush buffer quickly. We emit a metric, "forced-flush-not-enough-buffer", that counts how often we return from a flush but still lack capacity in the consumerWriter for the upcoming write(). When that counter is elevated, WriteBufferSize should be tuned via the config.
Fixes #
Special notes for your reviewer:
Does this PR introduce a user-facing and/or backwards incompatible change?:
Does this PR require updating code package or user-facing documentation?: