From 221f2483eb39f879db0b25220400be68960dc043 Mon Sep 17 00:00:00 2001 From: Viktor Erlingsson Date: Thu, 27 Jun 2024 15:48:11 +0200 Subject: [PATCH] replicate consumer offsets file. remove unused code --- src/lavinmq/queue/stream_queue_message_store.cr | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/src/lavinmq/queue/stream_queue_message_store.cr b/src/lavinmq/queue/stream_queue_message_store.cr index a9e15fe30b..2f75f5c516 100644 --- a/src/lavinmq/queue/stream_queue_message_store.cr +++ b/src/lavinmq/queue/stream_queue_message_store.cr @@ -16,6 +16,7 @@ module LavinMQ super @last_offset = get_last_offset @consumer_offsets = MFile.new(File.join(@queue_data_dir, "consumer_offsets"), Config.instance.segment_size) + @replicator.try &.register_file @consumer_offsets @consumer_offset_positions = restore_consumer_offset_positions drop_overflow end @@ -133,19 +134,13 @@ module LavinMQ @consumer_offsets.write_bytes AMQ::Protocol::ShortString.new(consumer_tag) @consumer_offset_positions[consumer_tag] = @consumer_offsets.size @consumer_offsets.write_bytes new_offset - # replicate + @replicator.try &.append(@consumer_offsets.path, (@consumer_offsets.size - consumer_tag.bytesize - 1 - 8).to_i32) end def consumer_offset_file_full?(consumer_tag) (@consumer_offsets.size + 1 + consumer_tag.bytesize + 8) >= @consumer_offsets.capacity end - def expand_consumer_offset_file - pos = @consumer_offsets.size - @consumer_offsets = MFile.new(@consumer_offsets.path, @consumer_offsets.capacity + Config.instance.segment_size) - @consumer_offsets.resize(pos) - end - def cleanup_consumer_offsets return if @consumer_offsets.size.zero? @@ -172,6 +167,7 @@ module LavinMQ yield # fill the new file with correct data in this block @consumer_offsets.rename(old_consumer_offsets.path) old_consumer_offsets.close(truncate_to_size: false) + @replicator.try &.replace_file @consumer_offsets.path end def shift?(consumer : Client::Channel::StreamConsumer) : Envelope?