diff --git a/src/lavinmq/amqp/client.cr b/src/lavinmq/amqp/client.cr index 23fc2063d7..67af5be0ad 100644 --- a/src/lavinmq/amqp/client.cr +++ b/src/lavinmq/amqp/client.cr @@ -59,6 +59,25 @@ module LavinMQ @vhost.add_connection(self) @log.info { "Connection established for user=#{@user.name}" } spawn read_loop, name: "Client#read_loop #{@remote_address}" + spawn flush_loop, name: "Client#flush_loop #{@remote_address}" + end + + @flush_buffer = ::Channel(Bool).new(128) + + private def flush_loop + flush_buffer = @flush_buffer + while flush_buffer.receive? && @running + while flush_buffer.try_receive? + end + @write_lock.synchronize do + @socket.flush + end + end + rescue e : IO::Error + end + + private def flush + @flush_buffer.send(true) end # Returns client provided connection name if set, else server generated name @@ -190,8 +209,8 @@ module LavinMQ @write_lock.synchronize do s = @socket s.write_bytes frame, IO::ByteFormat::NetworkEndian - s.flush end + flush @last_sent_frame = RoughTime.monotonic @send_oct_count += 8_u64 + frame.bytesize if frame.is_a?(AMQP::Frame::Connection::CloseOk) @@ -224,9 +243,9 @@ module LavinMQ def deliver(frame, msg) return false if closed? + socket = @socket + websocket = socket.is_a? WebSocketIO @write_lock.synchronize do - socket = @socket - websocket = socket.is_a? WebSocketIO {% unless flag?(:release) %} @log.trace { "Send #{frame.inspect}" } {% end %} @@ -257,9 +276,9 @@ module LavinMQ @send_oct_count += 8_u64 + body.bytesize pos += length end - socket.flush unless websocket # Websockets need to send one frame per WS frame @last_sent_frame = RoughTime.monotonic end + flush unless websocket # Websockets need to send one frame per WS frame true rescue ex : IO::Error | OpenSSL::SSL::Error @log.debug { "Lost connection, while sending message (#{ex.inspect})" } unless closed? @@ -412,7 +431,10 @@ module LavinMQ private def close_socket @running = false - @socket.close + @write_lock.synchronize do + @flush_buffer.close + @socket.close + end rescue ex @log.debug { "#{ex.inspect} when closing socket" } end