diff --git a/src/amqproxy/client.cr b/src/amqproxy/client.cr index 9ec43bb..2e337b1 100644 --- a/src/amqproxy/client.cr +++ b/src/amqproxy/client.cr @@ -107,6 +107,10 @@ module AMQProxy # Send frame to client, channel id should already be remapped by the caller def write(frame : AMQ::Protocol::Frame) + case frame + when AMQ::Protocol::Frame::Channel::Close + @channel_map[frame.channel] = nil + end @outgoing_frames.send frame rescue Channel::ClosedError # do nothing @@ -123,7 +127,6 @@ module AMQProxy def close_channel(id, code, reason) write AMQ::Protocol::Frame::Channel::Close.new(id, code, reason, 0_u16, 0_u16) - @channel_map[id] = nil end private def close_all_upstream_channels(code = 500_u16, reason = "CLIENT_DISCONNECTED") diff --git a/src/amqproxy/upstream.cr b/src/amqproxy/upstream.cr index 774a714..a669646 100644 --- a/src/amqproxy/upstream.cr +++ b/src/amqproxy/upstream.cr @@ -74,7 +74,12 @@ module AMQProxy when AMQ::Protocol::Frame::Connection::Blocked, AMQ::Protocol::Frame::Connection::Unblocked send_to_all_clients(frame) - when AMQ::Protocol::Frame::Channel::OpenOk # we assume it always succeeds + when AMQ::Protocol::Frame::Channel::OpenOk # assume it always succeeds + when AMQ::Protocol::Frame::Channel::Close + send AMQ::Protocol::Frame::Channel::CloseOk.new(frame.channel) + if downstream_channel = @channels_lock.synchronize { @channels.delete(frame.channel) } + downstream_channel.write frame + end when AMQ::Protocol::Frame::Channel::CloseOk # when client requested channel close @channels_lock.synchronize { @channels.delete(frame.channel) } else