diff --git a/src/amqproxy/channel_pool.cr b/src/amqproxy/channel_pool.cr index 18e2f77..c2ff973 100644 --- a/src/amqproxy/channel_pool.cr +++ b/src/amqproxy/channel_pool.cr @@ -70,7 +70,7 @@ module AMQProxy @lock.synchronize do (@upstreams.size - 1).times do # leave at least one connection u = @upstreams.pop - if u.active_channels.zero? + if u.channels.zero? begin u.close "Pooled connection closed due to inactivity" rescue ex diff --git a/src/amqproxy/client.cr b/src/amqproxy/client.cr index a08ee00..6399afb 100644 --- a/src/amqproxy/client.cr +++ b/src/amqproxy/client.cr @@ -8,7 +8,7 @@ module AMQProxy class Client Log = ::Log.for(self) getter credentials : Credentials - @channel_map = Hash(UInt16, UpstreamChannel).new + @channel_map = Hash(UInt16, UpstreamChannel?).new @outgoing_frames = Channel(AMQ::Protocol::Frame).new(128) @frame_max : UInt32 @channel_max : UInt16 @@ -28,6 +28,7 @@ module AMQProxy def read_loop(channel_pool, socket = @socket) # ameba:disable Metrics/CyclomaticComplexity Log.context.set(remote_address: socket.remote_address.to_s) Log.debug { "Connected" } + i = 0u64 socket.read_timeout = (@heartbeat / 2).ceil.seconds if @heartbeat > 0 loop do case frame = AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian) @@ -49,7 +50,7 @@ module AMQProxy end write AMQ::Protocol::Frame::Channel::CloseOk.new(frame.channel) when AMQ::Protocol::Frame::Channel::CloseOk - # CloseOk already sent in Upstream#read_loop + # CloseOk reply to server is already sent in Upstream#read_loop @channel_map.delete(frame.channel) when frame.channel.zero? Log.error { "Unexpected connection frame: #{frame}" } @@ -57,14 +58,24 @@ module AMQProxy else src_channel = frame.channel begin - upstream_channel = @channel_map[frame.channel] - upstream_channel.write(frame) + if upstream_channel = @channel_map[frame.channel] + upstream_channel.write(frame) + else + # Channel::Close is sent, waiting for CloseOk + end rescue ex : Upstream::WriteError close_channel(src_channel) rescue KeyError close_connection(504_u16, "CHANNEL_ERROR - Channel #{frame.channel} not open", frame) end end + Fiber.yield if (i &+= 1) % 4096 == 0 + rescue ex : Upstream::AccessError + Log.error { "Access refused, reason: #{ex.message}" } + close_connection(403_u16, ex.message || "ACCESS_REFUSED") + rescue ex : Upstream::Error + Log.error(exception: ex) { "Upstream error" } + close_connection(503_u16, "UPSTREAM_ERROR - #{ex.message}") rescue IO::TimeoutError time_since_last_heartbeat = (Time.monotonic - @last_heartbeat).total_seconds.to_i # ignore subsecond latency if time_since_last_heartbeat <= 1 + @heartbeat # add 1s grace because of rounding @@ -75,16 +86,8 @@ module AMQProxy return end end - rescue ex : IO::EOFError - Log.debug { "Disconnected" } rescue ex : IO::Error - Log.error(exception: ex) { "IO error" } unless socket.closed? - rescue ex : Upstream::AccessError - Log.error { "Access refused, reason: #{ex.message}" } - close_connection(403_u16, ex.message || "ACCESS_REFUSED") - rescue ex : Upstream::Error - Log.error(exception: ex) { "Upstream error" } - close_connection(503_u16, "UPSTREAM_ERROR - #{ex.message}") + Log.debug { "Disconnected #{ex.inspect}" } else Log.debug { "Disconnected" } ensure @@ -100,7 +103,7 @@ module AMQProxy break if frame.is_a? AMQ::Protocol::Frame::Connection::CloseOk end rescue ex : IO::Error - raise ex unless socket.closed? + # Client closed connection, suppress error ensure @outgoing_frames.close socket.close rescue nil @@ -110,6 +113,8 @@ module AMQProxy # Send frame to client, channel id should already be remapped by the caller def write(frame : AMQ::Protocol::Frame) @outgoing_frames.send frame + rescue Channel::ClosedError + # do nothing end def close_connection(code, text, frame = nil) @@ -123,12 +128,14 @@ module AMQProxy def close_channel(id) write AMQ::Protocol::Frame::Channel::Close.new(id, 500_u16, "UPSTREAM_DISCONNECTED", 0_u16, 0_u16) + @channel_map[id] = nil end private def close_all_upstream_channels @channel_map.each_value do |upstream_channel| - upstream_channel.unassign + upstream_channel.try &.unassign rescue Upstream::WriteError + Log.debug { "Upstream write error while closing client's channels" } next # Nothing to do end @channel_map.clear diff --git a/src/amqproxy/server.cr b/src/amqproxy/server.cr index 50c3d76..bc95f54 100644 --- a/src/amqproxy/server.cr +++ b/src/amqproxy/server.cr @@ -67,7 +67,8 @@ module AMQProxy end rescue ex # only raise from constructor, when negotating Log.debug { "Client connection failure (#{remote_address}) #{ex.inspect}" } - socket.close + ensure + socket.close rescue nil end private def active_client(client, &) diff --git a/src/amqproxy/upstream.cr b/src/amqproxy/upstream.cr index 2a26f1a..0ecf49f 100644 --- a/src/amqproxy/upstream.cr +++ b/src/amqproxy/upstream.cr @@ -8,8 +8,7 @@ module AMQProxy class Upstream Log = ::Log.for(self) @socket : IO - @unsafe_channels = Set(UInt16).new - @channels = Hash(UInt16, DownstreamChannel?).new + @channels = Hash(UInt16, DownstreamChannel).new @channels_lock = Mutex.new @channel_max : UInt16 @lock = Mutex.new @@ -38,14 +37,7 @@ module AMQProxy def open_channel_for(downstream_channel : DownstreamChannel) : UpstreamChannel @channels_lock.synchronize do 1_u16.upto(@channel_max) do |i| - if @channels.has_key?(i) - if @channels[i].nil? - @channels[i] = downstream_channel - return UpstreamChannel.new(self, i) # reuse - else - next # in use - end - else + unless @channels.has_key?(i) @channels[i] = downstream_channel send AMQ::Protocol::Frame::Channel::Open.new(i) return UpstreamChannel.new(self, i) @@ -56,27 +48,21 @@ module AMQProxy end def unassign_channel(channel : UInt16) - @channels_lock.synchronize do - if @unsafe_channels.delete channel - send AMQ::Protocol::Frame::Channel::Close.new(channel, 0u16, "", 0u16, 0u16) - @channels.delete channel - elsif @channels.has_key? channel - @channels[channel] = nil # keep for reuse - end + if @channels_lock.synchronize { @channels.delete(channel) } + send AMQ::Protocol::Frame::Channel::Close.new(channel, 0u16, "", 0u16, 0u16) end + rescue ex : IO::Error | OpenSSL::SSL::Error + Log.debug(exception: ex) { "Error while closing upstream channel #{channel}" } end def channels @channels.size end - def active_channels - @channels.count { |_, v| !v.nil? } - end - # Frames from upstream (to client) def read_loop(socket = @socket) # ameba:disable Metrics/CyclomaticComplexity Log.context.set(remote_address: @remote_address) + i = 0u64 loop do case frame = AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian) when AMQ::Protocol::Frame::Heartbeat then send frame @@ -95,7 +81,6 @@ module AMQProxy when AMQ::Protocol::Frame::Channel::CloseOk # when channel pool requested channel close when AMQ::Protocol::Frame::Channel::Close send AMQ::Protocol::Frame::Channel::CloseOk.new(frame.channel) - @unsafe_channels.delete(frame.channel) if downstream_channel = @channels.delete(frame.channel) downstream_channel.write(frame) end @@ -108,9 +93,10 @@ module AMQProxy "DOWNSTREAM_DISCONNECTED", 0_u16, 0_u16) end end + Fiber.yield if (i &+= 1) % 4096 == 0 end rescue ex : IO::Error | OpenSSL::SSL::Error - Log.error(exception: ex) { "Error reading from upstream" } unless socket.closed? + Log.info { "Connection error #{ex.inspect}" } unless socket.closed? ensure socket.close rescue nil close_all_client_channels @@ -121,16 +107,12 @@ module AMQProxy end private def close_all_client_channels - Log.debug { "Closing all client channels for closed upstream" } @channels_lock.synchronize do - cnt = 0 + return if @channels.empty? + Log.debug { "Upstream connection closed, closing #{@channels.size} client channels" } @channels.each_value do |downstream_channel| - if dch = downstream_channel - dch.close - cnt += 1 - end + downstream_channel.close end - Log.debug { "Upstream connection closed, closing #{cnt} client channels" } unless cnt.zero? @channels.clear end end @@ -140,9 +122,7 @@ module AMQProxy clients = Set(Client).new @channels_lock.synchronize do @channels.each_value do |downstream_channel| - if dc = downstream_channel - clients << dc.client - end + clients << downstream_channel.client end end clients.each do |client| @@ -153,19 +133,10 @@ module AMQProxy # Forward frames from client to upstream def write(frame : AMQ::Protocol::Frame) : Nil case frame - when AMQ::Protocol::Frame::Basic::Publish, - AMQ::Protocol::Frame::Basic::Qos - when AMQ::Protocol::Frame::Basic::Get - @unsafe_channels.add(frame.channel) unless frame.no_ack - when AMQ::Protocol::Frame::Basic, - AMQ::Protocol::Frame::Confirm, - AMQ::Protocol::Frame::Tx - @unsafe_channels.add(frame.channel) when AMQ::Protocol::Frame::Connection raise "Connection frames should not be sent through here: #{frame}" when AMQ::Protocol::Frame::Channel::CloseOk # when upstream server requested a channel close and client confirmed @channels_lock.synchronize do - @unsafe_channels.delete(frame.channel) @channels.delete(frame.channel) end when AMQ::Protocol::Frame::Channel