Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Channel pool #142

Merged
merged 1 commit into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .ameba.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Excluded:
- test/
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@
*.tar.gz
*.swp
/builds/
shard.override.yml
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [v2.0.0] - 2024-02-19

- Rewrite of the proxy where Channels are pooled rather than connections. When a client opens a channel it will get a channel on a shared upstream connection, the proxy will remap the channel numbers between the two. Many client connections can therefor share a single upstream connection. Upside is that way fewer connections are needed to the upstream server, downside is that if there's a misbehaving client, for which the server closes the connection, all channels for other clients on that shared connection will also be closed.

## [v1.0.0] - 2024-02-19

- Nothing changed from v0.8.14
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ Only "safe" channels are reused, that is channels where only Basic Publish or Ba

In our benchmarks publishing one message per connection to a server (using TLS) with a round-trip latency of 50ms, takes on avarage 10ms using the proxy and 500ms without. You can read more about the proxy here [Maintaining long-lived connections with AMQProxy](https://www.cloudamqp.com/blog/2019-05-29-maintaining-long-lived-connections-with-AMQProxy.html)

As of version 2.0.0 connections to the server can be shared by multiple client connections. When a client opens a channel it will get a channel on a shared upstream connection, the proxy will remap the channel numbers between the two. Many client connections can therefor share a single upstream connection. The benefit is that way fewer connections are needed to the upstream server. For instance, establihsing 10.000 connections after a server reboot might normally take several minutes, but with this proxy it can happen in seconds.

## Installation

### Debian/Ubuntu
Expand Down
10 changes: 3 additions & 7 deletions shard.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,13 @@ version: 2.0
shards:
ameba:
git: https://github.com/crystal-ameba/ameba.git
version: 1.5.0
version: 1.6.1

amq-protocol:
git: https://github.com/cloudamqp/amq-protocol.cr.git
version: 1.1.4
version: 1.1.14

amqp-client:
git: https://github.com/cloudamqp/amqp-client.cr.git
version: 1.0.11

logger:
git: https://github.com/84codes/logger.cr.git
version: 1.0.2
version: 1.2.1

4 changes: 1 addition & 3 deletions shard.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: amqproxy
version: 1.0.0
version: 2.0.0

authors:
- CloudAMQP <[email protected]>
Expand All @@ -11,8 +11,6 @@ targets:
dependencies:
amq-protocol:
github: cloudamqp/amq-protocol.cr
logger:
github: 84codes/logger.cr

development_dependencies:
amqp-client:
Expand Down
47 changes: 35 additions & 12 deletions spec/amqproxy_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@ require "./spec_helper"

describe AMQProxy::Server do
it "keeps connections open" do
s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG)
s = AMQProxy::Server.new("127.0.0.1", 5672, false)
begin
spawn { s.listen("127.0.0.1", 5673) }
Fiber.yield
10.times do
AMQP::Client.start("amqp://localhost:5673") do |conn|
conn.channel
ch = conn.channel
ch.basic_publish "foobar", "amq.fanout", ""
s.client_connections.should eq 1
s.upstream_connections.should eq 1
end
sleep 0.1
end
s.client_connections.should eq 0
s.upstream_connections.should eq 1
Expand All @@ -22,7 +22,7 @@ describe AMQProxy::Server do
end

it "publish and consume works" do
server = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG)
server = AMQProxy::Server.new("127.0.0.1", 5672, false)
begin
spawn { server.listen("127.0.0.1", 5673) }
Fiber.yield
Expand All @@ -38,18 +38,19 @@ describe AMQProxy::Server do
queue = channel.queue(queue_name)
queue.publish_confirm(message_payload)
end
sleep 0.1
end
sleep 0.1

