From 2ecc8d33eb47df550b3a7f6efdf368d9661ab597 Mon Sep 17 00:00:00 2001 From: Viktor Erlingsson Date: Tue, 23 Apr 2024 12:02:31 +0200 Subject: [PATCH] add spec for cleanup --- spec/stream_queue_spec.cr | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/spec/stream_queue_spec.cr b/spec/stream_queue_spec.cr index c14671daf2..b2ee65afea 100644 --- a/spec/stream_queue_spec.cr +++ b/spec/stream_queue_spec.cr @@ -258,7 +258,7 @@ describe LavinMQ::StreamQueue do msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) msg_store.last_offset_by_consumer_tag(consumer_tag).should eq offsets.last - msg_store.consumer_offsets.size.should eq 15 + msg_store.@consumer_offsets.size.should eq 15 msg_store.close end @@ -314,6 +314,30 @@ describe LavinMQ::StreamQueue do msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) msg_store.last_offset_by_consumer_tag(tag_prefix + 1.to_s).should eq nil + msg_store.last_offset_by_consumer_tag(tag_prefix + 0.to_s).should eq offsets[0] + msg_store.close + end + + it "cleanup_consumer_offsets removes outdated offset" do + queue_name = Random::Secure.hex + vhost = Server.vhosts["/"] + offsets = [84_i64, -10_i64] + tag_prefix = "ctag-" + StreamQueueSpecHelpers.publish(queue_name, 1) + + data_dir = File.join(vhost.data_dir, Digest::SHA1.hexdigest queue_name) + msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) + offsets.each_with_index do |offset, i| + msg_store.save_offset_by_consumer_tag(tag_prefix + i.to_s, offset) + end + sleep 0.1 + msg_store.cleanup_consumer_offsets + msg_store.close + sleep 0.1 + + msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) + msg_store.last_offset_by_consumer_tag(tag_prefix + 1.to_s).should eq nil + msg_store.last_offset_by_consumer_tag(tag_prefix + 0.to_s).should eq offsets[0] msg_store.close end end