Skip to content

Commit ad71509

Browse files
committed
fix(network): adjust number of permits if the request is huge (#2294)
* refactor: use only one semaphore to limit the queues requests size Signed-off-by: Ning Yu <[email protected]> * fix(network): adjust number of permits if the request is huge Signed-off-by: Ning Yu <[email protected]> --------- Signed-off-by: Ning Yu <[email protected]>
1 parent 513e584 commit ad71509

File tree

1 file changed

+5
-12
lines changed

1 file changed

+5
-12
lines changed

core/src/main/scala/kafka/network/RequestChannel.scala

+5-12
Original file line numberDiff line numberDiff line change
@@ -365,13 +365,13 @@ class RequestChannel(val queueSize: Int,
365365
// AutoMQ inject start
366366
/**
367367
* Queue of requests to be handled, in the order they arrived.
368-
* Note: Before any request enters this queue, it needs to acquire {@link multiQueuedRequestSizeSemaphore}
368+
* Note: Before any request enters this queue, it needs to acquire {@link queuedRequestSizeSemaphore}
369369
*/
370370
private val multiRequestQueue = new java.util.ArrayList[ArrayBlockingQueue[BaseRequest]]()
371371
/**
372372
* Semaphore to limit the total size of requests in the {@link multiRequestQueue}.
373373
*/
374-
private val multiQueuedRequestSizeSemaphore = new java.util.ArrayList[Semaphore]()
374+
private val queuedRequestSizeSemaphore = new Semaphore(queuedRequestSize)
375375
private val availableRequestSizeMetricName = metricNamePrefix.concat(AvailableRequestSizeMetric)
376376
// AutoMQ inject end
377377
private val processors = new ConcurrentHashMap[Int, Processor]()
@@ -390,7 +390,7 @@ class RequestChannel(val queueSize: Int,
390390
}
391391
})
392392
metricsGroup.newGauge(availableRequestSizeMetricName, () => {
393-
multiQueuedRequestSizeSemaphore.stream().mapToInt(s => s.availablePermits()).sum()
393+
queuedRequestSizeSemaphore.availablePermits()
394394
})
395395

396396
def this(queueSize: Int, metricNamePrefix: String, time: Time, metrics: RequestChannel.Metrics) {
@@ -415,13 +415,8 @@ class RequestChannel(val queueSize: Int,
415415
// AutoMQ inject start
416416
def registerNRequestHandler(count: Int): Unit = {
417417
val queueSize = math.max(this.queueSize / count, 1)
418-
// TODO: maxQueuedRequestSize will be 100 / 8 = 12.5 MiB as a default.
419-
// However, if the request size is too large, it will block at the semaphore.
420-
// Currently, the max request size is 1 MiB (max.request.size) by default, so it is not very problematic.
421-
val maxQueuedRequestSize = math.max(this.queuedRequestSize / count, 10 * 1024 * 1024)
422418
for (_ <- 0 until count) {
423419
multiRequestQueue.add(new ArrayBlockingQueue[BaseRequest](queueSize))
424-
multiQueuedRequestSizeSemaphore.add(new Semaphore(maxQueuedRequestSize))
425420
multiCallbackQueue.add(new ArrayBlockingQueue[BaseRequest](queueSize))
426421
}
427422
Collections.unmodifiableList(multiRequestQueue)
@@ -436,8 +431,7 @@ class RequestChannel(val queueSize: Int,
436431
/** Send a request to be handled, potentially blocking until there is room in the queue for the request */
437432
def sendRequest(request: RequestChannel.Request): Unit = {
438433
if (multiRequestQueue.size() != 0) {
439-
val requestSizeSemaphore = multiQueuedRequestSizeSemaphore.get(math.abs(request.context.connectionId.hashCode % multiQueuedRequestSizeSemaphore.size()))
440-
requestSizeSemaphore.acquire(request.sizeInBytes)
434+
queuedRequestSizeSemaphore.acquire(Math.min(request.sizeInBytes, queuedRequestSize))
441435
val requestQueue = multiRequestQueue.get(math.abs(request.context.connectionId.hashCode % multiRequestQueue.size()))
442436
requestQueue.put(request)
443437
} else {
@@ -544,7 +538,6 @@ class RequestChannel(val queueSize: Int,
544538
def receiveRequest(timeout: Long, id: Int): RequestChannel.BaseRequest = {
545539
val callbackQueue = multiCallbackQueue.get(id)
546540
val requestQueue = multiRequestQueue.get(id)
547-
val requestSizeSemaphore = multiQueuedRequestSizeSemaphore.get(id)
548541
val callbackRequest = callbackQueue.poll()
549542
if (callbackRequest != null)
550543
callbackRequest
@@ -553,7 +546,7 @@ class RequestChannel(val queueSize: Int,
553546
request match {
554547
case WakeupRequest => callbackQueue.poll()
555548
case request: Request =>
556-
requestSizeSemaphore.release(request.sizeInBytes)
549+
queuedRequestSizeSemaphore.release(Math.min(request.sizeInBytes, queuedRequestSize))
557550
request
558551
case _ => request
559552
}

0 commit comments

Comments
 (0)