diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 84f9ce822e..75beb47acc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -97,6 +97,7 @@ jobs: steps: - name: Install dependencies run: | + brew update brew install crystal etcd echo "/opt/homebrew/opt/etcd/bin" >> $GITHUB_PATH env: diff --git a/CHANGELOG.md b/CHANGELOG.md index 039a90ab2f..e021e82e09 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - New UI for management interface [#821](https://github.com/cloudamqp/lavinmq/pull/821) - Use sent/received bytes instead of messages to trigger when other tasks can run [#863](https://github.com/cloudamqp/lavinmq/pull/863) +- Spread out stream queues GC-loop over time [#876](https://github.com/cloudamqp/lavinmq/pull/876) +- Don't unmap files on USR2 or when last consumer disconnects [#876](https://github.com/cloudamqp/lavinmq/pull/876) +- Unmap files when they are no longer in use [#876](https://github.com/cloudamqp/lavinmq/pull/876) ### Fixed @@ -19,6 +22,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Removed duplicate metric rss_bytes [#881](https://github.com/cloudamqp/lavinmq/pull/881) - Release leadership on graceful shutdown [#871](https://github.com/cloudamqp/lavinmq/pull/871) - Rescue more exceptions while reading msg store on startup [#865](https://github.com/cloudamqp/lavinmq/pull/865) +- Crystal 1.15 support [#905](https://github.com/cloudamqp/lavinmq/pull/905) +- lavinmqctl now handles pagination of large result sets [#904](https://github.com/cloudamqp/lavinmq/pull/904) +- Make clustering more reliable [#879](https://github.com/cloudamqp/lavinmq/pull/879) +- Strip newlines from logs [#896](https://github.com/cloudamqp/lavinmq/pull/896) ### Added diff --git a/README.md b/README.md index 28cd2ef228..0ab38adc4b 100644 --- a/README.md +++ b/README.md @@ -269,21 +269,6 @@ Stream queues are like append-only logs and can be consumed multiple times. Each Messages are only deleted when `max-length`, `max-length-bytes` or `max-age` are applied, either as queue arguments or as policies. The limits are checked only when new messages are published to the queue, and only act on whole segments (which by default are 8MiB), so the limits aren't necessarily exact. So even if a `max-age` limit is set, but no messages are published to the queue, messages might still be available in the stream queue that is way older that the limit specified. -## Contributors - -- [Carl Hörberg](mailto:carl@84codes.com) -- [Anders Bälter](mailto:anders@84codes.com) -- [Magnus Landerblom](mailto:mange@cloudamqp.com) -- [Magnus Hörberg](mailto:magnus@cloudamqp.com) -- [Johan Eckerström](mailto:johan.e@cloudamqp.com) -- [Anton Dalgren](mailto:anton@cloudamqp.com) -- [Patrik Ragnarsson](mailto:patrik@84codes.com) -- [Oskar Gustafsson](mailto:oskar@84codes.com) -- [Tobias Brodén](mailto:tobias@84codes.com) -- [Christina Dahlén](mailto:christina@84codes.com) -- [Erica Weistrand](mailto:erica@84codes.com) -- [Viktor Erlingsson](mailto:viktor@84codes.com) - ## License The software is licensed under the [Apache License 2.0](LICENSE). @@ -291,3 +276,6 @@ The software is licensed under the [Apache License 2.0](LICENSE). Copyright 2018-2024 84codes AB LavinMQ is a trademark of 84codes AB + +## Contact Us +Do you want to learn more? 
[Talk with our product experts](https://webforms.pipedrive.com/f/64JnLsqIMAdF2BDQ06ioKLhC2NuNmkwNplNhRxtIqlm0nFnuIeX97eb7fZKej0vFHZ). diff --git a/spec/clustering_spec.cr b/spec/clustering_spec.cr index d1859ee6bb..9aff334e84 100644 --- a/spec/clustering_spec.cr +++ b/spec/clustering_spec.cr @@ -13,9 +13,12 @@ describe LavinMQ::Clustering::Client do "--log-level=error", "--unsafe-no-fsync=true", "--force-new-cluster=true", + "--listen-peer-urls=http://127.0.0.1:12380", + "--listen-client-urls=http://127.0.0.1:12379", + "--advertise-client-urls=http://127.0.0.1:12379", }, output: STDOUT, error: STDERR) - client = HTTP::Client.new("127.0.0.1", 2379) + client = HTTP::Client.new("127.0.0.1", 12379) i = 0 loop do sleep 0.02.seconds @@ -26,7 +29,7 @@ describe LavinMQ::Clustering::Client do end rescue e : Socket::ConnectError i += 1 - raise "Cant connect to etcd on port 2379. Giving up after 100 tries. (#{e.message})" if i >= 100 + raise "Cant connect to etcd on port 12379. Giving up after 100 tries. (#{e.message})" if i >= 100 next end client.close @@ -40,7 +43,7 @@ describe LavinMQ::Clustering::Client do end it "can stream changes" do - replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new, 0) + replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new("localhost:12379"), 0) tcp_server = TCPServer.new("localhost", 0) spawn(replicator.listen(tcp_server), name: "repli server spec") config = LavinMQ::Config.new.tap &.data_dir = follower_data_dir @@ -60,7 +63,7 @@ describe LavinMQ::Clustering::Client do done.receive end - server = LavinMQ::Server.new(follower_data_dir) + server = LavinMQ::Server.new(config) begin q = server.vhosts["/"].queues["repli"].as(LavinMQ::AMQP::DurableQueue) q.message_count.should eq 1 @@ -70,10 +73,12 @@ describe LavinMQ::Clustering::Client do ensure server.close end + ensure + replicator.try &.close end it "can stream full file" do - replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new, 0) + replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new("localhost:12379"), 0) tcp_server = TCPServer.new("localhost", 0) spawn(replicator.listen(tcp_server), name: "repli server spec") config = LavinMQ::Config.new.tap &.data_dir = follower_data_dir @@ -91,7 +96,7 @@ describe LavinMQ::Clustering::Client do done.receive end - server = LavinMQ::Server.new(follower_data_dir) + server = LavinMQ::Server.new(config) begin server.users["u1"].should_not be_nil ensure @@ -102,6 +107,7 @@ describe LavinMQ::Clustering::Client do it "will failover" do config1 = LavinMQ::Config.new config1.data_dir = "/tmp/failover1" + config1.clustering_etcd_endpoints = "localhost:12379" config1.clustering_advertised_uri = "tcp://localhost:5681" config1.clustering_port = 5681 config1.amqp_port = 5671 @@ -110,6 +116,7 @@ describe LavinMQ::Clustering::Client do config2 = LavinMQ::Config.new config2.data_dir = "/tmp/failover2" + config2.clustering_etcd_endpoints = "localhost:12379" config2.clustering_advertised_uri = "tcp://localhost:5682" config2.clustering_port = 5682 config2.amqp_port = 5672 @@ -118,7 +125,7 @@ describe LavinMQ::Clustering::Client do listen = Channel(String).new spawn(name: "etcd elect leader spec") do - etcd = LavinMQ::Etcd.new + etcd = LavinMQ::Etcd.new("localhost:12379") etcd.elect_listen("lavinmq/leader") do |value| listen.send value end diff --git a/spec/etcd_spec.cr b/spec/etcd_spec.cr index a2e73d9359..e0dad4d477 100644 --- a/spec/etcd_spec.cr +++ 
b/spec/etcd_spec.cr @@ -1,6 +1,7 @@ require "spec" require "../src/lavinmq/etcd" require "file_utils" +require "http/client" describe LavinMQ::Etcd do it "can put and get" do diff --git a/spec/mfile_spec.cr b/spec/mfile_spec.cr index 6bab024658..0fddbecd3b 100644 --- a/spec/mfile_spec.cr +++ b/spec/mfile_spec.cr @@ -12,4 +12,21 @@ describe MFile do file.delete end end + + it "can be read" do + file = File.tempfile "mfile_spec" + file.print "hello world" + file.flush + begin + MFile.open(file.path) do |mfile| + buf = Bytes.new(6) + cnt = mfile.read(buf) + String.new(buf[0, cnt]).should eq "hello " + cnt = mfile.read(buf) + String.new(buf[0, cnt]).should eq "world" + end + ensure + file.delete + end + end end diff --git a/spec/schema_version_spec.cr b/spec/schema_version_spec.cr index d05462b052..3b09dc675b 100644 --- a/spec/schema_version_spec.cr +++ b/spec/schema_version_spec.cr @@ -6,9 +6,10 @@ describe LavinMQ::SchemaVersion do it "Empty file should raise IO::EOFError" do with_datadir do |data_dir| path = File.join(data_dir, "test_schema_version") - file = MFile.new(path, 12) - expect_raises(IO::EOFError) do - LavinMQ::SchemaVersion.verify(file, :message) + MFile.open(path, 12) do |file| + expect_raises(IO::EOFError) do + LavinMQ::SchemaVersion.verify(file, :message) + end end end end @@ -16,17 +17,19 @@ describe LavinMQ::SchemaVersion do it "Should verify schema version" do with_datadir do |data_dir| path = File.join(data_dir, "test_schema_version") - file = MFile.new(path, 12) - file.write_bytes LavinMQ::Schema::VERSION - LavinMQ::SchemaVersion.verify(file, :message).should eq LavinMQ::SchemaVersion::VERSIONS[:message] + MFile.open(path, 12) do |file| + file.write_bytes LavinMQ::Schema::VERSION + LavinMQ::SchemaVersion.verify(file, :message).should eq LavinMQ::SchemaVersion::VERSIONS[:message] + end end end it "Deletes empty file and creates a new when it is the first file" do with_datadir do |data_dir| path = File.join(data_dir, "msgs.0000000001") - file = MFile.new(path, LavinMQ::Config.instance.segment_size) - file.resize(LavinMQ::Config.instance.segment_size) + MFile.open(path, LavinMQ::Config.instance.segment_size) do |file| + file.resize(LavinMQ::Config.instance.segment_size) + end # init new message store msg_store = LavinMQ::Queue::MessageStore.new(data_dir, nil) msg_store.@segments.first_value.size.should eq 4 @@ -39,8 +42,9 @@ describe LavinMQ::SchemaVersion do v.declare_queue("q", true, false) data_dir = s.vhosts["/"].queues["q"].as(LavinMQ::AMQP::Queue).@msg_store.@queue_data_dir path = File.join(data_dir, "msgs.0000000002") - file = MFile.new(path, LavinMQ::Config.instance.segment_size) - file.resize(LavinMQ::Config.instance.segment_size) + MFile.open(path, LavinMQ::Config.instance.segment_size) do |file| + file.resize(LavinMQ::Config.instance.segment_size) + end # init new message store msg_store = LavinMQ::Queue::MessageStore.new(data_dir, nil) msg_store.@segments.size.should eq 1 diff --git a/spec/spec_helper.cr b/spec/spec_helper.cr index f4bf0652e1..f89124c64d 100644 --- a/spec/spec_helper.cr +++ b/spec/spec_helper.cr @@ -74,7 +74,7 @@ end def with_amqp_server(tls = false, replicator = LavinMQ::Clustering::NoopServer.new, & : LavinMQ::Server -> Nil) tcp_server = TCPServer.new("localhost", 0) - s = LavinMQ::Server.new(LavinMQ::Config.instance.data_dir, replicator) + s = LavinMQ::Server.new(LavinMQ::Config.instance, replicator) begin if tls ctx = OpenSSL::SSL::Context::Server.new diff --git a/spec/storage_spec.cr b/spec/storage_spec.cr index fd0ed74e87..f139c61e1a 
100644 --- a/spec/storage_spec.cr +++ b/spec/storage_spec.cr @@ -11,7 +11,8 @@ describe LavinMQ::AMQP::DurableQueue do end it "should succefully convert queue index" do - server = LavinMQ::Server.new("/tmp/lavinmq-spec-index-v2") + config = LavinMQ::Config.new.tap &.data_dir = "/tmp/lavinmq-spec-index-v2" + server = LavinMQ::Server.new(config) begin q = server.vhosts["/"].queues["queue"].as(LavinMQ::AMQP::DurableQueue) q.basic_get(true) do |env| @@ -176,7 +177,7 @@ describe LavinMQ::VHost do overhead = 21 body = Bytes.new(msg_size) - segments = ->{ Dir.new(vhost.data_dir).children.select!(/^msgs\./) } + segments = -> { Dir.new(vhost.data_dir).children.select!(/^msgs\./) } size_of_current_segment = File.size(File.join(vhost.data_dir, segments.call.last)) diff --git a/spec/upstream_spec.cr b/spec/upstream_spec.cr index 35a1b50cb2..9f7e53efa8 100644 --- a/spec/upstream_spec.cr +++ b/spec/upstream_spec.cr @@ -392,7 +392,7 @@ describe LavinMQ::Federation::Upstream do select when ch.receive? - when timeout 100.milliseconds + when timeout 3.seconds fail "federation didn't resume? timeout waiting for message on downstream queue" end diff --git a/spec/vhost_spec.cr b/spec/vhost_spec.cr index 81272856e8..03bca2d000 100644 --- a/spec/vhost_spec.cr +++ b/spec/vhost_spec.cr @@ -35,7 +35,7 @@ describe LavinMQ::VHost do end it "should be able to persist durable delayed exchanges when type = x-delayed-message" do - data_dir = "" + config = LavinMQ::Config.new with_amqp_server do |s| # This spec is to verify a fix where a server couldn't start again after a crash if # an delayed exchange had been declared by specifiying the type as "x-delayed-message". @@ -46,11 +46,11 @@ describe LavinMQ::VHost do # Start a new server with the same data dir as `Server` without stopping # `Server` first, because stopping would compact definitions and therefore "rewrite" - data_dir = s.data_dir + config.data_dir = s.data_dir end # the definitions file. This is to simulate a start after a "crash". # If this succeeds we assume it worked...? - LavinMQ::Server.new(data_dir) + LavinMQ::Server.new(config) end it "should be able to persist durable queues" do diff --git a/src/lavinmq/amqp/connection_factory.cr b/src/lavinmq/amqp/connection_factory.cr index a427ba5eee..a9774da4ae 100644 --- a/src/lavinmq/amqp/connection_factory.cr +++ b/src/lavinmq/amqp/connection_factory.cr @@ -47,7 +47,7 @@ module LavinMQ elsif proto != AMQP::PROTOCOL_START_0_9_1 && proto != AMQP::PROTOCOL_START_0_9 socket.write AMQP::PROTOCOL_START_0_9_1.to_slice socket.flush - log.warn { "Unexpected protocol '#{String.new(proto.to_slice)}', closing socket" } + log.warn { "Unexpected protocol #{String.new(proto.to_unsafe, count).inspect}, closing socket" } false else true diff --git a/src/lavinmq/amqp/queue/message_store.cr b/src/lavinmq/amqp/queue/message_store.cr index 809b240713..801db8d211 100644 --- a/src/lavinmq/amqp/queue/message_store.cr +++ b/src/lavinmq/amqp/queue/message_store.cr @@ -382,7 +382,8 @@ module LavinMQ bytesize = BytesMessage.skip(mfile) count += 1 next if deleted?(seg, pos) - update_stats_per_msg(seg, ts, bytesize) + @bytesize += bytesize + @size += 1 rescue ex : IO::EOFError break rescue ex : OverflowError | AMQ::Protocol::Error::FrameDecode @@ -402,11 +403,6 @@ module LavinMQ @log.info { "Loaded #{counter} segments, #{@size} messages" } end - private def update_stats_per_msg(seg, ts, bytesize) - @bytesize += bytesize - @size += 1 - end - private def delete_unused_segments : Nil current_seg = @segments.last_key @segments.reject! 
do |seg, mfile| diff --git a/src/lavinmq/amqp/queue/queue.cr b/src/lavinmq/amqp/queue/queue.cr index 06e8bae7c7..eeee78c8b0 100644 --- a/src/lavinmq/amqp/queue/queue.cr +++ b/src/lavinmq/amqp/queue/queue.cr @@ -854,9 +854,6 @@ module LavinMQ::AMQP delete else notify_consumers_empty(true) - @msg_store_lock.synchronize do - @msg_store.unmap_segments - end end end end diff --git a/src/lavinmq/amqp/queue/stream_queue.cr b/src/lavinmq/amqp/queue/stream_queue.cr index 165f809822..5b803debe9 100644 --- a/src/lavinmq/amqp/queue/stream_queue.cr +++ b/src/lavinmq/amqp/queue/stream_queue.cr @@ -152,6 +152,7 @@ module LavinMQ::AMQP end private def unmap_and_remove_segments_loop + sleep rand(60).seconds until closed? sleep 60.seconds unmap_and_remove_segments diff --git a/src/lavinmq/clustering/actions.cr b/src/lavinmq/clustering/actions.cr index 04277fbe2c..a65a87baac 100644 --- a/src/lavinmq/clustering/actions.cr +++ b/src/lavinmq/clustering/actions.cr @@ -14,6 +14,7 @@ module LavinMQ abstract def lag_size : Int64 abstract def send(socket : IO, log = Log) : Int64 + abstract def done getter filename @@ -25,15 +26,16 @@ module LavinMQ struct AddAction < Action def initialize(@data_dir : String, @filename : String, @mfile : MFile? = nil) + @mfile.try &.reserve end def lag_size : Int64 if mfile = @mfile - 0i64 + sizeof(Int32) + filename.bytesize + + 0i64 + sizeof(Int32) + @filename.bytesize + sizeof(Int64) + mfile.size.to_i64 else - 0i64 + sizeof(Int32) + filename.bytesize + - sizeof(Int64) + File.size(File.join(@data_dir, filename)).to_i64 + 0i64 + sizeof(Int32) + @filename.bytesize + + sizeof(Int64) + File.size(File.join(@data_dir, @filename)).to_i64 end end @@ -53,11 +55,22 @@ module LavinMQ size end end + ensure + done + end + + def done + if mfile = @mfile + mfile.unreserve + end end end struct AppendAction < Action def initialize(@data_dir : String, @filename : String, @obj : Bytes | FileRange | UInt32 | Int32) + if range = @obj.as?(FileRange) + range.mfile.reserve + end end def lag_size : Int64 @@ -69,7 +82,7 @@ module LavinMQ in UInt32, Int32 4i64 end - 0i64 + sizeof(Int32) + filename.bytesize + + 0i64 + sizeof(Int32) + @filename.bytesize + sizeof(Int64) + datasize end @@ -92,6 +105,14 @@ module LavinMQ end log.debug { "Append #{len} bytes to #{@filename}" } len + ensure + done + end + + def done + if fr = @obj.as?(FileRange) + fr.mfile.unreserve + end end end @@ -100,7 +121,7 @@ module LavinMQ # Maybe it would be ok to not include delete action in lag, because # the follower should have all info necessary to GC the file during # startup? 
- (sizeof(Int32) + filename.bytesize + sizeof(Int64)).to_i64 + (sizeof(Int32) + @filename.bytesize + sizeof(Int64)).to_i64 end def send(socket, log = Log) : Int64 @@ -109,6 +130,9 @@ module LavinMQ socket.write_bytes 0i64 0i64 end + + def done + end end end end diff --git a/src/lavinmq/clustering/client.cr b/src/lavinmq/clustering/client.cr index adf97b8b14..6e96eb9ec1 100644 --- a/src/lavinmq/clustering/client.cr +++ b/src/lavinmq/clustering/client.cr @@ -58,6 +58,7 @@ module LavinMQ end def follow(host : String, port : Int32) + Log.info { "Following #{host}:#{port}" } @host = host @port = port if amqp_proxy = @amqp_proxy diff --git a/src/lavinmq/clustering/controller.cr b/src/lavinmq/clustering/controller.cr index 19f81b7c4d..1988d33c5c 100644 --- a/src/lavinmq/clustering/controller.cr +++ b/src/lavinmq/clustering/controller.cr @@ -17,14 +17,26 @@ class LavinMQ::Clustering::Controller def run spawn(follow_leader, name: "Follower monitor") wait_to_be_insync - lease = @etcd.elect("#{@config.clustering_etcd_prefix}/leader", @advertised_uri) # blocks until becoming leader - replicator = Clustering::Server.new(@config, @etcd) - @launcher = l = Launcher.new(@config, replicator, lease) - l.run + @lease = lease = @etcd.elect("#{@config.clustering_etcd_prefix}/leader", @advertised_uri) # blocks until becoming leader + # TODO: make sure we still are in the ISR set + @launcher = Launcher.new(@config, @etcd).start + loop do + if lease.wait(30.seconds) + break if @stopped + Log.fatal { "Lost cluster leadership" } + exit 3 + else + GC.collect + end + end end + @stopped = false + def stop + @stopped = true @launcher.try &.stop + @lease.try &.release end # Each node in a cluster has an unique id, for tracking ISR @@ -66,6 +78,9 @@ class LavinMQ::Clustering::Controller spawn r.follow(uri), name: "Clustering client #{uri}" SystemD.notify_ready end + rescue ex + Log.fatal(exception: ex) { "Unhandled exception while following leader" } + exit 36 # 36 for CF (Cluster Follower) end def wait_to_be_insync diff --git a/src/lavinmq/clustering/follower.cr b/src/lavinmq/clustering/follower.cr index 55e3e64a71..53b50cc220 100644 --- a/src/lavinmq/clustering/follower.cr +++ b/src/lavinmq/clustering/follower.cr @@ -5,6 +5,7 @@ require "socket" require "lz4" require "digest/crc32" require "digest/sha1" +require "wait_group" module LavinMQ module Clustering @@ -14,7 +15,7 @@ module LavinMQ @acked_bytes = 0_i64 @sent_bytes = 0_i64 @actions = Channel(Action).new(Config.instance.clustering_max_unsynced_actions) - @closed = false + @running = WaitGroup.new getter id = -1 getter remote_address @@ -31,7 +32,6 @@ module LavinMQ @checksum_algo = validate_header! authenticate!(password) @id = @socket.read_bytes Int32, IO::ByteFormat::LittleEndian - @socket.read_timeout = nil @socket.tcp_nodelay = true @socket.read_buffering = false @socket.sync = true # Use buffering in lz4 @@ -52,6 +52,7 @@ module LavinMQ end def action_loop(lz4 = @lz4) + @running.add while action = @actions.receive? action.send(lz4, Log) sent_bytes = action.lag_size.to_i64 @@ -63,12 +64,7 @@ module LavinMQ sync(sent_bytes) end ensure - begin - @lz4.close - @socket.close - rescue IO::Error - # ignore connection errors while closing - end + @running.done end private def sync(bytes, socket = @socket) : Nil @@ -80,9 +76,6 @@ module LavinMQ private def read_ack(socket = @socket) : Int64 len = socket.read_bytes(Int64, IO::ByteFormat::LittleEndian) @acked_bytes += len - if @closed && lag_in_bytes.zero? 
- @closed_and_in_sync.close - end len end @@ -182,18 +175,20 @@ module LavinMQ lag_size end - @closed_and_in_sync = Channel(Nil).new - - def close(timeout : Time::Span = 30.seconds) - @closed = true + def close @actions.close - if lag_in_bytes > 0 - Log.info { "Waiting for follower to be in sync" } - select - when @closed_and_in_sync.receive? - when timeout(timeout) - Log.warn { "Timeout waiting for follower to be in sync" } - end + @running.wait # let action_loop finish + + # abort remaining actions (unmap pending files) + while action = @actions.receive? + action.done + end + + begin + @lz4.close + @socket.close + rescue IO::Error + # ignore connection errors while closing end end diff --git a/src/lavinmq/data_dir_lock.cr b/src/lavinmq/data_dir_lock.cr index 87d752a4f6..cbd63db05a 100644 --- a/src/lavinmq/data_dir_lock.cr +++ b/src/lavinmq/data_dir_lock.cr @@ -21,14 +21,13 @@ module LavinMQ @lock.flock_exclusive(blocking: true) Log.info { "Lock acquired" } end + Log.debug { "Data directory lock aquired" } @lock.truncate @lock.print "PID #{Process.pid} @ #{System.hostname}" - @lock.fsync end def release @lock.truncate - @lock.fsync @lock.flock_unlock end @@ -37,7 +36,8 @@ module LavinMQ def poll @lock.read_at(0, 1, &.read_byte) || raise IO::EOFError.new rescue ex : IO::Error | ArgumentError - abort "ERROR: Lost data directory lock! #{ex.inspect}" + Log.fatal(exception: ex) { "Lost data dir lock" } + exit 4 # 4 for D(dataDir) end end end diff --git a/src/lavinmq/etcd.cr b/src/lavinmq/etcd.cr index fab7a3728b..e9acad2135 100644 --- a/src/lavinmq/etcd.cr +++ b/src/lavinmq/etcd.cr @@ -1,4 +1,4 @@ -require "http/client" +require "socket" require "wait_group" require "json" require "./logger" @@ -35,7 +35,7 @@ module LavinMQ def watch(key, &) body = %({"create_request":{"key":"#{Base64.strict_encode key}"}}) - post_stream("/v3/watch", body) do |json| + stream("/v3/watch", body) do |json| next if json.dig?("result", "created") == true # "watch created" is first event if value = json.dig?("result", "events", 0, "kv", "value") @@ -61,10 +61,17 @@ module LavinMQ if ttl = json.dig?("result", "TTL") ttl.as_s.to_i else - raise Error.new("Lost lease #{id}") + raise Error.new("Lease #{id} expired") end end + def lease_ttl(id) : Int32 + json = post("/v3/lease/timetolive", body: %({"ID":"#{id}"})) + ttl = json["TTL"].as_s.to_i + raise Error.new("Lease #{id} expired") if ttl < 0 + ttl + end + def lease_revoke(id) : Nil post("/v3/lease/revoke", body: %({"ID":"#{id}"})) end @@ -81,56 +88,51 @@ module LavinMQ # Returns when elected leader # Returns a `Leadership` instance def elect(name, value, ttl = 10) : Leadership - channel = Channel(Nil).new - lease_id, ttl = lease_grant(ttl) - wg = WaitGroup.new(1) - spawn(name: "Etcd lease keepalive #{lease_id}") do - wg.done - loop do - select - when channel.receive? 
- lease_revoke(lease_id) - channel.close - break - when timeout((ttl * 0.7).seconds) - ttl = lease_keepalive(lease_id) - end - rescue ex - Log.warn { "Lost leadership of #{name}: #{ex}" } - channel.close - break - end - end + lease_id, _ttl = lease_grant(ttl) election_campaign(name, value, lease_id) - wg.wait - Leadership.new(self, lease_id, channel) + Leadership.new(self, lease_id) end - # Represents a holding a Leadership + # Represents holding a Leadership # Can be revoked or wait until lost class Leadership - def initialize(@etcd : Etcd, @lease_id : Int64, @lost_leadership_channel : Channel(Nil)) + def initialize(@etcd : Etcd, @lease_id : Int64) + @lost_leadership = Channel(Nil).new + spawn(keepalive_loop, name: "Etcd lease keepalive #{@lease_id}") end # Force release leadership def release @etcd.lease_revoke(@lease_id) + @lost_leadership.close end # Wait until looses leadership # Returns true when lost leadership, false when timeout occured def wait(timeout : Time::Span) : Bool select - when @lost_leadership_channel.receive? - return true + when @lost_leadership.receive? + true when timeout(timeout) - return false + false end end + + private def keepalive_loop + ttl = @etcd.lease_ttl(@lease_id) + loop do + sleep (ttl * 0.7).seconds + ttl = @etcd.lease_keepalive(@lease_id) + end + rescue ex + Log.error(exception: ex) { "Lost leadership" } unless @lost_leadership.closed? + ensure + @lost_leadership.close + end end def elect_listen(name, &) - post_stream("/v3/election/observe", %({"name":"#{Base64.strict_encode name}"})) do |json| + stream("/v3/election/observe", %({"name":"#{Base64.strict_encode name}"})) do |json| if value = json.dig?("result", "kv", "value") yield Base64.decode_string(value.as_s) else @@ -153,31 +155,37 @@ module LavinMQ chunks = read_chunks(tcp) parse_json! chunks else - body = tcp.read_string(content_length) + body = read_string(tcp, content_length) parse_json! body end end - private def post_stream(path, body, & : JSON::Any -> _) + private def stream(path, body, & : JSON::Any -> _) with_tcp do |tcp, address| - send_request(tcp, address, path, body) - content_length = read_headers(tcp) - if content_length == -1 # Chunked response - read_chunks(tcp) do |chunk| - yield parse_json! chunk - end - else - body = tcp.read_string(content_length) - yield parse_json! body + post_stream(tcp, address, path, body) do |chunk| + yield chunk end end end + private def post_stream(tcp, address, path, body, & : JSON::Any -> _) + send_request(tcp, address, path, body) + content_length = read_headers(tcp) + if content_length == -1 # Chunked response + read_chunks(tcp) do |chunk| + yield parse_json! chunk + end + else + body = read_string(tcp, content_length) + yield parse_json! body + end + end + private def read_chunks(tcp, & : String -> _) : Nil response_finished = false loop do - bytesize = tcp.read_line.to_i(16) - chunk = tcp.read_string(bytesize) + bytesize = read_chunk_size(tcp) + chunk = read_string(tcp, bytesize) tcp.skip(2) # each chunk ends with \r\n break if bytesize.zero? yield chunk @@ -196,6 +204,20 @@ module LavinMQ break if bytesize.zero? 
end end + rescue ex : IO::Error + raise Error.new("Read chunked response error", cause: ex) + end + + def read_string(tcp, content_length) : String + tcp.read_string(content_length) + rescue ex : IO::Error + raise Error.new("Read response error", cause: ex) + end + + def read_chunk_size(tcp) : Int32 + tcp.read_line.to_i(16) + rescue ex : IO::Error + raise Error.new("Read response error", cause: ex) end private def send_request(tcp : IO, address : String, path : String, body : String) @@ -205,6 +227,8 @@ module LavinMQ tcp << "\r\n" tcp << body tcp.flush + rescue ex : IO::Error + raise Error.new("Send request error", cause: ex) end # Parse response headers, return Content-Length (-1 implies chunked response) @@ -233,6 +257,8 @@ module LavinMQ end end content_length + rescue ex : IO::Error + raise Error.new("Read response error", cause: ex) end @connections = Deque(Tuple(TCPSocket, String)).new @@ -248,10 +274,6 @@ module LavinMQ Log.warn { "Service Unavailable at #{address}, #{ex.message}, retrying" } socket.close rescue nil sleep 0.1.seconds - rescue IO::Error - Log.warn { "Lost connection to #{address}, retrying" } - socket.close rescue nil - sleep 0.1.seconds ensure @connections.push({socket, address}) unless socket.closed? end @@ -313,18 +335,19 @@ module LavinMQ private def raise_if_error(json) if error = json["error"]? Log.debug { "etcd error: #{error}" } - if errorh = error.as_h? - error_msg = errorh["message"].as_s - case error_msg - when "error reading from server: EOF" - raise IO::EOFError.new(error_msg) - when "etcdserver: no leader" - raise NoLeader.new(error_msg) + error_msg = + if errorh = error.as_h? + errorh["message"].as_s else - raise Error.new error_msg + error.as_s end + case error_msg + when "error reading from server: EOF" + raise IO::EOFError.new(error_msg) + when "etcdserver: no leader" + raise NoLeader.new(error_msg) else - raise Error.new error.as_s + raise Error.new error_msg end end end diff --git a/src/lavinmq/http/constants.cr b/src/lavinmq/http/constants.cr index d3ccfafef8..d61cd33d60 100644 --- a/src/lavinmq/http/constants.cr +++ b/src/lavinmq/http/constants.cr @@ -1,5 +1,6 @@ module LavinMQ module HTTP INTERNAL_UNIX_SOCKET = "/tmp/lavinmqctl.sock" + MAX_PAGE_SIZE = 10_000 end end diff --git a/src/lavinmq/http/controller.cr b/src/lavinmq/http/controller.cr index 76b47c5004..f957649261 100644 --- a/src/lavinmq/http/controller.cr +++ b/src/lavinmq/http/controller.cr @@ -32,15 +32,13 @@ module LavinMQ value[:name]? || value["name"]? end - MAX_PAGE_SIZE = 10_000 - private def page(context, iterator : Iterator(SortableJSON)) params = context.request.query_params page = params["page"]?.try(&.to_i) || 1 page_size = params["page_size"]?.try(&.to_i) || 100 if page_size > MAX_PAGE_SIZE context.response.status_code = 413 - {error: "payload_too_large", reason: "Max allowed page_size 10000"}.to_json(context.response) + {error: "payload_too_large", reason: "Max allowed page_size #{MAX_PAGE_SIZE}"}.to_json(context.response) return context end iterator = iterator.map do |i| diff --git a/src/lavinmq/launcher.cr b/src/lavinmq/launcher.cr index ec1ada68e3..d4e1094c76 100644 --- a/src/lavinmq/launcher.cr +++ b/src/lavinmq/launcher.cr @@ -15,9 +15,8 @@ module LavinMQ @first_shutdown_attempt = true @data_dir_lock : DataDirLock? @closed = false - @leadership : Etcd::Leadership? - def initialize(@config : Config, replicator = Clustering::NoopServer.new, @leadership = nil) + def initialize(@config : Config, etcd : Etcd? 
= nil) print_environment_info print_max_map_count fd_limit = System.maximize_fd_limit @@ -30,7 +29,8 @@ module LavinMQ if @config.data_dir_lock? @data_dir_lock = DataDirLock.new(@config.data_dir).tap &.acquire end - @amqp_server = LavinMQ::Server.new(@config.data_dir, replicator) + @replicator = replicator = etcd ? Clustering::Server.new(@config, etcd) : Clustering::NoopServer.new + @amqp_server = LavinMQ::Server.new(@config, replicator) @http_server = LavinMQ::HTTP::Server.new(@amqp_server) @tls_context = create_tls_context if @config.tls_configured? reload_tls_context @@ -38,23 +38,17 @@ module LavinMQ setup_log_exchange end - def run + def start : self listen SystemD.notify_ready + self + end + + def run + start loop do - if leadership = @leadership - if leadership.wait(30.seconds) - Log.fatal { "Lost cluster leadership" } - exit 3 # 3rd character in the alphabet is C(lustering) - else - @data_dir_lock.try &.poll - GC.collect - end - else - sleep 30.seconds - @data_dir_lock.try &.poll - GC.collect - end + sleep 30.seconds + GC.collect end end @@ -66,7 +60,7 @@ module LavinMQ @http_server.close rescue nil @amqp_server.close rescue nil @data_dir_lock.try &.release - @leadership.try &.release + @replicator.close end private def print_environment_info @@ -173,17 +167,6 @@ module LavinMQ end private def run_gc - STDOUT.puts "Unmapping all segments" - STDOUT.flush - @amqp_server.vhosts.each_value do |vhost| - vhost.queues.each_value do |q| - if q = q.as(LavinMQ::AMQP::Queue) - msg_store = q.@msg_store - msg_store.@segments.each_value &.unmap - msg_store.@acks.each_value &.unmap - end - end - end STDOUT.puts "Garbage collecting" STDOUT.flush GC.collect diff --git a/src/lavinmq/mfile.cr b/src/lavinmq/mfile.cr index 98dc429d21..8101b7d5c3 100644 --- a/src/lavinmq/mfile.cr +++ b/src/lavinmq/mfile.cr @@ -1,3 +1,5 @@ +require "wait_group" + lib LibC MS_ASYNC = 1 MREMAP_MAYMOVE = 1 @@ -31,6 +33,7 @@ class MFile < IO getter fd : Int32 @buffer = Pointer(UInt8).null @deleted = false + @wg = WaitGroup.new # Map a file, if no capacity is given the file must exists and # the file will be mapped as readonly @@ -47,9 +50,8 @@ class MFile < IO end end - # Opens an existing file in readonly mode - def self.open(path, & : self -> _) - mfile = self.new(path) + def self.open(path, capacity : Int? = nil, writeonly = false, & : self -> _) + mfile = self.new(path, capacity, writeonly) begin yield mfile ensure @@ -100,13 +102,22 @@ class MFile < IO self end + def reserve + @wg.add + end + + def unreserve + @wg.done + end + # The file will be truncated to the current position unless readonly or deleted def close(truncate_to_size = true) - # unmap occurs on finalize if truncate_to_size && !@readonly && !@deleted && @fd > 0 code = LibC.ftruncate(@fd, @size) raise File::Error.from_errno("Error truncating file", file: @path) if code < 0 end + + unmap ensure unless @fd == -1 code = LibC.close(@fd) @@ -130,8 +141,22 @@ class MFile < IO # unload the memory mapping, will be remapped on demand def unmap : Nil - munmap + # unmap if non has reserved the file, race condition prone? + if @wg.@counter.get(:acquire).zero? + unsafe_unmap + else + spawn(name: "munmap #{@path}") do + @wg.wait + unsafe_unmap + end + end + end + + private def unsafe_unmap : Nil + b = @buffer + c = @capacity @buffer = Pointer(UInt8).null + munmap(b, c) end def unmapped? 
: Bool @@ -170,11 +195,6 @@ class MFile < IO raise RuntimeError.from_errno("msync") if code < 0 end - def finalize - LibC.close(@fd) if @fd > -1 - LibC.munmap(@buffer, @capacity) unless @buffer.null? - end - def write(slice : Bytes) : Nil size = @size new_size = size + slice.size @@ -185,11 +205,10 @@ class MFile < IO def read(slice : Bytes) pos = @pos - new_pos = pos + slice.size - raise IO::EOFError.new if new_pos > @size - slice.copy_from(buffer + pos, slice.size) - @pos = new_pos - slice.size + len = Math.min(slice.size, @size - pos) + slice.copy_from(buffer + pos, len) + @pos = pos + len + len end def rewind diff --git a/src/lavinmq/server.cr b/src/lavinmq/server.cr index 69b729079d..4d9e39cfb4 100644 --- a/src/lavinmq/server.cr +++ b/src/lavinmq/server.cr @@ -30,7 +30,8 @@ module LavinMQ @replicator : Clustering::Replicator Log = LavinMQ::Log.for "server" - def initialize(@data_dir : String, @replicator = Clustering::NoopServer.new) + def initialize(@config : Config, @replicator = Clustering::NoopServer.new) + @data_dir = @config.data_dir Dir.mkdir_p @data_dir Schema.migrate(@data_dir, @replicator) @users = UserStore.new(@data_dir, @replicator) @@ -99,17 +100,17 @@ module LavinMQ private def extract_conn_info(client) : ConnectionInfo remote_address = client.remote_address - case Config.instance.tcp_proxy_protocol + case @config.tcp_proxy_protocol when 1 then ProxyProtocol::V1.parse(client) when 2 then ProxyProtocol::V2.parse(client) else # Allow proxy connection from followers - if Config.instance.clustering? && + if @config.clustering? && client.peek[0, 5]? == "PROXY".to_slice && followers.any? { |f| f.remote_address.address == remote_address.address } # Expect PROXY protocol header if remote address is a follower ProxyProtocol::V1.parse(client) - elsif Config.instance.clustering? && + elsif @config.clustering? && client.peek[0, 8]? == ProxyProtocol::V2::Signature.to_slice[0, 8] && followers.any? { |f| f.remote_address.address == remote_address.address } # Expect PROXY protocol header if remote address is a follower @@ -130,7 +131,7 @@ module LavinMQ remote_address = client.remote_address set_buffer_size(client) conn_info = - case Config.instance.unix_proxy_protocol + case @config.unix_proxy_protocol when 1 then ProxyProtocol::V1.parse(client) when 2 then ProxyProtocol::V2.parse(client) else ConnectionInfo.local # TODO: use unix socket address, don't fake local @@ -205,7 +206,6 @@ module LavinMQ @listeners.each_key &.close Log.debug { "Closing vhosts" } @vhosts.close - @replicator.close end def add_parameter(parameter : Parameter) @@ -252,21 +252,21 @@ module LavinMQ private def set_socket_options(socket) unless socket.remote_address.loopback? - if keepalive = Config.instance.tcp_keepalive + if keepalive = @config.tcp_keepalive socket.keepalive = true socket.tcp_keepalive_idle = keepalive[0] socket.tcp_keepalive_interval = keepalive[1] socket.tcp_keepalive_count = keepalive[2] end end - socket.tcp_nodelay = true if Config.instance.tcp_nodelay? - Config.instance.tcp_recv_buffer_size.try { |v| socket.recv_buffer_size = v } - Config.instance.tcp_send_buffer_size.try { |v| socket.send_buffer_size = v } + socket.tcp_nodelay = true if @config.tcp_nodelay? + @config.tcp_recv_buffer_size.try { |v| socket.recv_buffer_size = v } + @config.tcp_send_buffer_size.try { |v| socket.send_buffer_size = v } end private def set_buffer_size(socket) - if Config.instance.socket_buffer_size.positive? - socket.buffer_size = Config.instance.socket_buffer_size + if @config.socket_buffer_size.positive? 
+ socket.buffer_size = @config.socket_buffer_size socket.sync = false socket.read_buffering = true else @@ -288,8 +288,8 @@ module LavinMQ end def update_system_metrics(statm) - interval = Config.instance.stats_interval.milliseconds.to_i - log_size = Config.instance.stats_log_size + interval = @config.stats_interval.milliseconds.to_i + log_size = @config.stats_log_size rusage = System.resource_usage {% for m in METRICS %} @@ -353,7 +353,7 @@ module LavinMQ end control_flow! - sleep Config.instance.stats_interval.milliseconds + sleep @config.stats_interval.milliseconds end ensure statm.try &.close @@ -435,11 +435,11 @@ module LavinMQ end def disk_full? - @disk_free < 3_i64 * Config.instance.segment_size || @disk_free < Config.instance.free_disk_min + @disk_free < 3_i64 * @config.segment_size || @disk_free < @config.free_disk_min end def disk_usage_over_warning_level? - @disk_free < 6_i64 * Config.instance.segment_size || @disk_free < Config.instance.free_disk_warn + @disk_free < 6_i64 * @config.segment_size || @disk_free < @config.free_disk_warn end def flow(active : Bool) diff --git a/src/lavinmqctl.cr b/src/lavinmqctl.cr index fc8d076b31..7dd2dca32e 100644 --- a/src/lavinmqctl.cr +++ b/src/lavinmqctl.cr @@ -306,6 +306,21 @@ class LavinMQCtl URI.encode_www_form(@options["vhost"]) end + private def get(url, page = 1, items = Array(JSON::Any).new) + resp = http.get("#{url}?page=#{page}&page_size=#{LavinMQ::HTTP::MAX_PAGE_SIZE}", @headers) + handle_response(resp, 200) + if data = JSON.parse(resp.body).as_h? + items += data["items"].as_a + page = data["page"].as_i + if page < data["page_count"].as_i + return get(url, page + 1, items) + end + else + abort "Unexpected response from #{url}\n#{resp.body}" + end + items + end + private def import_definitions file = ARGV.shift? || "" resp = if file == "-" @@ -330,16 +345,12 @@ class LavinMQCtl end private def list_users - resp = http.get "/api/users", @headers - handle_response(resp, 200) puts "Listing users ..." unless quiet? - if users = JSON.parse(resp.body).as_a? - uu = users.map do |u| - next unless user = u.as_h? - {name: user["name"].to_s, tags: user["tags"].to_s} - end - output uu + uu = get("/api/users").map do |u| + next unless user = u.as_h? + {name: user["name"].to_s, tags: user["tags"].to_s} end + output uu end private def add_user @@ -375,18 +386,12 @@ class LavinMQCtl private def list_queues vhost = @options["vhost"]? || "/" - resp = http.get "/api/queues/#{URI.encode_www_form(vhost)}", @headers puts "Listing queues for vhost #{vhost} ..." unless quiet? - handle_response(resp, 200) - if queues = JSON.parse(resp.body).as_a? - qq = queues.map do |u| - next unless q = u.as_h? - {name: q["name"].to_s, messages: q["messages"].to_s} - end - output qq - else - abort "invalid data" + qq = get("/api/queues/#{URI.encode_www_form(vhost)}").map do |u| + next unless q = u.as_h? + {name: q["name"].to_s, messages: q["messages"].to_s} end + output qq end private def purge_queue @@ -416,35 +421,31 @@ class LavinMQCtl private def list_connections columns = ARGV columns = ["user", "peer_host", "peer_port", "state"] if columns.empty? - resp = http.get "/api/connections", @headers + conns = get("/api/connections") puts "Listing connections ..." unless quiet? - handle_response(resp, 200) - if conns = JSON.parse(resp.body).as_a? - if @options["format"]? == "json" - cc = conns.map do |u| - next unless conn = u.as_h? - conn.select { |k, _v| columns.includes? k } - end - output cc - else - puts columns.join(STDOUT, "\t") - conns.each do |u| - if conn = u.as_h? 
- columns.each_with_index do |c, i| - case c - when "client_properties" - print_erlang_terms(conn[c].as_h) - else - print conn[c] - end - print "\t" unless i == columns.size - 1 + + if @options["format"]? == "json" + cc = conns.map do |u| + next unless conn = u.as_h? + conn.select { |k, _v| columns.includes? k } + end + output cc + else + puts columns.join(STDOUT, "\t") + conns.each do |u| + if conn = u.as_h? + columns.each_with_index do |c, i| + case c + when "client_properties" + print_erlang_terms(conn[c].as_h) + else + print conn[c] end - puts + print "\t" unless i == columns.size - 1 end + puts end end - else - abort "invalid data" end end @@ -474,37 +475,26 @@ class LavinMQCtl end private def close_all_connections - resp = http.get "/api/connections", @headers - handle_response(resp, 200) + conns = get("/api/connections") closed_conns = [] of NamedTuple(name: String) - if conns = JSON.parse(resp.body).as_a? - @headers["X-Reason"] = ARGV.shift? || "Closed via lavinmqctl" - conns.each do |u| - next unless conn = u.as_h? - name = conn["name"].to_s - puts "Closing connection #{name} ..." unless quiet? - http.delete "/api/connections/#{URI.encode_path(name)}", @headers - closed_conns << {name: name} - end - else - abort "invalid data" + @headers["X-Reason"] = ARGV.shift? || "Closed via lavinmqctl" + conns.each do |u| + next unless conn = u.as_h? + name = conn["name"].to_s + puts "Closing connection #{name} ..." unless quiet? + http.delete "/api/connections/#{URI.encode_path(name)}", @headers + closed_conns << {name: name} end output closed_conns, ["closed_connections"] end private def list_vhosts - resp = http.get "/api/vhosts", @headers puts "Listing vhosts ..." unless quiet? - handle_response(resp, 200) - if vhosts = JSON.parse(resp.body).as_a? - vv = vhosts.map do |u| - next unless v = u.as_h? - {name: v["name"].to_s} - end - output vv - else - abort "invalid data" + vv = get("/api/vhosts").map do |u| + next unless v = u.as_h? + {name: v["name"].to_s} end + output vv end private def add_vhost @@ -531,14 +521,8 @@ class LavinMQCtl private def list_policies vhost = @options["vhost"]? || "/" - resp = http.get "/api/policies/#{URI.encode_www_form(vhost)}", @headers puts "Listing policies for vhost #{vhost} ..." unless quiet? - handle_response(resp, 200) - if policies = JSON.parse(resp.body).as_a? - output policies - else - abort "invalid data" - end + output get("/api/policies/#{URI.encode_www_form(vhost)}") end private def set_policy @@ -582,21 +566,16 @@ class LavinMQCtl private def list_exchanges vhost = @options["vhost"]? || "/" - resp = http.get "/api/exchanges/#{URI.encode_www_form(vhost)}", @headers puts "Listing exchanges for vhost #{vhost} ..." unless quiet? - handle_response(resp, 200) - if exchanges = JSON.parse(resp.body).as_a? - ee = exchanges.map do |u| - next unless e = u.as_h? - { - name: e["name"].to_s, - type: e["type"].to_s, - } - end - output ee - else - abort "invalid data" + + ee = get("/api/exchanges/#{URI.encode_www_form(vhost)}").map do |u| + next unless e = u.as_h? 
+ { + name: e["name"].to_s, + type: e["type"].to_s, + } end + output ee end private def create_exchange diff --git a/src/stdlib/resource.cr b/src/stdlib/resource.cr index 080e1377ce..2b6c90ac66 100644 --- a/src/stdlib/resource.cr +++ b/src/stdlib/resource.cr @@ -1,20 +1,4 @@ lib LibC - {% if flag?(:linux) || flag?(:bsd) %} - alias RlimT = ULongLong - - struct Rlimit - rlim_cur : RlimT - rlim_max : RlimT - end - {% end %} - - {% if flag?(:linux) || flag?(:darwin) %} - RLIMIT_NOFILE = 7 - {% elsif flag?(:bsd) %} - RLIMIT_NOFILE = 8 - {% end %} - - fun getrlimit(Int, Rlimit*) : Int fun setrlimit(Int, Rlimit*) : Int end diff --git a/views/vhost.ecr b/views/vhost.ecr index 9abc448391..0533d8c4f1 100644 --- a/views/vhost.ecr +++ b/views/vhost.ecr @@ -100,12 +100,6 @@

     Danger zone
[remainder of the views/vhost.ecr hunk: per the @@ -100,12 +100,6 @@ header, six lines of markup were removed from the "Danger zone" section; the HTML content itself was lost in extraction and is not reproduced here]
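
Note on the MFile changes above: the new `reserve`/`unreserve` pair guards the memory map with a `WaitGroup`, so `unmap` is deferred until every in-flight reader (for example a clustering `Action` streaming a file range to a follower) has finished. A minimal, self-contained sketch of that idea follows; the class and the `puts` placeholder are illustrative only, not LavinMQ's actual `MFile`:

```crystal
# Simplified sketch of the reservation pattern added in src/lavinmq/mfile.cr.
require "wait_group"

class MappedFile
  def initialize(@path : String)
    @readers = WaitGroup.new
  end

  # A reader takes a reservation before touching the mapped memory.
  def reserve
    @readers.add
  end

  # ...and releases it when done (the PR does this in `ensure` blocks).
  def unreserve
    @readers.done
  end

  # Unmapping waits for outstanding reservations instead of pulling the
  # mapping out from under an active reader. (The real code unmaps
  # synchronously when the counter is already zero.)
  def unmap
    spawn(name: "munmap #{@path}") do
      @readers.wait
      puts "munmap(2) would run here for #{@path}" # stand-in for the real unmap
    end
  end
end

file = MappedFile.new("msgs.0000000001")
file.reserve
spawn do
  # e.g. an AddAction/AppendAction sending the mapped bytes to a follower
  file.unreserve
end
file.unmap
sleep 10.milliseconds # give the fibers time to finish in this toy example
```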
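Similarly, since the etcd client now speaks HTTP over a raw socket (`require "socket"` instead of `http/client`), it has to frame chunked responses itself via `read_chunks`/`read_chunk_size`. A hedged, toy illustration of that framing, assuming a well-formed chunked body already in memory and skipping all error handling:

```crystal
# Chunked transfer framing: <hex length>\r\n<payload>\r\n ... ended by a
# zero-length chunk, which is what read_chunk_size/read_string/skip(2) parse.
io = IO::Memory.new("5\r\nhello\r\n6\r\n world\r\n0\r\n\r\n")
body = String.build do |str|
  loop do
    size = io.read_line.to_i(16) # chunk size is a hex line
    chunk = io.read_string(size) # then exactly that many bytes
    io.skip(2)                   # each chunk ends with \r\n
    break if size.zero?
    str << chunk
  end
end
puts body # => "hello world"
```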