Skip to content
This repository has been archived by the owner on Aug 31, 2021. It is now read-only.

Transient endpoints and returning un-routed messages #101

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 22 additions & 22 deletions lib/Endpoint.js
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,28 @@ class Endpoint {
let consumeResult

try {
const worker = await this._remit._workers.acquire()

try {
await worker.assertQueue(this._options.queue, {
exclusive: false,
durable: true,
autoDelete: true,
maxPriority: 10
})

this._remit._workers.release(worker)
} catch (e) {
this._remit._workers.destroy(worker)
throw e
}

await this._consumer.bindQueue(
this._options.queue,
this._remit._exchange,
this._options.event
)

consumeResult = await this._consumer.consume(
this._options.queue,
this._remit._namespace.bind(this._incoming.bind(this)),
Expand Down Expand Up @@ -225,22 +247,6 @@ class Endpoint {
this._starting = true

try {
const worker = await this._remit._workers.acquire()

try {
await worker.assertQueue(queue, {
exclusive: false,
durable: true,
autoDelete: false,
maxPriority: 10
})

this._remit._workers.release(worker)
} catch (e) {
delete this._starting
this._remit._workers.destroy(worker)
throw e
}

const connection = await this._remit._connection
this._consumer = await connection.createChannel()
Expand All @@ -253,12 +259,6 @@ class Endpoint {
this._consumer.prefetch(prefetch, true)
}

await this._consumer.bindQueue(
queue,
this._remit._exchange,
event
)

await this.resume()
delete this._starting

Expand Down
11 changes: 10 additions & 1 deletion lib/Remit.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const Listener = require('./Listener')
const Request = require('./Request')
const Emitter = require('./Emitter')
const { createNamespace } = require('cls-hooked')
const parseEvent = require('../utils/parseEvent')

class Remit {
constructor (options = {}) {
Expand Down Expand Up @@ -79,9 +80,17 @@ class Remit {
return connection
}

async _incoming (message) {
async _incoming (isReturn, message) {
if (!message) {
await throwAsException(new Error('Request reply consumer cancelled unexpectedly; this was most probably done via RabbitMQ\'s management panel'))
}

if (isReturn) {
return this._emitter.emit(`return-${message.properties.correlationId}`, message, {
event: parseEvent(message.properties, message.fields),
code: 'no_route',
message: `Request could not be routed to any endpoints. This signifies no matching endpoints are currently running.`
})
}

try {
Expand Down
7 changes: 4 additions & 3 deletions lib/Request.js
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,12 @@ class Request extends CallableInstance {
publishChannel.on('error', console.error)
publishChannel.on('close', () => {
throwAsException(new Error('Reply consumer died - this is most likely due to the RabbitMQ connection dying'))
})
})
publishChannel.on('return', this._remit._namespace.bind(this._remit._incoming.bind(this._remit, true)))

await publishChannel.consume(
'amq.rabbitmq.reply-to',
this._remit._namespace.bind(this._remit._incoming.bind(this._remit)),
this._remit._namespace.bind(this._remit._incoming.bind(this._remit, false)),
{
noAck: true,
exclusive: true
Expand All @@ -216,7 +217,7 @@ class Request extends CallableInstance {
}

_waitForResult (messageId, span) {
const types = ['data', 'timeout']
const types = ['data', 'timeout', 'return']

return new Promise((resolve, reject) => {
const cleanUp = (message, err, result) => {
Expand Down
8 changes: 4 additions & 4 deletions test/endpoint.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -518,8 +518,8 @@ describe('Endpoint', function () {
} catch (e) {
errorCaught = true
expect(e).to.be.an('object')
expect(e).to.have.property('code', 'request_timedout')
expect(e).to.have.property('message', 'Request timed out after no response for 2000ms')
expect(e).to.have.property('code', 'no_route')
expect(e).to.have.property('message', 'Request could not be routed to any endpoints. This signifies no matching endpoints are currently running.')
}

expect(errorCaught).to.equal(true)
Expand Down Expand Up @@ -555,8 +555,8 @@ describe('Endpoint', function () {
} catch (e) {
errorCaught = true
expect(e).to.be.an('object')
expect(e).to.have.property('code', 'request_timedout')
expect(e).to.have.property('message', 'Request timed out after no response for 2000ms')
expect(e).to.have.property('code', 'no_route')
expect(e).to.have.property('message', 'Request could not be routed to any endpoints. This signifies no matching endpoints are currently running.')
}

expect(errorCaught).to.equal(true)
Expand Down
6 changes: 5 additions & 1 deletion test/request.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,11 @@ describe('Request', function () {
this.slow(2000)

const request = remit.request('timeout-test')
request.options({timeout: 1000})
request.options({timeout: 1000})

await remit.endpoint('timeout-test')
.handler((e, cb) => setTimeout(cb, 3000))
.start()

try {
await request()
Expand Down