AMQP::Client.start("amqp://localhost:5673") do |conn|
channel = conn.channel
channel.basic_consume(queue_name, block: true, tag: "AMQProxy specs") do |msg|
channel.basic_consume(queue_name, no_ack: false, tag: "AMQProxy specs") do |msg|
body = msg.body_io.to_s
if body == message_payload
channel.basic_ack(msg.delivery_tag)
num_received_messages += 1
end
end
sleep 0.1
end

num_received_messages.should eq num_messages_to_publish
Expand All @@ -58,8 +59,30 @@ describe AMQProxy::Server do
end
end

it "a client can open all channels" do
s = AMQProxy::Server.new("127.0.0.1", 5672, false)
begin
spawn { s.listen("127.0.0.1", 5673) }
Fiber.yield
max = 4000
AMQP::Client.start("amqp://localhost:5673?channel_max=#{max}") do |conn|
conn.channel_max.should eq max
conn.channel_max.times do
conn.channel
end
s.client_connections.should eq 1
s.upstream_connections.should eq 2
end
sleep 0.1
s.client_connections.should eq 0
s.upstream_connections.should eq 2
ensure
s.stop_accepting_clients
end
end

it "can reconnect if upstream closes" do
s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG)
s = AMQProxy::Server.new("127.0.0.1", 5672, false)
begin
spawn { s.listen("127.0.0.1", 5673) }
Fiber.yield
Expand All @@ -83,7 +106,7 @@ describe AMQProxy::Server do

it "responds to upstream heartbeats" do
system("#{MAYBE_SUDO}rabbitmqctl eval 'application:set_env(rabbit, heartbeat, 1).' > /dev/null").should be_true
s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG)
s = AMQProxy::Server.new("127.0.0.1", 5672, false)
begin
spawn { s.listen("127.0.0.1", 5673) }
Fiber.yield
Expand All @@ -102,7 +125,7 @@ describe AMQProxy::Server do
it "supports waiting for client connections on graceful shutdown" do
started = Time.utc.to_unix

s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG, 5)
s = AMQProxy::Server.new("127.0.0.1", 5672, false, 5)
wait_for_channel = Channel(Int32).new # channel used to wait for certain calls, to test certain behaviour
spawn do
s.listen("127.0.0.1", 5673)
Expand Down Expand Up @@ -133,11 +156,11 @@ describe AMQProxy::Server do
end
wait_for_channel.receive.should eq 2 # wait 2
s.client_connections.should eq 2
s.upstream_connections.should eq 2
s.upstream_connections.should eq 1
spawn s.stop_accepting_clients
wait_for_channel.receive.should eq 3 # wait 3
s.client_connections.should eq 1
s.upstream_connections.should eq 2 # since connection stays open
s.upstream_connections.should eq 1 # since connection stays open
spawn do
begin
AMQP::Client.start("amqp://localhost:5673") do |conn|
Expand All @@ -153,7 +176,7 @@ describe AMQProxy::Server do
end
wait_for_channel.receive.should eq 4 # wait 4
s.client_connections.should eq 1 # since the new connection should not have worked
s.upstream_connections.should eq 2 # since connections stay open
s.upstream_connections.should eq 1 # since connections stay open
wait_for_channel.receive.should eq 5 # wait 5
s.client_connections.should eq 0 # since now the server should be closed
s.upstream_connections.should eq 1
Expand Down
30 changes: 0 additions & 30 deletions spec/spec_helper.cr
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,3 @@ require "../src/amqproxy/version"
require "amqp-client"

MAYBE_SUDO = (ENV.has_key?("NO_SUDO") || `id -u` == "0\n") ? "" : "sudo "

# Spec timeout borrowed from Crystal project:
# https://github.com/crystal-lang/crystal/blob/1.10.1/spec/support/mt_abort_timeout.cr

private SPEC_TIMEOUT = 15.seconds

Spec.around_each do |example|
done = Channel(Exception?).new

spawn(same_thread: true) do
begin
example.run
rescue e
done.send(e)
else
done.send(nil)
end
end

timeout = SPEC_TIMEOUT

