Skip to content

Commit

Permalink
Refactor to make testing easier
Browse files Browse the repository at this point in the history
  • Loading branch information
spuun committed Oct 23, 2024
1 parent 7f1e1b7 commit 729658f
Showing 1 changed file with 30 additions and 16 deletions.
46 changes: 30 additions & 16 deletions src/amqproxy/cli.cr
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class AMQProxy::CLI
@term_timeout = -1
@term_client_close_timeout = 0
@upstream = ENV["AMQP_URL"]?
@server : AMQProxy::Server? = nil

def parse_config(path) # ameba:disable Metrics/CyclomaticComplexity
INI.parse(File.read(path)).each do |name, section|
Expand Down Expand Up @@ -49,6 +50,8 @@ class AMQProxy::CLI
end

def run(argv)
raise "run cant be called multiple times" unless @server.nil?

p = OptionParser.parse(argv) do |parser|
parser.banner = "Usage: amqproxy [options] [amqp upstream url]"
parser.on("-l ADDRESS", "--listen=ADDRESS", "Address to listen on (default is localhost)") do |v|
Expand Down Expand Up @@ -93,24 +96,15 @@ class AMQProxy::CLI
end
::Log.setup_from_env(default_level: @log_level, backend: log_backend)

server = AMQProxy::Server.new(u.hostname || "", port, tls, @idle_connection_timeout)
Signal::INT.trap &->self.initiate_shutdown(Signal)
Signal::TERM.trap &->self.initiate_shutdown(Signal)

first_shutdown = true
initiate_shutdown = ->(_s : Signal) do
if first_shutdown
first_shutdown = false
server.stop_accepting_clients
else
abort "Exiting with #{server.client_connections} client connections still open"
end
end
Signal::INT.trap &initiate_shutdown
Signal::TERM.trap &initiate_shutdown
server = @server = AMQProxy::Server.new(u.hostname || "", port, tls, @idle_connection_timeout)

HTTPServer.new(server, @listen_address, @http_port.to_i)
server.listen(@listen_address, @listen_port.to_i)

shutdown server
shutdown

# wait until all client connections are closed
until server.client_connections.zero?
Expand All @@ -119,10 +113,27 @@ class AMQProxy::CLI
Log.info { "No clients left. Exiting." }
end

def shutdown(server)
@first_shutdown = true

def initiate_shutdown(_s : Signal)
unless server = @server
exit 0
end
if @first_shutdown
@first_shutdown = false
server.stop_accepting_clients
else
abort "Exiting with #{server.client_connections} client connections still open"
end
end

def shutdown
unless server = @server
raise "Can't call shutdown before run"
end
if server.client_connections > 0
if @term_client_close_timeout > 0
wait_for_clients_to_close(server, @term_client_close_timeout.seconds)
wait_for_clients_to_close @term_client_close_timeout.seconds
end
server.disconnect_clients
end
Expand All @@ -137,7 +148,10 @@ class AMQProxy::CLI
end
end

def wait_for_clients_to_close(server, close_timeout)
def wait_for_clients_to_close(close_timeout)
unless server = @server
raise "Can't call shutdown before run"
end
Log.info { "Waiting for clients to close their connections." }
ch = Channel(Bool).new
spawn do
Expand Down

0 comments on commit 729658f

Please sign in to comment.