Skip to content

Commit

Permalink
use a Logger and disable it in specs
Browse files Browse the repository at this point in the history
  • Loading branch information
carlhoerberg committed Feb 26, 2018
1 parent cf16d20 commit 03ca8ae
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 30 deletions.
12 changes: 7 additions & 5 deletions spec/amqproxy_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ require "./spec_helper"

describe AMQProxy::Server do
it "keeps connections open" do
s = AMQProxy::Server.new("127.0.0.1", 5672, false)
s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::ERROR)
spawn { s.listen("127.0.0.1", 5673) }
sleep 0.001
AMQP::Connection.start(AMQP::Config.new(port: 5673)) do |conn|
conn.channel
s.client_connections.should eq(1)
s.upstream_connections.should eq(1)
10.times do
AMQP::Connection.start(AMQP::Config.new(port: 5673)) do |conn|
conn.channel
s.client_connections.should eq(1)
s.upstream_connections.should eq(1)
end
end
s.client_connections.should eq(0)
s.upstream_connections.should eq(1)
Expand Down
2 changes: 0 additions & 2 deletions src/amqproxy/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ module AMQProxy
@outbox.send frame
end
rescue ex : Errno | IO::Error | OpenSSL::SSL::Error
#print "Decoding client frames ", ex.inspect_with_backtrace, "\n"
@outbox.send nil
end

Expand All @@ -35,7 +34,6 @@ module AMQProxy
@outbox.send nil
end
rescue ex : Errno | IO::Error | OpenSSL::SSL::Error
puts "Client conn closed: #{ex.message}"
@outbox.send nil
end

Expand Down
8 changes: 4 additions & 4 deletions src/amqproxy/pool.cr
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module AMQProxy
class Pool
getter :size
def initialize(@host : String, @port : Int32, @tls : Bool)
def initialize(@host : String, @port : Int32, @tls : Bool, @log : Logger)
@pools = {} of String => Deque(Upstream)
@size = 0
end
Expand All @@ -10,16 +10,16 @@ module AMQProxy
q = @pools[[user, password, vhost].join] ||= Deque(Upstream).new
u = q.shift do
@size += 1
Upstream.new(@host, @port, @tls).connect(user, password, vhost)
Upstream.new(@host, @port, @tls, @log).connect(user, password, vhost)
end
block.call u
ensure
if u.nil?
@size -= 1
print "Upstream connection could not be established\n"
@log.error "Upstream connection could not be established"
elsif u.closed?
@size -= 1
print "Upstream connection closed when returned\n"
@log.error "Upstream connection closed when returned"
elsif !q.nil?
q.push u
end
Expand Down
28 changes: 15 additions & 13 deletions src/amqproxy/server.cr
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require "socket"
require "openssl"
require "logger"
require "./amqp"
require "./pool"
require "./client"
Expand All @@ -9,12 +10,15 @@ module AMQProxy
class Server
@running = true

def initialize(upstream_host, upstream_port, upstream_tls)
print "Proxy upstream: #{upstream_host}:#{upstream_port} "
print "TLS" if upstream_tls
print "\n"
@pool = Pool.new(upstream_host, upstream_port, upstream_tls)
def initialize(upstream_host, upstream_port, upstream_tls, log_level = Logger::INFO)
@log = Logger.new(STDOUT)
@log.level = log_level
@log.formatter = Logger::Formatter.new do |severity, datetime, progname, message, io|
io << message
end
@client_connections = 0
@pool = Pool.new(upstream_host, upstream_port, upstream_tls, @log)
@log.info "Proxy upstream: #{upstream_host}:#{upstream_port} #{upstream_tls ? "TLS" : ""}"
end

