diff --git a/shard.lock b/shard.lock index 434be48..951b52d 100644 --- a/shard.lock +++ b/shard.lock @@ -10,5 +10,5 @@ shards: amqp-client: git: https://github.com/cloudamqp/amqp-client.cr.git - version: 1.2.1 + version: 1.2.2 diff --git a/spec/amqproxy_spec.cr b/spec/amqproxy_spec.cr index 07a8814..9daff74 100644 --- a/spec/amqproxy_spec.cr +++ b/spec/amqproxy_spec.cr @@ -201,4 +201,28 @@ describe AMQProxy::Server do s.stop_accepting_clients end end + + it "passes connection blocked frames to clients" do + s = AMQProxy::Server.new("127.0.0.1", 5672, false) + done = Channel(Nil).new + begin + spawn { s.listen("127.0.0.1", 5673) } + Fiber.yield + AMQP::Client.start("amqp://localhost:5673") do |conn| + conn.on_blocked do + done.send nil + system("#{MAYBE_SUDO}rabbitmqctl set_vm_memory_high_watermark 0.8 > /dev/null").should be_true + end + conn.on_unblocked do + done.send nil + end + ch = conn.channel + system("#{MAYBE_SUDO}rabbitmqctl set_vm_memory_high_watermark 0.001 > /dev/null").should be_true + ch.basic_publish "foobar", "amq.fanout" + 2.times { done.receive } + end + ensure + s.stop_accepting_clients + end + end end diff --git a/src/amqproxy/client.cr b/src/amqproxy/client.cr index 9534f3c..208244f 100644 --- a/src/amqproxy/client.cr +++ b/src/amqproxy/client.cr @@ -223,7 +223,7 @@ module AMQProxy capabilities: { consumer_priorities: true, exchange_exchange_bindings: true, - "connection.blocked": false, + "connection.blocked": true, authentication_failure_close: true, per_consumer_qos: true, "basic.nack": true, diff --git a/src/amqproxy/upstream.cr b/src/amqproxy/upstream.cr index 44e3c09..a483aa3 100644 --- a/src/amqproxy/upstream.cr +++ b/src/amqproxy/upstream.cr @@ -74,7 +74,7 @@ module AMQProxy end # Frames from upstream (to client) - private def read_loop(socket, remote_address : String) + private def read_loop(socket, remote_address : String) # ameba:disable Metrics/CyclomaticComplexity Log.context.set(remote_address: remote_address) loop do case frame = AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian) @@ -87,6 +87,9 @@ module AMQProxy end return when AMQ::Protocol::Frame::Connection::CloseOk then return + when AMQ::Protocol::Frame::Connection::Blocked, + AMQ::Protocol::Frame::Connection::Unblocked + send_to_all_clients(frame) when AMQ::Protocol::Frame::Channel::OpenOk # we assume it always succeeds when AMQ::Protocol::Frame::Channel::CloseOk # when channel pool requested channel close else @@ -125,6 +128,21 @@ module AMQProxy end end + private def send_to_all_clients(frame : AMQ::Protocol::Frame::Connection) + Log.debug { "Sending broadcast frame to all client connections" } + clients = Set(Client).new + @channels_lock.synchronize do + @channels.each_value do |downstream_channel| + if dc = downstream_channel + clients << dc.client + end + end + end + clients.each do |client| + client.write frame + end + end + # Forward frames from client to upstream def write(frame : AMQ::Protocol::Frame) : Nil case frame @@ -232,7 +250,7 @@ module AMQProxy capabilities: { consumer_priorities: true, exchange_exchange_bindings: true, - "connection.blocked": false, + "connection.blocked": true, authentication_failure_close: true, per_consumer_qos: true, "basic.nack": true,