Skip to content

Commit

Permalink
buffer client publishes per channel
Browse files Browse the repository at this point in the history
  • Loading branch information
carlhoerberg committed May 10, 2024
1 parent 2f91317 commit 885be91
Showing 1 changed file with 24 additions and 17 deletions.
41 changes: 24 additions & 17 deletions src/amqproxy/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,32 @@ module AMQProxy

# Keep a buffer of publish frames
# Only send to upstream when the full message is received
@next_publish : AMQ::Protocol::Frame::Basic::Publish?
@next_header : AMQ::Protocol::Frame::Header?
@next_bodies = Array(AMQ::Protocol::Frame::BytesBody).new
@publish_buffers = Hash(UInt16, PublishBuffer).new

private class PublishBuffer
getter publish : AMQ::Protocol::Frame::Basic::Publish
property! header : AMQ::Protocol::Frame::Header?
getter bodies = Array(AMQ::Protocol::Frame::BytesBody).new

def initialize(@publish)
end

def full?
header.body_size == @bodies.sum &.body_size
end
end

private def finish_publish(channel)
next_publish = @next_publish.not_nil!("Missing Basic.Publish frame")
next_header = @next_header.not_nil!("Missing Header frame")
buffer = @publish_buffers[channel]
if upstream_channel = @channel_map[channel]
upstream_channel.write(next_publish)
upstream_channel.write(next_header)
@next_bodies.each do |body|
upstream_channel.write(buffer.publish)
upstream_channel.write(buffer.header)
buffer.bodies.each do |body|
upstream_channel.write(body)
end
end
ensure
@next_publish = @next_header = nil
@next_bodies.clear
@publish_buffers.delete channel
end

# frames from enduser
Expand All @@ -69,16 +78,14 @@ module AMQProxy
# Server closed channel, CloseOk reply to server is already sent
@channel_map.delete(frame.channel)
when AMQ::Protocol::Frame::Basic::Publish
@next_publish = frame
@publish_buffers[frame.channel] = PublishBuffer.new(frame)
when AMQ::Protocol::Frame::Header
@next_header = frame
@publish_buffers[frame.channel].header = frame
finish_publish(frame.channel) if frame.body_size.zero?
when AMQ::Protocol::Frame::BytesBody
@next_bodies << frame
if @next_header.not_nil!("Missing Header frame").body_size ==
@next_bodies.sum &.body_size
finish_publish(frame.channel)
end
buffer = @publish_buffers[frame.channel]
buffer.bodies << frame
finish_publish(frame.channel) if buffer.full?
when frame.channel.zero?
Log.error { "Unexpected connection frame: #{frame}" }
close_connection(540_u16, "NOT_IMPLEMENTED", frame)
Expand Down

0 comments on commit 885be91

Please sign in to comment.