getter :client_connections
Expand All @@ -31,7 +35,7 @@ module AMQProxy
socket.tcp_keepalive_idle = 60
socket.tcp_keepalive_count = 3
socket.tcp_keepalive_interval = 10
puts "Proxy listening on #{socket.local_address}"
@log.info "Proxy listening on #{socket.local_address}"
while @running
if client = socket.accept?
spawn handle_connection(client, client.remote_address)
Expand All @@ -53,17 +57,16 @@ module AMQProxy
context = OpenSSL::SSL::Context::Server.new
context.private_key = key_path
context.certificate_chain = cert_path
puts "Proxy listening on #{socket.local_address}:#{port} (TLS)"
log.info "Proxy listening on #{socket.local_address}:#{port} (TLS)"

while @running
if client = @socket.accept?
print "Client connection accepted from ", client.remote_address, "\n"
begin
ssl_client = OpenSSL::SSL::Socket::Server.new(client, context)
ssl_client.sync_close = ssl_client.sync = true
spawn handle_connection(ssl_client, client.remote_address)
rescue e : OpenSSL::SSL::Error
print "Error accepting OpenSSL connection from ", client.remote_address, ": ", e.message, "\n"
@log.error "Error accepting OpenSSL connection from #{client.remote_address}: #{e.inspect}"
end
else
break
Expand All @@ -79,7 +82,7 @@ module AMQProxy
def handle_connection(socket, remote_address)
@client_connections += 1
c = Client.new(socket)
print "Client connection accepted from ", remote_address, "\n"
@log.info { "Client connection accepted from #{remote_address}" }
@pool.borrow(c.user, c.password, c.vhost) do |u|
if u.nil?
c.write AMQP::Connection::Close.new(403_u16, "ACCESS_REFUSED",
Expand All @@ -106,10 +109,9 @@ module AMQProxy
end
end
rescue ex : Errno | IO::Error | OpenSSL::SSL::Error
print "Client connection error from ", remote_address, ": ",
ex.inspect_with_backtrace, "\n"
@log.debug { "Client connection error from #{remote_address}: #{ex.inspect}" }
ensure
print "Client connection closed from ", remote_address, "\n"
@log.info { "Client connection closed from #{remote_address}" }
socket.close
@client_connections -= 1
end
Expand Down
11 changes: 5 additions & 6 deletions src/amqproxy/upstream.cr
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ require "uri"

module AMQProxy
class Upstream
def initialize(@host : String, @port : Int32, @tls : Bool)
def initialize(@host : String, @port : Int32, @tls : Bool, @log : Logger)
@socket = uninitialized IO
@to_client = Channel(AMQP::Frame?).new(1)
@open_channels = Set(UInt16).new
Expand All @@ -14,7 +14,7 @@ module AMQProxy
def connect(user : String, password : String, vhost : String)
tcp_socket = TCPSocket.new(@host, @port)
tcp_socket.tcp_nodelay = true
print "Connected to upstream ", tcp_socket.remote_address, "\n"
@log.info { "Connected to upstream #{tcp_socket.remote_address}" }
@socket =
if @tls
OpenSSL::SSL::Socket::Client.new(tcp_socket, hostname: @host).tap do |c|
Expand All @@ -27,8 +27,7 @@ module AMQProxy
spawn decode_frames
self
rescue ex : IO::EOFError
puts "Upstream connection failed to #{user}@#{@host}:#{@port}/#{vhost}"
ex.inspect_with_backtrace(STDERR)
@log.error "Failed connecting to upstream #{user}@#{@host}:#{@port}/#{vhost}"
nil
end

Expand All @@ -46,7 +45,7 @@ module AMQProxy
@to_client.send frame
end
rescue ex : Errno | IO::EOFError
print "Error on upstream socket: ", ex.inspect, "\n"
@log.error "Error reading from upstream: #{ex.inspect}"
@to_client.send nil
end

Expand Down Expand Up @@ -83,7 +82,7 @@ module AMQProxy
end
@socket.write frame.to_slice
rescue ex : Errno | IO::EOFError
puts "proxy write bytes: #{ex.message}"
@log.error "Error sending to upstream: #{ex.inspect}"
@to_client.send nil
end

Expand Down

0 comments on commit 03ca8ae

Please sign in to comment.