From abee514cedb032ab35c971807ed04c2d0afe6d97 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Sat, 9 Mar 2024 10:09:45 +0100 Subject: [PATCH] Heartbeat support (#153) Server should send heartbeat frames to client depending on the negotiated heartbeat interval. Probably fixes #140 --- src/amqproxy/client.cr | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/amqproxy/client.cr b/src/amqproxy/client.cr index 1bef428..9534f3c 100644 --- a/src/amqproxy/client.cr +++ b/src/amqproxy/client.cr @@ -13,6 +13,7 @@ module AMQProxy @frame_max : UInt32 @channel_max : UInt16 @heartbeat : UInt16 + @last_heartbeat = Time.monotonic def initialize(@socket : TCPSocket) set_socket_options(@socket) @@ -27,9 +28,11 @@ module AMQProxy def read_loop(channel_pool, socket = @socket) # ameba:disable Metrics/CyclomaticComplexity Log.context.set(remote_address: socket.remote_address.to_s) Log.debug { "Connected" } + socket.read_timeout = (@heartbeat / 2).ceil.seconds if @heartbeat > 0 loop do case frame = AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian) - when AMQ::Protocol::Frame::Heartbeat then write frame + when AMQ::Protocol::Frame::Heartbeat + @last_heartbeat = Time.monotonic when AMQ::Protocol::Frame::Connection::CloseOk then return when AMQ::Protocol::Frame::Connection::Close close_all_upstream_channels @@ -63,6 +66,15 @@ module AMQProxy close_connection(504_u16, "CHANNEL_ERROR - Channel #{frame.channel} not open", frame) end end + rescue IO::TimeoutError + time_since_last_heartbeat = (Time.monotonic - @last_heartbeat).total_seconds.to_i # ignore subsecond latency + if time_since_last_heartbeat <= 1 + @heartbeat # add 1s grace because of rounding + Log.debug { "Sending heartbeat (last heartbeat #{time_since_last_heartbeat}s ago)" } + write AMQ::Protocol::Frame::Heartbeat.new + else + Log.warn { "No heartbeat response in #{time_since_last_heartbeat}s (max #{1 + @heartbeat}s), closing connection" } + return + end end rescue ex : IO::EOFError Log.debug { "Disconnected" }