select
when res = done.receive
raise res if res
when timeout(timeout)
_it = example.example
ex = Spec::AssertionFailed.new("spec timed out after #{timeout}", _it.file, _it.line)
_it.parent.report(:fail, _it.description, _it.file, _it.line, timeout, ex)
end
end
44 changes: 37 additions & 7 deletions src/amqproxy.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ require "./amqproxy/server"
require "option_parser"
require "uri"
require "ini"
require "logger"
require "log"

class AMQProxy::CLI
@listen_address = ENV["LISTEN_ADDRESS"]? || "localhost"
@listen_port = ENV["LISTEN_PORT"]? || 5673
@log_level : Logger::Severity = Logger::INFO
@log_level : Log::Severity = Log::Severity::Info
@idle_connection_timeout : Int32 = ENV.fetch("IDLE_CONNECTION_TIMEOUT", "5").to_i
@upstream = ENV["AMQP_URL"]?

Expand All @@ -19,7 +19,7 @@ class AMQProxy::CLI
section.each do |key, value|
case key
when "upstream" then @upstream = value
when "log_level" then @log_level = Logger::Severity.parse(value)
when "log_level" then @log_level = Log::Severity.parse(value)
when "idle_connection_timeout" then @idle_connection_timeout = value.to_i
else raise "Unsupported config #{name}/#{key}"
end
Expand All @@ -29,7 +29,7 @@ class AMQProxy::CLI
case key
when "port" then @listen_port = value
when "bind", "address" then @listen_address = value
when "log_level" then @log_level = Logger::Severity.parse(value)
when "log_level" then @log_level = Log::Severity.parse(value)
else raise "Unsupported config #{name}/#{key}"
end
end
Expand All @@ -50,7 +50,7 @@ class AMQProxy::CLI
parser.on("-t IDLE_CONNECTION_TIMEOUT", "--idle-connection-timeout=SECONDS", "Maxiumum time in seconds an unused pooled connection stays open (default 5s)") do |v|
@idle_connection_timeout = v.to_i
end
parser.on("-d", "--debug", "Verbose logging") { @log_level = Logger::DEBUG }
parser.on("-d", "--debug", "Verbose logging") { @log_level = Log::Severity::Debug }
parser.on("-c FILE", "--config=FILE", "Load config file") { |v| parse_config(v) }
parser.on("-h", "--help", "Show this help") { puts parser.to_s; exit 0 }
parser.on("-v", "--version", "Display version") { puts AMQProxy::VERSION.to_s; exit 0 }
Expand All @@ -71,15 +71,23 @@ class AMQProxy::CLI
port = u.port || default_port
tls = u.scheme == "amqps"

server = AMQProxy::Server.new(u.host || "", port, tls, @log_level, @idle_connection_timeout)
log_backend = if ENV.has_key?("JOURNAL_STREAM")
Log::IOBackend.new(formatter: JournalLogFormat, dispatcher: ::Log::DirectDispatcher)
else
Log::IOBackend.new(formatter: StdoutLogFormat, dispatcher: ::Log::DirectDispatcher)
end
Log.setup_from_env(default_level: @log_level, backend: log_backend)

server = AMQProxy::Server.new(u.host || "", port, tls, @idle_connection_timeout)

first_shutdown = true
shutdown = ->(_s : Signal) do
if first_shutdown
first_shutdown = false
server.stop_accepting_clients
else
server.disconnect_clients
else
server.close_sockets
end
end
Signal::INT.trap &shutdown
Expand All @@ -92,6 +100,28 @@ class AMQProxy::CLI
sleep 0.2
end
end

struct JournalLogFormat < Log::StaticFormatter
def run
source
context(before: '[', after: ']')
string ' '
message
exception
end
end

struct StdoutLogFormat < Log::StaticFormatter
def run
timestamp
severity
source(before: ' ')
context(before: '[', after: ']')
string ' '
message
exception
end
end
end

AMQProxy::CLI.new.run
Loading
Loading