Skip to content

Commit

Permalink
specs that checks connection count and upstream count
Browse files Browse the repository at this point in the history
  • Loading branch information
carlhoerberg committed Feb 26, 2018
1 parent c97d4fc commit cd7fd3b
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 7 deletions.
2 changes: 1 addition & 1 deletion shard.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ version: 1.0
shards:
amqp:
github: cloudamqp/amqp.cr
commit: 692200ba05bb5e6479ec1e3eeaed38d67c61b3a4
commit: e5388a5dd6dd892be00a2ebf8b0f38e8e7221cea

18 changes: 13 additions & 5 deletions spec/amqproxy_spec.cr
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
require "./spec_helper"

describe AMQProxy do
# TODO: Write tests

it "works" do
false.should eq(true)
describe AMQProxy::Server do
it "keeps connections open" do
s = AMQProxy::Server.new("127.0.0.1", 5672, false)
spawn { s.listen("127.0.0.1", 5673) }
sleep 0.001
AMQP::Connection.start(AMQP::Config.new(port: 5673)) do |conn|
conn.channel
s.client_connections.should eq(1)
s.upstream_connections.should eq(1)
end
s.client_connections.should eq(0)
s.upstream_connections.should eq(1)
s.close
end
end
4 changes: 3 additions & 1 deletion spec/spec_helper.cr
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
require "spec"
require "../src/amqproxy"
require "../src/amqproxy/server"
require "../src/amqproxy/version"
require "amqp"
5 changes: 5 additions & 0 deletions src/amqproxy/pool.cr
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
module AMQProxy
class Pool
getter :size
def initialize(@host : String, @port : Int32, @tls : Bool)
@pools = {} of String => Deque(Upstream)
@size = 0
end

def borrow(user : String, password : String, vhost : String, &block : (Upstream | Nil) -> _)
q = @pools[[user, password, vhost].join] ||= Deque(Upstream).new
u = q.shift do
@size += 1
Upstream.new(@host, @port, @tls).connect(user, password, vhost)
end
block.call u
ensure
if u.nil?
@size -= 1
print "Upstream connection could not be established\n"
elsif u.closed?
@size -= 1
print "Upstream connection closed when returned\n"
elsif !q.nil?
q.push u
Expand Down
9 changes: 9 additions & 0 deletions src/amqproxy/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ module AMQProxy
print "TLS" if upstream_tls
print "\n"
@pool = Pool.new(upstream_host, upstream_port, upstream_tls)
@client_connections = 0
end

getter :client_connections

def upstream_connections
@pool.size
end

def listen(address, port)
Expand Down Expand Up @@ -64,6 +71,7 @@ module AMQProxy
end

def handle_connection(socket, remote_address)
@client_connections += 1
c = Client.new(socket)
print "Client connection accepted from ", remote_address, "\n"
@pool.borrow(c.user, c.password, c.vhost) do |u|
Expand Down Expand Up @@ -97,6 +105,7 @@ module AMQProxy
ensure
print "Client connection closed from ", remote_address, "\n"
socket.close
@client_connections -= 1
end
end
end

0 comments on commit cd7fd3b

Please sign in to comment.