Skip to content

Commit

Permalink
Error writing to upstream (#161)
Browse files Browse the repository at this point in the history
* suppress some errors that shouldnt be logged.

* Fiber yield every 4096 frame to balance full throttle connections.

* Wait for Connection::CloseOk after upstream error

* dont error out if socket is already closed when client readloop ends

* Only send Channel::Close once to client when upstream closes

* Never reuse channels

For instance, when a client A sends Pub/Headers frames but not the Body
frame and then exits, then when the next client tries to publish the
upstream servers is not expected a Publish frame but is waiting for that
Body frame that will never come.

* Don't raise on Upstream Channel Close

* there are only active channels, none for reuse

---------

Co-authored-by: Carl Hörberg <[email protected]>
  • Loading branch information
viktorerlingsson and carlhoerberg authored May 9, 2024
1 parent 67356e5 commit 43079e4
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 59 deletions.
2 changes: 1 addition & 1 deletion src/amqproxy/channel_pool.cr
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ module AMQProxy
@lock.synchronize do
(@upstreams.size - 1).times do # leave at least one connection
u = @upstreams.pop
if u.active_channels.zero?
if u.channels.zero?
begin
u.close "Pooled connection closed due to inactivity"
rescue ex
Expand Down
37 changes: 22 additions & 15 deletions src/amqproxy/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ module AMQProxy
class Client
Log = ::Log.for(self)
getter credentials : Credentials
@channel_map = Hash(UInt16, UpstreamChannel).new
@channel_map = Hash(UInt16, UpstreamChannel?).new
@outgoing_frames = Channel(AMQ::Protocol::Frame).new(128)
@frame_max : UInt32
@channel_max : UInt16
Expand All @@ -28,6 +28,7 @@ module AMQProxy
def read_loop(channel_pool, socket = @socket) # ameba:disable Metrics/CyclomaticComplexity
Log.context.set(remote_address: socket.remote_address.to_s)
Log.debug { "Connected" }
i = 0u64
socket.read_timeout = (@heartbeat / 2).ceil.seconds if @heartbeat > 0
loop do
case frame = AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian)
Expand All @@ -49,22 +50,32 @@ module AMQProxy
end
write AMQ::Protocol::Frame::Channel::CloseOk.new(frame.channel)
when AMQ::Protocol::Frame::Channel::CloseOk
# CloseOk already sent in Upstream#read_loop
# CloseOk reply to server is already sent in Upstream#read_loop
@channel_map.delete(frame.channel)
when frame.channel.zero?
Log.error { "Unexpected connection frame: #{frame}" }
close_connection(540_u16, "NOT_IMPLEMENTED", frame)
else
src_channel = frame.channel
begin
upstream_channel = @channel_map[frame.channel]
upstream_channel.write(frame)
if upstream_channel = @channel_map[frame.channel]
upstream_channel.write(frame)
else
# Channel::Close is sent, waiting for CloseOk
end
rescue ex : Upstream::WriteError
close_channel(src_channel)
rescue KeyError
close_connection(504_u16, "CHANNEL_ERROR - Channel #{frame.channel} not open", frame)
end
end
Fiber.yield if (i &+= 1) % 4096 == 0
rescue ex : Upstream::AccessError
Log.error { "Access refused, reason: #{ex.message}" }
close_connection(403_u16, ex.message || "ACCESS_REFUSED")
rescue ex : Upstream::Error
Log.error(exception: ex) { "Upstream error" }
close_connection(503_u16, "UPSTREAM_ERROR - #{ex.message}")
rescue IO::TimeoutError
time_since_last_heartbeat = (Time.monotonic - @last_heartbeat).total_seconds.to_i # ignore subsecond latency
if time_since_last_heartbeat <= 1 + @heartbeat # add 1s grace because of rounding
Expand All @@ -75,16 +86,8 @@ module AMQProxy
return
end
end
rescue ex : IO::EOFError
Log.debug { "Disconnected" }
rescue ex : IO::Error
Log.error(exception: ex) { "IO error" } unless socket.closed?
rescue ex : Upstream::AccessError
Log.error { "Access refused, reason: #{ex.message}" }
close_connection(403_u16, ex.message || "ACCESS_REFUSED")
rescue ex : Upstream::Error
Log.error(exception: ex) { "Upstream error" }
close_connection(503_u16, "UPSTREAM_ERROR - #{ex.message}")
Log.debug { "Disconnected #{ex.inspect}" }
else
Log.debug { "Disconnected" }
ensure
Expand All @@ -100,7 +103,7 @@ module AMQProxy
break if frame.is_a? AMQ::Protocol::Frame::Connection::CloseOk
end
rescue ex : IO::Error
raise ex unless socket.closed?
# Client closed connection, suppress error
ensure
@outgoing_frames.close
socket.close rescue nil
Expand All @@ -110,6 +113,8 @@ module AMQProxy
# Send frame to client, channel id should already be remapped by the caller
def write(frame : AMQ::Protocol::Frame)
@outgoing_frames.send frame
rescue Channel::ClosedError
# do nothing
end

def close_connection(code, text, frame = nil)
Expand All @@ -123,12 +128,14 @@ module AMQProxy

def close_channel(id)
write AMQ::Protocol::Frame::Channel::Close.new(id, 500_u16, "UPSTREAM_DISCONNECTED", 0_u16, 0_u16)
@channel_map[id] = nil
end

