Skip to content

Commit

Permalink
can listen for TLS connections
Browse files Browse the repository at this point in the history
  • Loading branch information
carlhoerberg committed Sep 20, 2017
1 parent 3eebf28 commit 0224cbd
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 72 deletions.
7 changes: 6 additions & 1 deletion src/amqproxy.cr
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,9 @@ OptionParser.parse! do |parser|
end

server = AMQProxy::Server.new(config["server"])
server.listen(config["listen"]["address"], config["listen"]["port"].to_i)
if config["listen"]["certificateChain"]?
server.listen_tls(config["listen"]["address"], config["listen"]["port"].to_i,
config["listen"]["certificateChain"], config["listen"]["privateKey"])
else
server.listen(config["listen"]["address"], config["listen"]["port"].to_i)
end
15 changes: 4 additions & 11 deletions src/amqproxy/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,18 @@ require "socket"

module AMQProxy
class Client
def initialize(@socket : TCPSocket)
def initialize(@socket : (TCPSocket | OpenSSL::SSL::Socket::Server))
negotiate_client(@socket)
@outbox = Channel(AMQP::Frame?).new
spawn decode_frames
end

def decode_frames
loop do
frame = AMQP::Frame.decode @socket
case frame
when AMQP::Connection::Close
@socket.write AMQP::Connection::CloseOk.new.to_slice
@outbox.send nil
break
end
@outbox.send frame
@outbox.send AMQP::Frame.decode(@socket)
end
rescue ex : IO::EOFError
puts "Client conn closed #{ex.message}"
rescue ex : IO::Error | IO::EOFError | OpenSSL::SSL::Error
puts "Client conn closed: #{ex.message}"
@outbox.send nil
end

Expand Down
45 changes: 34 additions & 11 deletions src/amqproxy/server.cr
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
require "socket"
require "openssl"
require "./amqp"
require "./client"
require "./upstream"

module AMQProxy
class Server
@upstream_url : String
@default_prefetch : UInt16

def initialize(config : Hash(String, String))
@upstream_url = config["upstream"]
@default_prefetch = @config.fetch("defaultPrefetch", "0").to_u16)
@upstream_url = config["upstream"].to_s
@default_prefetch = config.fetch("defaultPrefetch", "0").to_u16
puts "Proxy upstream: #{config["upstream"]}"
end

Expand All @@ -23,6 +27,29 @@ module AMQProxy
end
end

def listen_tls(address : String, port : Int, cert_path : String, key_path : String)
server = TCPServer.new(address, port)
puts "Proxy listening on #{server.local_address} (TLS)"

context = OpenSSL::SSL::Context::Server.new
context.private_key = key_path
context.certificate_chain = cert_path

loop do
if socket = server.accept?
begin
ssl_socket = OpenSSL::SSL::Socket::Server.new(socket, context)
ssl_socket.sync = true
spawn handle_connection(ssl_socket)
rescue e : OpenSSL::SSL::Error
print "Error accepting OpenSSL connection: ", e.message, "\n"
end
else
break
end
end
end

def handle_connection(socket)
client = Client.new(socket)
puts "Client connection opened"
Expand All @@ -32,19 +59,15 @@ module AMQProxy
loop do
idx, frame = Channel.select([upstream.next_frame, client.next_frame])
case idx
when 0
when 0 # Upstream
break if frame.nil?
client.write frame.to_slice
when 1
if frame.nil?
upstream.close_all_open_channels
break
else
upstream.write frame.to_slice
end
when 1 # Client
break if frame.nil?
upstream.write frame.to_slice
end
end
rescue ex : IO::EOFError | Errno
rescue ex : IO::Error | IO::EOFError | Errno
puts "Client loop #{ex.inspect}"
ensure
puts "Client connection closed"
Expand Down
68 changes: 19 additions & 49 deletions src/amqproxy/upstream.cr
Original file line number Diff line number Diff line change
Expand Up @@ -13,88 +13,58 @@ module AMQProxy
@default_prefetch = default_prefetch

@socket = uninitialized IO
@connected = false
@reconnect = Channel(Nil).new
@frame_channel = Channel(AMQP::Frame?).new
@open_channels = Set(UInt16).new
spawn connect!
@outbox = Channel(AMQP::Frame?).new
connect!
spawn decode_frames
end

def connect!
loop do
begin
tcp_socket = TCPSocket.new(@host, @port)
@socket = if @tls
context = OpenSSL::SSL::Context::Client.new
OpenSSL::SSL::Socket::Client.new(tcp_socket, context)
else
tcp_socket
end
@connected = true
negotiate_server
spawn decode_frames
puts "Connected to upstream #{@host}:#{@port}"
@reconnect.receive
rescue ex : Errno
puts "When connecting: ", ex.message
ensure
@connected = false
end
sleep 1
end
tcp_socket = TCPSocket.new(@host, @port)
@socket = if @tls
context = OpenSSL::SSL::Context::Client.new
OpenSSL::SSL::Socket::Client.new(tcp_socket, context)
else
tcp_socket
end
negotiate_server
puts "Connected to upstream #{@host}:#{@port}"
end

def decode_frames
loop do
frame = AMQP::Frame.decode @socket
case frame
when AMQP::Channel::OpenOk
@open_channels.add frame.channel
if @default_prefetch > 0_u16
write AMQP::Basic::Qos.new(frame.channel, 0_u32, @default_prefetch, false).to_slice
nextFrame = AMQP::Frame.decode @socket
if nextFrame.class != AMQP::Basic::QosOk || nextFrame.channel != frame.channel
raise "Unexpected frame after setting default prefetch: #{nextFrame.inspect}"
raise "Unexpected frame after setting default prefetch: #{nextFrame.class}"
end
end
when AMQP::Channel::CloseOk
@open_channels.delete frame.channel
end
@frame_channel.send frame
@outbox.send frame
end
rescue ex : Errno | IO::EOFError
puts "proxy decode frame, reconnect: ", ex.message
ex.backtrace.each { |l| puts l }
@open_channels.clear
@frame_channel.receive?
@reconnect.send nil
print "proxy decode frame: ", ex.message, "\n"
@outbox.send nil
end

def next_frame
@frame_channel.receive_select_action
@outbox.receive_select_action
end

def write(bytes : Slice(UInt8))
@socket.write bytes
rescue ex : Errno | IO::EOFError
puts "proxy write bytes, reconnect: #{ex.message}"
@reconnect.send nil
Fiber.yield
write(bytes)
puts "proxy write bytes: #{ex.message}"
@outbox.send nil
end

def closed?
!@connected || @socket.closed?
end

def close_all_open_channels
@open_channels.each do |ch|
puts "Closing client channel #{ch}"
@socket.write AMQP::Channel::Close.new(ch, 200_u16, "", 0_u16, 0_u16).to_slice
@frame_channel.receive
end
end

private def negotiate_server
@socket.write AMQP::PROTOCOL_START
end
Expand Down

0 comments on commit 0224cbd

Please sign in to comment.