diff --git a/spec/amqproxy_spec.cr b/spec/amqproxy_spec.cr index e8b5187..cb433c8 100644 --- a/spec/amqproxy_spec.cr +++ b/spec/amqproxy_spec.cr @@ -2,13 +2,15 @@ require "./spec_helper" describe AMQProxy::Server do it "keeps connections open" do - s = AMQProxy::Server.new("127.0.0.1", 5672, false) + s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::ERROR) spawn { s.listen("127.0.0.1", 5673) } sleep 0.001 - AMQP::Connection.start(AMQP::Config.new(port: 5673)) do |conn| - conn.channel - s.client_connections.should eq(1) - s.upstream_connections.should eq(1) + 10.times do + AMQP::Connection.start(AMQP::Config.new(port: 5673)) do |conn| + conn.channel + s.client_connections.should eq(1) + s.upstream_connections.should eq(1) + end end s.client_connections.should eq(0) s.upstream_connections.should eq(1) diff --git a/src/amqproxy/client.cr b/src/amqproxy/client.cr index 02547b9..5503384 100644 --- a/src/amqproxy/client.cr +++ b/src/amqproxy/client.cr @@ -19,7 +19,6 @@ module AMQProxy @outbox.send frame end rescue ex : Errno | IO::Error | OpenSSL::SSL::Error - #print "Decoding client frames ", ex.inspect_with_backtrace, "\n" @outbox.send nil end @@ -35,7 +34,6 @@ module AMQProxy @outbox.send nil end rescue ex : Errno | IO::Error | OpenSSL::SSL::Error - puts "Client conn closed: #{ex.message}" @outbox.send nil end diff --git a/src/amqproxy/pool.cr b/src/amqproxy/pool.cr index 311e744..eda2bb1 100644 --- a/src/amqproxy/pool.cr +++ b/src/amqproxy/pool.cr @@ -1,7 +1,7 @@ module AMQProxy class Pool getter :size - def initialize(@host : String, @port : Int32, @tls : Bool) + def initialize(@host : String, @port : Int32, @tls : Bool, @log : Logger) @pools = {} of String => Deque(Upstream) @size = 0 end @@ -10,16 +10,16 @@ module AMQProxy q = @pools[[user, password, vhost].join] ||= Deque(Upstream).new u = q.shift do @size += 1 - Upstream.new(@host, @port, @tls).connect(user, password, vhost) + Upstream.new(@host, @port, @tls, @log).connect(user, password, vhost) end block.call u ensure if u.nil? @size -= 1 - print "Upstream connection could not be established\n" + @log.error "Upstream connection could not be established" elsif u.closed? @size -= 1 - print "Upstream connection closed when returned\n" + @log.error "Upstream connection closed when returned" elsif !q.nil? q.push u end diff --git a/src/amqproxy/server.cr b/src/amqproxy/server.cr index 3295586..964e9cf 100644 --- a/src/amqproxy/server.cr +++ b/src/amqproxy/server.cr @@ -1,5 +1,6 @@ require "socket" require "openssl" +require "logger" require "./amqp" require "./pool" require "./client" @@ -9,12 +10,15 @@ module AMQProxy class Server @running = true - def initialize(upstream_host, upstream_port, upstream_tls) - print "Proxy upstream: #{upstream_host}:#{upstream_port} " - print "TLS" if upstream_tls - print "\n" - @pool = Pool.new(upstream_host, upstream_port, upstream_tls) + def initialize(upstream_host, upstream_port, upstream_tls, log_level = Logger::INFO) + @log = Logger.new(STDOUT) + @log.level = log_level + @log.formatter = Logger::Formatter.new do |severity, datetime, progname, message, io| + io << message + end @client_connections = 0 + @pool = Pool.new(upstream_host, upstream_port, upstream_tls, @log) + @log.info "Proxy upstream: #{upstream_host}:#{upstream_port} #{upstream_tls ? "TLS" : ""}" end getter :client_connections @@ -31,7 +35,7 @@ module AMQProxy socket.tcp_keepalive_idle = 60 socket.tcp_keepalive_count = 3 socket.tcp_keepalive_interval = 10 - puts "Proxy listening on #{socket.local_address}" + @log.info "Proxy listening on #{socket.local_address}" while @running if client = socket.accept? spawn handle_connection(client, client.remote_address) @@ -53,17 +57,16 @@ module AMQProxy context = OpenSSL::SSL::Context::Server.new context.private_key = key_path context.certificate_chain = cert_path - puts "Proxy listening on #{socket.local_address}:#{port} (TLS)" + log.info "Proxy listening on #{socket.local_address}:#{port} (TLS)" while @running if client = @socket.accept? - print "Client connection accepted from ", client.remote_address, "\n" begin ssl_client = OpenSSL::SSL::Socket::Server.new(client, context) ssl_client.sync_close = ssl_client.sync = true spawn handle_connection(ssl_client, client.remote_address) rescue e : OpenSSL::SSL::Error - print "Error accepting OpenSSL connection from ", client.remote_address, ": ", e.message, "\n" + @log.error "Error accepting OpenSSL connection from #{client.remote_address}: #{e.inspect}" end else break @@ -79,7 +82,7 @@ module AMQProxy def handle_connection(socket, remote_address) @client_connections += 1 c = Client.new(socket) - print "Client connection accepted from ", remote_address, "\n" + @log.info { "Client connection accepted from #{remote_address}" } @pool.borrow(c.user, c.password, c.vhost) do |u| if u.nil? c.write AMQP::Connection::Close.new(403_u16, "ACCESS_REFUSED", @@ -106,10 +109,9 @@ module AMQProxy end end rescue ex : Errno | IO::Error | OpenSSL::SSL::Error - print "Client connection error from ", remote_address, ": ", - ex.inspect_with_backtrace, "\n" + @log.debug { "Client connection error from #{remote_address}: #{ex.inspect}" } ensure - print "Client connection closed from ", remote_address, "\n" + @log.info { "Client connection closed from #{remote_address}" } socket.close @client_connections -= 1 end diff --git a/src/amqproxy/upstream.cr b/src/amqproxy/upstream.cr index 75cda21..ed91a13 100644 --- a/src/amqproxy/upstream.cr +++ b/src/amqproxy/upstream.cr @@ -4,7 +4,7 @@ require "uri" module AMQProxy class Upstream - def initialize(@host : String, @port : Int32, @tls : Bool) + def initialize(@host : String, @port : Int32, @tls : Bool, @log : Logger) @socket = uninitialized IO @to_client = Channel(AMQP::Frame?).new(1) @open_channels = Set(UInt16).new @@ -14,7 +14,7 @@ module AMQProxy def connect(user : String, password : String, vhost : String) tcp_socket = TCPSocket.new(@host, @port) tcp_socket.tcp_nodelay = true - print "Connected to upstream ", tcp_socket.remote_address, "\n" + @log.info { "Connected to upstream #{tcp_socket.remote_address}" } @socket = if @tls OpenSSL::SSL::Socket::Client.new(tcp_socket, hostname: @host).tap do |c| @@ -27,8 +27,7 @@ module AMQProxy spawn decode_frames self rescue ex : IO::EOFError - puts "Upstream connection failed to #{user}@#{@host}:#{@port}/#{vhost}" - ex.inspect_with_backtrace(STDERR) + @log.error "Failed connecting to upstream #{user}@#{@host}:#{@port}/#{vhost}" nil end @@ -46,7 +45,7 @@ module AMQProxy @to_client.send frame end rescue ex : Errno | IO::EOFError - print "Error on upstream socket: ", ex.inspect, "\n" + @log.error "Error reading from upstream: #{ex.inspect}" @to_client.send nil end @@ -83,7 +82,7 @@ module AMQProxy end @socket.write frame.to_slice rescue ex : Errno | IO::EOFError - puts "proxy write bytes: #{ex.message}" + @log.error "Error sending to upstream: #{ex.inspect}" @to_client.send nil end