From 0224cbdcae041711f143d7ad9326a15b3fe2165f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20Ho=CC=88rberg?= Date: Wed, 20 Sep 2017 23:32:57 +0200 Subject: [PATCH] can listen for TLS connections --- src/amqproxy.cr | 7 ++++- src/amqproxy/client.cr | 15 +++------ src/amqproxy/server.cr | 45 +++++++++++++++++++------- src/amqproxy/upstream.cr | 68 +++++++++++----------------------------- 4 files changed, 63 insertions(+), 72 deletions(-) diff --git a/src/amqproxy.cr b/src/amqproxy.cr index 88881e1..303c14d 100644 --- a/src/amqproxy.cr +++ b/src/amqproxy.cr @@ -31,4 +31,9 @@ OptionParser.parse! do |parser| end server = AMQProxy::Server.new(config["server"]) -server.listen(config["listen"]["address"], config["listen"]["port"].to_i) +if config["listen"]["certificateChain"]? + server.listen_tls(config["listen"]["address"], config["listen"]["port"].to_i, + config["listen"]["certificateChain"], config["listen"]["privateKey"]) +else + server.listen(config["listen"]["address"], config["listen"]["port"].to_i) +end diff --git a/src/amqproxy/client.cr b/src/amqproxy/client.cr index 2625eb5..fa5c44b 100644 --- a/src/amqproxy/client.cr +++ b/src/amqproxy/client.cr @@ -2,7 +2,7 @@ require "socket" module AMQProxy class Client - def initialize(@socket : TCPSocket) + def initialize(@socket : (TCPSocket | OpenSSL::SSL::Socket::Server)) negotiate_client(@socket) @outbox = Channel(AMQP::Frame?).new spawn decode_frames @@ -10,17 +10,10 @@ module AMQProxy def decode_frames loop do - frame = AMQP::Frame.decode @socket - case frame - when AMQP::Connection::Close - @socket.write AMQP::Connection::CloseOk.new.to_slice - @outbox.send nil - break - end - @outbox.send frame + @outbox.send AMQP::Frame.decode(@socket) end - rescue ex : IO::EOFError - puts "Client conn closed #{ex.message}" + rescue ex : IO::Error | IO::EOFError | OpenSSL::SSL::Error + puts "Client conn closed: #{ex.message}" @outbox.send nil end diff --git a/src/amqproxy/server.cr b/src/amqproxy/server.cr index 6f4b01f..2b3d325 100644 --- a/src/amqproxy/server.cr +++ b/src/amqproxy/server.cr @@ -1,13 +1,17 @@ require "socket" +require "openssl" require "./amqp" require "./client" require "./upstream" module AMQProxy class Server + @upstream_url : String + @default_prefetch : UInt16 + def initialize(config : Hash(String, String)) - @upstream_url = config["upstream"] - @default_prefetch = @config.fetch("defaultPrefetch", "0").to_u16) + @upstream_url = config["upstream"].to_s + @default_prefetch = config.fetch("defaultPrefetch", "0").to_u16 puts "Proxy upstream: #{config["upstream"]}" end @@ -23,6 +27,29 @@ module AMQProxy end end + def listen_tls(address : String, port : Int, cert_path : String, key_path : String) + server = TCPServer.new(address, port) + puts "Proxy listening on #{server.local_address} (TLS)" + + context = OpenSSL::SSL::Context::Server.new + context.private_key = key_path + context.certificate_chain = cert_path + + loop do + if socket = server.accept? + begin + ssl_socket = OpenSSL::SSL::Socket::Server.new(socket, context) + ssl_socket.sync = true + spawn handle_connection(ssl_socket) + rescue e : OpenSSL::SSL::Error + print "Error accepting OpenSSL connection: ", e.message, "\n" + end + else + break + end + end + end + def handle_connection(socket) client = Client.new(socket) puts "Client connection opened" @@ -32,19 +59,15 @@ module AMQProxy loop do idx, frame = Channel.select([upstream.next_frame, client.next_frame]) case idx - when 0 + when 0 # Upstream break if frame.nil? client.write frame.to_slice - when 1 - if frame.nil? - upstream.close_all_open_channels - break - else - upstream.write frame.to_slice - end + when 1 # Client + break if frame.nil? + upstream.write frame.to_slice end end - rescue ex : IO::EOFError | Errno + rescue ex : IO::Error | IO::EOFError | Errno puts "Client loop #{ex.inspect}" ensure puts "Client connection closed" diff --git a/src/amqproxy/upstream.cr b/src/amqproxy/upstream.cr index 7d0c8d7..6fc1b27 100644 --- a/src/amqproxy/upstream.cr +++ b/src/amqproxy/upstream.cr @@ -13,35 +13,21 @@ module AMQProxy @default_prefetch = default_prefetch @socket = uninitialized IO - @connected = false - @reconnect = Channel(Nil).new - @frame_channel = Channel(AMQP::Frame?).new - @open_channels = Set(UInt16).new - spawn connect! + @outbox = Channel(AMQP::Frame?).new + connect! + spawn decode_frames end def connect! - loop do - begin - tcp_socket = TCPSocket.new(@host, @port) - @socket = if @tls - context = OpenSSL::SSL::Context::Client.new - OpenSSL::SSL::Socket::Client.new(tcp_socket, context) - else - tcp_socket - end - @connected = true - negotiate_server - spawn decode_frames - puts "Connected to upstream #{@host}:#{@port}" - @reconnect.receive - rescue ex : Errno - puts "When connecting: ", ex.message - ensure - @connected = false - end - sleep 1 - end + tcp_socket = TCPSocket.new(@host, @port) + @socket = if @tls + context = OpenSSL::SSL::Context::Client.new + OpenSSL::SSL::Socket::Client.new(tcp_socket, context) + else + tcp_socket + end + negotiate_server + puts "Connected to upstream #{@host}:#{@port}" end def decode_frames @@ -49,52 +35,36 @@ module AMQProxy frame = AMQP::Frame.decode @socket case frame when AMQP::Channel::OpenOk - @open_channels.add frame.channel if @default_prefetch > 0_u16 write AMQP::Basic::Qos.new(frame.channel, 0_u32, @default_prefetch, false).to_slice nextFrame = AMQP::Frame.decode @socket if nextFrame.class != AMQP::Basic::QosOk || nextFrame.channel != frame.channel - raise "Unexpected frame after setting default prefetch: #{nextFrame.inspect}" + raise "Unexpected frame after setting default prefetch: #{nextFrame.class}" end end - when AMQP::Channel::CloseOk - @open_channels.delete frame.channel end - @frame_channel.send frame + @outbox.send frame end rescue ex : Errno | IO::EOFError - puts "proxy decode frame, reconnect: ", ex.message - ex.backtrace.each { |l| puts l } - @open_channels.clear - @frame_channel.receive? - @reconnect.send nil + print "proxy decode frame: ", ex.message, "\n" + @outbox.send nil end def next_frame - @frame_channel.receive_select_action + @outbox.receive_select_action end def write(bytes : Slice(UInt8)) @socket.write bytes rescue ex : Errno | IO::EOFError - puts "proxy write bytes, reconnect: #{ex.message}" - @reconnect.send nil - Fiber.yield - write(bytes) + puts "proxy write bytes: #{ex.message}" + @outbox.send nil end def closed? !@connected || @socket.closed? end - def close_all_open_channels - @open_channels.each do |ch| - puts "Closing client channel #{ch}" - @socket.write AMQP::Channel::Close.new(ch, 200_u16, "", 0_u16, 0_u16).to_slice - @frame_channel.receive - end - end - private def negotiate_server @socket.write AMQP::PROTOCOL_START end