Skip to content

Commit

Permalink
Never reuse channels
Browse files Browse the repository at this point in the history
For instance, when a client A sends Pub/Headers frames but not the Body
frame and then exits, then when the next client tries to publish the
upstream servers is not expected a Publish frame but is waiting for that
Body frame that will never come.
  • Loading branch information
carlhoerberg committed May 9, 2024
1 parent 3187589 commit c993ae7
Showing 1 changed file with 8 additions and 34 deletions.
42 changes: 8 additions & 34 deletions src/amqproxy/upstream.cr
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ module AMQProxy
class Upstream
Log = ::Log.for(self)
@socket : IO
@unsafe_channels = Set(UInt16).new
@channels = Hash(UInt16, DownstreamChannel?).new
@channels = Hash(UInt16, DownstreamChannel).new
@channels_lock = Mutex.new
@channel_max : UInt16
@lock = Mutex.new
Expand Down Expand Up @@ -38,14 +37,7 @@ module AMQProxy
def open_channel_for(downstream_channel : DownstreamChannel) : UpstreamChannel
@channels_lock.synchronize do
1_u16.upto(@channel_max) do |i|
if @channels.has_key?(i)
if @channels[i].nil?
@channels[i] = downstream_channel
return UpstreamChannel.new(self, i) # reuse
else
next # in use
end
else
unless @channels.has_key?(i)
@channels[i] = downstream_channel
send AMQ::Protocol::Frame::Channel::Open.new(i)
return UpstreamChannel.new(self, i)
Expand All @@ -57,11 +49,9 @@ module AMQProxy

def unassign_channel(channel : UInt16)
@channels_lock.synchronize do
if @unsafe_channels.delete channel
if @channels.has_key? channel
send AMQ::Protocol::Frame::Channel::Close.new(channel, 0u16, "", 0u16, 0u16)
@channels.delete channel
elsif @channels.has_key? channel
@channels[channel] = nil # keep for reuse
end
end
end
Expand All @@ -71,7 +61,7 @@ module AMQProxy
end

def active_channels
@channels.count { |_, v| !v.nil? }
@channels.size
end

# Frames from upstream (to client)
Expand All @@ -96,7 +86,6 @@ module AMQProxy
when AMQ::Protocol::Frame::Channel::CloseOk # when channel pool requested channel close
when AMQ::Protocol::Frame::Channel::Close
send AMQ::Protocol::Frame::Channel::CloseOk.new(frame.channel)
@unsafe_channels.delete(frame.channel)
if downstream_channel = @channels.delete(frame.channel)
downstream_channel.write(frame)
end
Expand All @@ -123,16 +112,12 @@ module AMQProxy
end

private def close_all_client_channels
Log.debug { "Closing all client channels for closed upstream" }
@channels_lock.synchronize do
cnt = 0
return if @channels.empty?
Log.debug { "Upstream connection closed, closing #{@channels.size} client channels" }
@channels.each_value do |downstream_channel|
if dch = downstream_channel
dch.close
cnt += 1
end
downstream_channel.close
end
Log.debug { "Upstream connection closed, closing #{cnt} client channels" } unless cnt.zero?
@channels.clear
end
end
Expand All @@ -142,9 +127,7 @@ module AMQProxy
clients = Set(Client).new
@channels_lock.synchronize do
@channels.each_value do |downstream_channel|
if dc = downstream_channel
clients << dc.client
end
clients << downstream_channel.client
end
end
clients.each do |client|
Expand All @@ -155,19 +138,10 @@ module AMQProxy
# Forward frames from client to upstream
def write(frame : AMQ::Protocol::Frame) : Nil
case frame
when AMQ::Protocol::Frame::Basic::Publish,
AMQ::Protocol::Frame::Basic::Qos
when AMQ::Protocol::Frame::Basic::Get
@unsafe_channels.add(frame.channel) unless frame.no_ack
when AMQ::Protocol::Frame::Basic,
AMQ::Protocol::Frame::Confirm,
AMQ::Protocol::Frame::Tx
@unsafe_channels.add(frame.channel)
when AMQ::Protocol::Frame::Connection
raise "Connection frames should not be sent through here: #{frame}"
when AMQ::Protocol::Frame::Channel::CloseOk # when upstream server requested a channel close and client confirmed
@channels_lock.synchronize do
@unsafe_channels.delete(frame.channel)
@channels.delete(frame.channel)
end
when AMQ::Protocol::Frame::Channel
Expand Down

0 comments on commit c993ae7

Please sign in to comment.