diff --git a/src/amqp-channel.ts b/src/amqp-channel.ts index dfd3a46..5b3e722 100644 --- a/src/amqp-channel.ts +++ b/src/amqp-channel.ts @@ -338,12 +338,13 @@ export class AMQPChannel { buffer.setUint8(j, 206); j += 1 // frame end byte buffer.setUint32(headerStart + 3, j - headerStart - 8) // update frameSize + let lastFrame // Send current frames if there's no body to send if (body.byteLength === 0) { - await this.connection.send(new Uint8Array(buffer.buffer, 0, j)) + lastFrame = this.connection.send(new Uint8Array(buffer.buffer, 0, j)) } else if (j >= buffer.byteLength - 8) { // Send current frames if a body frame can't fit in the rest of the frame buffer - await this.connection.send(new Uint8Array(buffer.buffer, 0, j)) + lastFrame = this.connection.send(new Uint8Array(buffer.buffer, 0, j)) j = 0 } @@ -358,10 +359,11 @@ export class AMQPChannel { const bodyView = new Uint8Array(buffer.buffer, j, frameSize) bodyView.set(dataSlice); j += frameSize // body content buffer.setUint8(j, 206); j += 1 // frame end byte - await this.connection.send(new Uint8Array(buffer.buffer, 0, j)) + lastFrame = this.connection.send(new Uint8Array(buffer.buffer, 0, j)) bodyPos += frameSize j = 0 } + await lastFrame // buffer all frames and only wait for the last as RabbitMQ requires all publish frames to be sent together this.connection.bufferPool.push(buffer) // return buffer to buffer pool for later reuse // if publish confirm is enabled, put a promise on a queue if the sends were ok // the promise on the queue will be fullfilled by the read loop when an ack/nack