From 55e91016a4a9ddbefd25695abd1179ee897f61e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Mon, 16 Sep 2024 14:44:07 +0200 Subject: [PATCH] Always buffer all publish framed together Even if the user doesn't await basicPublish all framed beloging to one publish should be published together. So intead of waiting for a potentially blocked socket to be drain enqueue all data and only await for the last sent frame. Fixes #49 --- src/amqp-channel.ts | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/amqp-channel.ts b/src/amqp-channel.ts index dfd3a46b..5b3e7222 100644 --- a/src/amqp-channel.ts +++ b/src/amqp-channel.ts @@ -338,12 +338,13 @@ export class AMQPChannel { buffer.setUint8(j, 206); j += 1 // frame end byte buffer.setUint32(headerStart + 3, j - headerStart - 8) // update frameSize + let lastFrame // Send current frames if there's no body to send if (body.byteLength === 0) { - await this.connection.send(new Uint8Array(buffer.buffer, 0, j)) + lastFrame = this.connection.send(new Uint8Array(buffer.buffer, 0, j)) } else if (j >= buffer.byteLength - 8) { // Send current frames if a body frame can't fit in the rest of the frame buffer - await this.connection.send(new Uint8Array(buffer.buffer, 0, j)) + lastFrame = this.connection.send(new Uint8Array(buffer.buffer, 0, j)) j = 0 } @@ -358,10 +359,11 @@ export class AMQPChannel { const bodyView = new Uint8Array(buffer.buffer, j, frameSize) bodyView.set(dataSlice); j += frameSize // body content buffer.setUint8(j, 206); j += 1 // frame end byte - await this.connection.send(new Uint8Array(buffer.buffer, 0, j)) + lastFrame = this.connection.send(new Uint8Array(buffer.buffer, 0, j)) bodyPos += frameSize j = 0 } + await lastFrame // buffer all frames and only wait for the last as RabbitMQ requires all publish frames to be sent together this.connection.bufferPool.push(buffer) // return buffer to buffer pool for later reuse // if publish confirm is enabled, put a promise on a queue if the sends were ok // the promise on the queue will be fullfilled by the read loop when an ack/nack