Skip to content

Commit

Permalink
Only send Channel::Close once to client when upstream closes
Browse files Browse the repository at this point in the history
  • Loading branch information
carlhoerberg committed May 9, 2024
1 parent 382a701 commit 3187589
Showing 1 changed file with 9 additions and 5 deletions.
14 changes: 9 additions & 5 deletions src/amqproxy/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ module AMQProxy
class Client
Log = ::Log.for(self)
getter credentials : Credentials
@channel_map = Hash(UInt16, UpstreamChannel).new
@channel_map = Hash(UInt16, UpstreamChannel?).new
@outgoing_frames = Channel(AMQ::Protocol::Frame).new(128)
@frame_max : UInt32
@channel_max : UInt16
Expand Down Expand Up @@ -50,16 +50,19 @@ module AMQProxy
end
write AMQ::Protocol::Frame::Channel::CloseOk.new(frame.channel)
when AMQ::Protocol::Frame::Channel::CloseOk
# CloseOk already sent in Upstream#read_loop
# CloseOk reply to server is already sent in Upstream#read_loop
@channel_map.delete(frame.channel)
when frame.channel.zero?
Log.error { "Unexpected connection frame: #{frame}" }
close_connection(540_u16, "NOT_IMPLEMENTED", frame)
else
src_channel = frame.channel
begin
upstream_channel = @channel_map[frame.channel]
upstream_channel.write(frame)
if upstream_channel = @channel_map[frame.channel]
upstream_channel.write(frame)
else
# Channel::Close is sent, waiting for CloseOk
end
rescue ex : Upstream::WriteError
close_channel(src_channel)
rescue KeyError
Expand Down Expand Up @@ -125,11 +128,12 @@ module AMQProxy

def close_channel(id)
write AMQ::Protocol::Frame::Channel::Close.new(id, 500_u16, "UPSTREAM_DISCONNECTED", 0_u16, 0_u16)
@channel_map[id] = nil
end

private def close_all_upstream_channels
@channel_map.each_value do |upstream_channel|
upstream_channel.unassign
upstream_channel.try &.unassign
rescue Upstream::WriteError
Log.debug { "Upstream write error while closing client's channels" }
next # Nothing to do
Expand Down

0 comments on commit 3187589

Please sign in to comment.