private def close_all_upstream_channels
@channel_map.each_value do |upstream_channel|
upstream_channel.unassign
upstream_channel.try &.unassign
rescue Upstream::WriteError
Log.debug { "Upstream write error while closing client's channels" }
next # Nothing to do
end
@channel_map.clear
Expand Down
3 changes: 2 additions & 1 deletion src/amqproxy/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ module AMQProxy
end
rescue ex # only raise from constructor, when negotating
Log.debug { "Client connection failure (#{remote_address}) #{ex.inspect}" }
socket.close
ensure
socket.close rescue nil
end

private def active_client(client, &)
Expand Down
55 changes: 13 additions & 42 deletions src/amqproxy/upstream.cr
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ module AMQProxy
class Upstream
Log = ::Log.for(self)
@socket : IO
@unsafe_channels = Set(UInt16).new
@channels = Hash(UInt16, DownstreamChannel?).new
@channels = Hash(UInt16, DownstreamChannel).new
@channels_lock = Mutex.new
@channel_max : UInt16
@lock = Mutex.new
Expand Down Expand Up @@ -38,14 +37,7 @@ module AMQProxy
def open_channel_for(downstream_channel : DownstreamChannel) : UpstreamChannel
@channels_lock.synchronize do
1_u16.upto(@channel_max) do |i|
if @channels.has_key?(i)
if @channels[i].nil?
@channels[i] = downstream_channel
return UpstreamChannel.new(self, i) # reuse
else
next # in use
end
else
unless @channels.has_key?(i)
@channels[i] = downstream_channel
send AMQ::Protocol::Frame::Channel::Open.new(i)
return UpstreamChannel.new(self, i)
Expand All @@ -56,27 +48,21 @@ module AMQProxy
end

def unassign_channel(channel : UInt16)
@channels_lock.synchronize do
if @unsafe_channels.delete channel
send AMQ::Protocol::Frame::Channel::Close.new(channel, 0u16, "", 0u16, 0u16)
@channels.delete channel
elsif @channels.has_key? channel
@channels[channel] = nil # keep for reuse
end
if @channels_lock.synchronize { @channels.delete(channel) }
send AMQ::Protocol::Frame::Channel::Close.new(channel, 0u16, "", 0u16, 0u16)
end
rescue ex : IO::Error | OpenSSL::SSL::Error
Log.debug(exception: ex) { "Error while closing upstream channel #{channel}" }
end

def channels
@channels.size
end

def active_channels
@channels.count { |_, v| !v.nil? }
end

# Frames from upstream (to client)
def read_loop(socket = @socket) # ameba:disable Metrics/CyclomaticComplexity
Log.context.set(remote_address: @remote_address)
i = 0u64
loop do
case frame = AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian)
when AMQ::Protocol::Frame::Heartbeat then send frame
Expand All @@ -95,7 +81,6 @@ module AMQProxy
when AMQ::Protocol::Frame::Channel::CloseOk # when channel pool requested channel close
when AMQ::Protocol::Frame::Channel::Close
send AMQ::Protocol::Frame::Channel::CloseOk.new(frame.channel)
@unsafe_channels.delete(frame.channel)
if downstream_channel = @channels.delete(frame.channel)
downstream_channel.write(frame)
end
Expand All @@ -108,9 +93,10 @@ module AMQProxy
"DOWNSTREAM_DISCONNECTED", 0_u16, 0_u16)
end
end
Fiber.yield if (i &+= 1) % 4096 == 0
end
rescue ex : IO::Error | OpenSSL::SSL::Error
Log.error(exception: ex) { "Error reading from upstream" } unless socket.closed?
Log.info { "Connection error #{ex.inspect}" } unless socket.closed?
ensure
socket.close rescue nil
close_all_client_channels
Expand All @@ -121,16 +107,12 @@ module AMQProxy
end

private def close_all_client_channels
Log.debug { "Closing all client channels for closed upstream" }
@channels_lock.synchronize do
cnt = 0
return if @channels.empty?
Log.debug { "Upstream connection closed, closing #{@channels.size} client channels" }
@channels.each_value do |downstream_channel|
if dch = downstream_channel
dch.close
cnt += 1
end
downstream_channel.close
end
Log.debug { "Upstream connection closed, closing #{cnt} client channels" } unless cnt.zero?
@channels.clear
end
end
Expand All @@ -140,9 +122,7 @@ module AMQProxy
clients = Set(Client).new
@channels_lock.synchronize do
@channels.each_value do |downstream_channel|
if dc = downstream_channel
clients << dc.client
end
clients << downstream_channel.client
end
end
clients.each do |client|
Expand All @@ -153,19 +133,10 @@ module AMQProxy
# Forward frames from client to upstream
def write(frame : AMQ::Protocol::Frame) : Nil
case frame
when AMQ::Protocol::Frame::Basic::Publish,
AMQ::Protocol::Frame::Basic::Qos
when AMQ::Protocol::Frame::Basic::Get
@unsafe_channels.add(frame.channel) unless frame.no_ack
when AMQ::Protocol::Frame::Basic,
AMQ::Protocol::Frame::Confirm,
AMQ::Protocol::Frame::Tx
@unsafe_channels.add(frame.channel)
when AMQ::Protocol::Frame::Connection
raise "Connection frames should not be sent through here: #{frame}"
when AMQ::Protocol::Frame::Channel::CloseOk # when upstream server requested a channel close and client confirmed
@channels_lock.synchronize do
@unsafe_channels.delete(frame.channel)
@channels.delete(frame.channel)
end
when AMQ::Protocol::Frame::Channel
Expand Down

0 comments on commit 43079e4

Please sign in to comment.