From a74d4a645d03abbf7a3b10f907b099b3d8179a15 Mon Sep 17 00:00:00 2001 From: Viktor Erlingsson Date: Wed, 19 Jun 2024 15:38:55 +0200 Subject: [PATCH] update specs to start amqp servers where needed --- spec/stream_queue_spec.cr | 282 ++++++++++++++++++++------------------ 1 file changed, 150 insertions(+), 132 deletions(-) diff --git a/spec/stream_queue_spec.cr b/spec/stream_queue_spec.cr index 9a5fdb0d03..4829f4429b 100644 --- a/spec/stream_queue_spec.cr +++ b/spec/stream_queue_spec.cr @@ -2,17 +2,17 @@ require "./spec_helper" require "./../src/lavinmq/queue" module StreamQueueSpecHelpers - def self.publish(queue_name, nr_of_messages) + def self.publish(s, queue_name, nr_of_messages) args = {"x-queue-type": "stream"} - with_channel do |ch| + with_channel(s) do |ch| q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args)) nr_of_messages.times { |i| q.publish "m#{i}" } end end - def self.consume_one(queue_name, c_tag, c_args = AMQP::Client::Arguments.new) + def self.consume_one(s, queue_name, c_tag, c_args = AMQP::Client::Arguments.new) args = {"x-queue-type": "stream"} - with_channel do |ch| + with_channel(s) do |ch| ch.prefetch 1 q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args)) msgs = Channel(AMQP::Client::DeliverMessage).new @@ -242,58 +242,63 @@ describe LavinMQ::StreamQueue do consumer_tag = Random::Secure.hex offset = 3 - StreamQueueSpecHelpers.publish(queue_name, offset + 1) - - offset.times { StreamQueueSpecHelpers.consume_one(queue_name, consumer_tag) } - sleep 0.1 + with_amqp_server do |s| + StreamQueueSpecHelpers.publish(s, queue_name, offset + 1) + offset.times { StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag) } + sleep 0.1 - # consume again, should start from last offset automatically - msg = StreamQueueSpecHelpers.consume_one(queue_name, consumer_tag) - StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq offset + 1 + # consume again, should start from last offset automatically + msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag) + StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq offset + 1 + end end it "reads offsets from file on init" do queue_name = Random::Secure.hex - vhost = Server.vhosts["/"] offsets = [84_i64, 24_i64, 1_i64, 100_i64, 42_i64] tag_prefix = "ctag-" - StreamQueueSpecHelpers.publish(queue_name, 1) - data_dir = File.join(vhost.data_dir, Digest::SHA1.hexdigest queue_name) - msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) - offsets.each_with_index do |offset, i| - msg_store.update_consumer_offset(tag_prefix + i.to_s, offset) - end - msg_store.close - sleep 0.1 + with_amqp_server do |s| + vhost = s.vhosts["/"] + StreamQueueSpecHelpers.publish(s, queue_name, 1) + + data_dir = File.join(vhost.data_dir, Digest::SHA1.hexdigest queue_name) + msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) + offsets.each_with_index do |offset, i| + msg_store.update_consumer_offset(tag_prefix + i.to_s, offset) + end + msg_store.close + sleep 0.1 - msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) - offsets.each_with_index do |offset, i| - msg_store.last_offset_by_consumer_tag(tag_prefix + i.to_s).should eq offset + msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) + offsets.each_with_index do |offset, i| + msg_store.last_offset_by_consumer_tag(tag_prefix + i.to_s).should eq offset + end + msg_store.close end - msg_store.close end it "only saves one entry per consumer tag" do queue_name = Random::Secure.hex - vhost = Server.vhosts["/"] offsets = [84_i64, 24_i64, 1_i64, 100_i64, 42_i64] consumer_tag = "ctag-1" - StreamQueueSpecHelpers.publish(queue_name, 1) + with_amqp_server do |s| + StreamQueueSpecHelpers.publish(s, queue_name, 1) - data_dir = File.join(vhost.data_dir, Digest::SHA1.hexdigest queue_name) - msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) - offsets.each do |offset| - msg_store.update_consumer_offset(consumer_tag, offset) - end - msg_store.close - sleep 0.1 + data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name) + msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) + offsets.each do |offset| + msg_store.update_consumer_offset(consumer_tag, offset) + end + msg_store.close + sleep 0.1 - msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) - msg_store.last_offset_by_consumer_tag(consumer_tag).should eq offsets.last - msg_store.@consumer_offsets.size.should eq 15 + msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) + msg_store.last_offset_by_consumer_tag(consumer_tag).should eq offsets.last + msg_store.@consumer_offsets.size.should eq 15 - msg_store.close + msg_store.close + end end it "does not track offset if x-stream-offset is set" do @@ -301,14 +306,16 @@ describe LavinMQ::StreamQueue do consumer_tag = Random::Secure.hex c_args = AMQP::Client::Arguments.new({"x-stream-offset": 0}) - StreamQueueSpecHelpers.publish(queue_name, 2) - msg = StreamQueueSpecHelpers.consume_one(queue_name, consumer_tag, c_args) - StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 1 - sleep 0.1 - - # should consume the same message again since tracking was not saved from last consume - msg_2 = StreamQueueSpecHelpers.consume_one(queue_name, consumer_tag) - StreamQueueSpecHelpers.offset_from_headers(msg_2.properties.headers).should eq 1 + with_amqp_server do |s| + StreamQueueSpecHelpers.publish(s, queue_name, 2) + msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args) + StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 1 + sleep 0.1 + + # should consume the same message again since tracking was not saved from last consume + msg_2 = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag) + StreamQueueSpecHelpers.offset_from_headers(msg_2.properties.headers).should eq 1 + end end it "should not use saved offset if x-stream-offset is set" do @@ -316,16 +323,18 @@ describe LavinMQ::StreamQueue do consumer_tag = Random::Secure.hex c_args = AMQP::Client::Arguments.new({"x-stream-offset": 0}) - StreamQueueSpecHelpers.publish(queue_name, 2) + with_amqp_server do |s| + StreamQueueSpecHelpers.publish(s, queue_name, 2) - # get message without x-stream-offset, tracks offset - msg = StreamQueueSpecHelpers.consume_one(queue_name, consumer_tag) - StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 1 - sleep 0.1 + # get message without x-stream-offset, tracks offset + msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag) + StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 1 + sleep 0.1 - # consume with x-stream-offset set, should consume the same message again - msg_2 = StreamQueueSpecHelpers.consume_one(queue_name, consumer_tag, c_args) - StreamQueueSpecHelpers.offset_from_headers(msg_2.properties.headers).should eq 1 + # consume with x-stream-offset set, should consume the same message again + msg_2 = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args) + StreamQueueSpecHelpers.offset_from_headers(msg_2.properties.headers).should eq 1 + end end it "should use saved offset if x-stream-offset & x-stream-automatic-offset-tracking is set" do @@ -333,123 +342,132 @@ describe LavinMQ::StreamQueue do consumer_tag = Random::Secure.hex c_args = AMQP::Client::Arguments.new({"x-stream-offset": 0, "x-stream-automatic-offset-tracking": true}) - StreamQueueSpecHelpers.publish(queue_name, 2) + with_amqp_server do |s| + StreamQueueSpecHelpers.publish(s, queue_name, 2) - # get message without x-stream-offset, tracks offset - msg = StreamQueueSpecHelpers.consume_one(queue_name, consumer_tag, c_args) - StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 1 - sleep 0.1 + # get message without x-stream-offset, tracks offset + msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args) + StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 1 + sleep 0.1 - # consume with x-stream-offset set, should consume the same message again - msg_2 = StreamQueueSpecHelpers.consume_one(queue_name, consumer_tag, c_args) - StreamQueueSpecHelpers.offset_from_headers(msg_2.properties.headers).should eq 2 + # consume with x-stream-offset set, should consume the same message again + msg_2 = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args) + StreamQueueSpecHelpers.offset_from_headers(msg_2.properties.headers).should eq 2 + end end it "cleanup_consumer_offsets removes outdated offset" do queue_name = Random::Secure.hex - vhost = Server.vhosts["/"] offsets = [84_i64, -10_i64] tag_prefix = "ctag-" - StreamQueueSpecHelpers.publish(queue_name, 1) - data_dir = File.join(vhost.data_dir, Digest::SHA1.hexdigest queue_name) - msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) - offsets.each_with_index do |offset, i| - msg_store.update_consumer_offset(tag_prefix + i.to_s, offset) + with_amqp_server do |s| + StreamQueueSpecHelpers.publish(s, queue_name, 1) + + data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name) + msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) + offsets.each_with_index do |offset, i| + msg_store.update_consumer_offset(tag_prefix + i.to_s, offset) + end + sleep 0.1 + msg_store.cleanup_consumer_offsets + msg_store.close + sleep 0.1 + + msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) + msg_store.last_offset_by_consumer_tag(tag_prefix + 1.to_s).should eq nil + msg_store.last_offset_by_consumer_tag(tag_prefix + 0.to_s).should eq offsets[0] + msg_store.close end - sleep 0.1 - msg_store.cleanup_consumer_offsets - msg_store.close - sleep 0.1 - - msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) - msg_store.last_offset_by_consumer_tag(tag_prefix + 1.to_s).should eq nil - msg_store.last_offset_by_consumer_tag(tag_prefix + 0.to_s).should eq offsets[0] - msg_store.close end it "runs cleanup when removing segment" do consumer_tag = "ctag-1" - vhost = Server.vhosts["/"] queue_name = Random::Secure.hex args = {"x-queue-type": "stream", "x-max-length": 1} msg_body = Bytes.new(LavinMQ::Config.instance.segment_size) - data_dir = File.join(vhost.data_dir, Digest::SHA1.hexdigest queue_name) - with_channel do |ch| - q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args)) - q.publish_confirm msg_body - end + with_amqp_server do |s| + data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name) - with_channel do |ch| - ch.prefetch 1 - q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args)) - msgs = Channel(AMQP::Client::DeliverMessage).new - q.subscribe(no_ack: false, tag: consumer_tag) do |msg| - msgs.send msg - msg.ack + with_channel(s) do |ch| + q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args)) + q.publish_confirm msg_body + end + + with_channel(s) do |ch| + ch.prefetch 1 + q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args)) + msgs = Channel(AMQP::Client::DeliverMessage).new + q.subscribe(no_ack: false, tag: consumer_tag) do |msg| + msgs.send msg + msg.ack + end + msgs.receive end - msgs.receive - end - msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) - msg_store.last_offset_by_consumer_tag(consumer_tag).should eq 2 + msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) + msg_store.last_offset_by_consumer_tag(consumer_tag).should eq 2 - with_channel do |ch| - q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args)) - 2.times { q.publish_confirm msg_body } - end + with_channel(s) do |ch| + q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args)) + 2.times { q.publish_confirm msg_body } + end - msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) - msg_store.last_offset_by_consumer_tag(consumer_tag).should eq nil + msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) + msg_store.last_offset_by_consumer_tag(consumer_tag).should eq nil + end end it "does not track offset if c-tag is auto-generated" do queue_name = Random::Secure.hex - StreamQueueSpecHelpers.publish(queue_name, 1) - args = {"x-queue-type": "stream"} - c_tag = "" - with_channel do |ch| - ch.prefetch 1 - q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args)) - msgs = Channel(AMQP::Client::DeliverMessage).new - c_tag = q.subscribe(no_ack: false) do |msg| - msgs.send msg - msg.ack + + with_amqp_server do |s| + StreamQueueSpecHelpers.publish(s, queue_name, 1) + args = {"x-queue-type": "stream"} + c_tag = "" + with_channel(s) do |ch| + ch.prefetch 1 + q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args)) + msgs = Channel(AMQP::Client::DeliverMessage).new + c_tag = q.subscribe(no_ack: false) do |msg| + msgs.send msg + msg.ack + end + msgs.receive end - msgs.receive - end - sleep 0.1 - vhost = Server.vhosts["/"] - data_dir = File.join(vhost.data_dir, Digest::SHA1.hexdigest queue_name) - msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) - msg_store.last_offset_by_consumer_tag(c_tag).should eq nil + sleep 0.1 + data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name) + msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) + msg_store.last_offset_by_consumer_tag(c_tag).should eq nil + end end it "expands consumer offset file when needed" do queue_name = Random::Secure.hex - vhost = Server.vhosts["/"] consumer_tag_prefix = "ctag-" - StreamQueueSpecHelpers.publish(queue_name, 1) + with_amqp_server do |s| + StreamQueueSpecHelpers.publish(s, queue_name, 1) - data_dir = File.join(vhost.data_dir, Digest::SHA1.hexdigest queue_name) - msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) - 2000.times do |i| - next if i == 0 - msg_store.update_consumer_offset("#{consumer_tag_prefix}#{i}", i) - end - msg_store.close + data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name) + msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) + 2000.times do |i| + next if i == 0 + msg_store.update_consumer_offset("#{consumer_tag_prefix}#{i}", i) + end + msg_store.close - msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) - msg_store.@consumer_offsets.size.should eq 34_875 + msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) + msg_store.@consumer_offsets.size.should eq 34_875 - 2000.times do |i| - next if i == 0 - msg_store.last_offset_by_consumer_tag("#{consumer_tag_prefix}#{i}").should eq i - end + 2000.times do |i| + next if i == 0 + msg_store.last_offset_by_consumer_tag("#{consumer_tag_prefix}#{i}").should eq i + end - msg_store.close + msg_store.close + end end end end