diff --git a/src/amqproxy/cli.cr b/src/amqproxy/cli.cr index dbd6e41..18dc78f 100644 --- a/src/amqproxy/cli.cr +++ b/src/amqproxy/cli.cr @@ -17,6 +17,7 @@ class AMQProxy::CLI @term_timeout = -1 @term_client_close_timeout = 0 @upstream = ENV["AMQP_URL"]? + @server : AMQProxy::Server? = nil def parse_config(path) # ameba:disable Metrics/CyclomaticComplexity INI.parse(File.read(path)).each do |name, section| @@ -49,6 +50,8 @@ class AMQProxy::CLI end def run(argv) + raise "run cant be called multiple times" unless @server.nil? + p = OptionParser.parse(argv) do |parser| parser.banner = "Usage: amqproxy [options] [amqp upstream url]" parser.on("-l ADDRESS", "--listen=ADDRESS", "Address to listen on (default is localhost)") do |v| @@ -93,24 +96,15 @@ class AMQProxy::CLI end ::Log.setup_from_env(default_level: @log_level, backend: log_backend) - server = AMQProxy::Server.new(u.hostname || "", port, tls, @idle_connection_timeout) + Signal::INT.trap &->self.initiate_shutdown(Signal) + Signal::TERM.trap &->self.initiate_shutdown(Signal) - first_shutdown = true - initiate_shutdown = ->(_s : Signal) do - if first_shutdown - first_shutdown = false - server.stop_accepting_clients - else - abort "Exiting with #{server.client_connections} client connections still open" - end - end - Signal::INT.trap &initiate_shutdown - Signal::TERM.trap &initiate_shutdown + server = @server = AMQProxy::Server.new(u.hostname || "", port, tls, @idle_connection_timeout) HTTPServer.new(server, @listen_address, @http_port.to_i) server.listen(@listen_address, @listen_port.to_i) - shutdown server + shutdown # wait until all client connections are closed until server.client_connections.zero? @@ -119,10 +113,27 @@ class AMQProxy::CLI Log.info { "No clients left. Exiting." } end - def shutdown(server) + @first_shutdown = true + + def initiate_shutdown(_s : Signal) + unless server = @server + exit 0 + end + if @first_shutdown + @first_shutdown = false + server.stop_accepting_clients + else + abort "Exiting with #{server.client_connections} client connections still open" + end + end + + def shutdown + unless server = @server + raise "Can't call shutdown before run" + end if server.client_connections > 0 if @term_client_close_timeout > 0 - wait_for_clients_to_close(server, @term_client_close_timeout.seconds) + wait_for_clients_to_close @term_client_close_timeout.seconds end server.disconnect_clients end @@ -137,7 +148,10 @@ class AMQProxy::CLI end end - def wait_for_clients_to_close(server, close_timeout) + def wait_for_clients_to_close(close_timeout) + unless server = @server + raise "Can't call shutdown before run" + end Log.info { "Waiting for clients to close their connections." } ch = Channel(Bool).new spawn do