From affc41dc4e8cbdaff6cc11dad16395dd342d06f2 Mon Sep 17 00:00:00 2001 From: Kristjan Tammekivi Date: Thu, 12 Sep 2024 17:13:19 +0300 Subject: [PATCH] fix: properly reject when socket ends during connection --- src/amqp-queue.ts | 2 +- src/amqp-socket-client.ts | 23 +++++++++++++++-------- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/src/amqp-queue.ts b/src/amqp-queue.ts index 10aa265e..0996badb 100644 --- a/src/amqp-queue.ts +++ b/src/amqp-queue.ts @@ -4,7 +4,7 @@ import type { AMQPProperties } from './amqp-properties.js' import type { AMQPConsumer } from './amqp-consumer.js' /** - * Convience class for queues + * Convenience class for queues */ export class AMQPQueue { readonly channel: AMQPChannel diff --git a/src/amqp-socket-client.ts b/src/amqp-socket-client.ts index f9a252f8..55e1284c 100644 --- a/src/amqp-socket-client.ts +++ b/src/amqp-socket-client.ts @@ -48,7 +48,21 @@ export class AMQPClient extends AMQPBaseClient { } override connect(): Promise { + let rejectConnection: (reason: Error) => void const socket = this.connectSocket() + socket.on('connect', () => { + socket.on('error', (err) => this.onerror(new AMQPError(err.message, this))); + socket.on('end', () => { + if (rejectConnection) { + rejectConnection(new AMQPError('Connection ended', this)) + } + }) + socket.on('close', (hadError: boolean) => { + const clientClosed = this.closed; + this.closed = true; + if (!hadError && !clientClosed) this.onerror(new AMQPError('Socket closed', this)); + }); + }); Object.defineProperty(this, 'socket', { value: socket, writable: true, @@ -59,6 +73,7 @@ export class AMQPClient extends AMQPBaseClient { // enable TCP keepalive if AMQP heartbeats are disabled if (this.heartbeat === 0) socket.setKeepAlive(true, 60) return new Promise((resolve, reject) => { + rejectConnection = reject; socket.on('timeout', () => reject(new AMQPError("timeout", this))) socket.on('error', (err) => reject(new AMQPError(err.message, this))) const onConnect = (conn: AMQPBaseClient) => { @@ -80,14 +95,6 @@ export class AMQPClient extends AMQPBaseClient { const sendStart = () => this.send(new Uint8Array([65, 77, 81, 80, 0, 0, 9, 1])) const conn = this.tls ? tls.connect(options, sendStart) : net.connect(options, sendStart) conn.on('data', this.onRead.bind(this)) - conn.on('connect', () => { - conn.on('error', (err) => this.onerror(new AMQPError(err.message, this))) - conn.on('close', (hadError: boolean) => { - const clientClosed = this.closed - this.closed = true - if (!hadError && !clientClosed) this.onerror(new AMQPError("Socket closed", this)) - }) - }) return conn }