Skip to content

Commit

Permalink
set capacity of consumer offsets file to 1000*current size when compa…
Browse files Browse the repository at this point in the history
…cting
  • Loading branch information
viktorerlingsson committed Jun 27, 2024
1 parent 173377f commit 834bb33
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 20 deletions.
27 changes: 12 additions & 15 deletions spec/stream_queue_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -484,27 +484,24 @@ describe LavinMQ::StreamQueue do

it "expands consumer offset file when needed" do
queue_name = Random::Secure.hex
consumer_tag_prefix = "ctag-"
consumer_tag_prefix = Random::Secure.hex(32)
with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 1)

data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
2000.times do |i|
next if i == 0
msg_store.store_consumer_offset("#{consumer_tag_prefix}#{i}", i)
one_offset_bytesize = "#{consumer_tag_prefix}#{1000}".bytesize + 1 + 8
offsets = (LavinMQ::Config.instance.segment_size / one_offset_bytesize).to_i32 + 1
bytesize = 0
offsets.times do |i|
consumer_tag = "#{consumer_tag_prefix}#{i + 1000}"
msg_store.store_consumer_offset(consumer_tag, i + 1000)
bytesize += consumer_tag.bytesize + 1 + 8
end
msg_store.close

msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.@consumer_offsets.size.should eq 34_875

2000.times do |i|
next if i == 0
msg_store.last_offset_by_consumer_tag("#{consumer_tag_prefix}#{i}").should eq i
msg_store.@consumer_offsets.size.should eq bytesize
msg_store.@consumer_offsets.size.should be > LavinMQ::Config.instance.segment_size
offsets.times do |i|
msg_store.last_offset_by_consumer_tag("#{consumer_tag_prefix}#{i + 1000}").should eq i + 1000
end

msg_store.close
end
end
end
Expand Down
10 changes: 5 additions & 5 deletions src/lavinmq/queue/stream_queue_message_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ module LavinMQ

def store_consumer_offset(consumer_tag : String, new_offset : Int64)
cleanup_consumer_offsets if consumer_offset_file_full?(consumer_tag)
expand_consumer_offset_file if consumer_offset_file_full?(consumer_tag)
@consumer_offsets.write_bytes AMQ::Protocol::ShortString.new(consumer_tag)
@consumer_offset_positions[consumer_tag] = @consumer_offsets.size
@consumer_offsets.write_bytes new_offset
Expand All @@ -152,23 +151,24 @@ module LavinMQ

offsets_to_save = Hash(String, Int64).new
lowest_offset_in_stream, _seg, _pos = offset_at(@segments.first_key, 4u32)
capacity = 0
@consumer_offset_positions.each do |ctag, _pos|
if offset = last_offset_by_consumer_tag(ctag)
offsets_to_save[ctag] = offset if offset >= lowest_offset_in_stream
capacity += ctag.bytesize + 1 + 8
end
end

@consumer_offset_positions = Hash(String, Int64).new
replace_offsets_file do
replace_offsets_file(capacity * 1000) do
offsets_to_save.each do |ctag, offset|
store_consumer_offset(ctag, offset)
end
end
end

def replace_offsets_file(&)
def replace_offsets_file(capacity : Int, &)
old_consumer_offsets = @consumer_offsets
@consumer_offsets = MFile.new("#{old_consumer_offsets.path}.tmp", Config.instance.segment_size)
@consumer_offsets = MFile.new("#{old_consumer_offsets.path}.tmp", capacity)
yield # fill the new file with correct data in this block
@consumer_offsets.rename(old_consumer_offsets.path)
old_consumer_offsets.close(truncate_to_size: false)
Expand Down

0 comments on commit 834bb33

Please sign in to comment.