Skip to content

Commit

Permalink
remove methods, use instance variables directly. add cleanup function
Browse files Browse the repository at this point in the history
  • Loading branch information
viktorerlingsson committed Apr 23, 2024
1 parent ce1d73b commit 13bd31e
Showing 1 changed file with 42 additions and 24 deletions.
66 changes: 42 additions & 24 deletions src/lavinmq/queue/stream_queue_message_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@ module LavinMQ
property max_age : Time::Span | Time::MonthSpan | Nil
getter last_offset : Int64
@segment_last_ts = Hash(UInt32, Int64).new(0i64) # used for max-age
@consumer_offset_positions : Hash(String, Int64)? # used for consumer offsets
@consumer_offsets : MFile?
@consumer_offset_positions = Hash(String, Int64).new # used for consumer offsets
@consumer_offsets : MFile

def initialize(@data_dir : String, @replicator : Replication::Replicator?)
super
@last_offset = get_last_offset
path = File.join(@data_dir, "consumer_offsets")
@consumer_offsets = MFile.new(path, 5000) # TODO: size?
@consumer_offset_positions = consumer_offset_positions
drop_overflow
end

Expand Down Expand Up @@ -95,24 +98,16 @@ module LavinMQ
end

def last_offset_by_consumer_tag(consumer_tag)
if consumer_offset_positions[consumer_tag]
pos = consumer_offset_positions[consumer_tag]
tx = consumer_offsets.to_slice(pos, 8)
if pos = @consumer_offset_positions[consumer_tag]
tx = @consumer_offsets.to_slice(pos, 8)
return IO::ByteFormat::SystemEndian.decode(Int64, tx)
end
rescue KeyError
end

def consumer_offsets : MFile
return @consumer_offsets.not_nil! if @consumer_offsets && !@consumer_offsets.not_nil!.closed?
path = File.join(@data_dir, "consumer_offsets")
@consumer_offsets = MFile.new(path, 5000) # TODO: size?
end

private def consumer_offset_positions
return @consumer_offset_positions.not_nil! if @consumer_offset_positions
positions = Hash(String, Int64).new
slice = consumer_offsets.to_slice
slice = @consumer_offsets.to_slice
return positions if slice.size.zero?
pos = 0

Expand All @@ -126,17 +121,17 @@ module LavinMQ
pos += 8
break if pos >= slice.size
end
consumer_offsets.resize(pos) # resize mfile to remove any empty bytes
@consumer_offset_positions = positions
@consumer_offsets.resize(pos) # resize mfile to remove any empty bytes
positions
end

def save_offset_by_consumer_tag(consumer_tag, new_offset)
pos = 0_i64
begin
if pos = consumer_offset_positions[consumer_tag]
if pos = @consumer_offset_positions[consumer_tag]
buf = uninitialized UInt8[8]
IO::ByteFormat::LittleEndian.encode(new_offset.as(Int64), buf.to_slice)
consumer_offsets.write_at(pos, buf.to_slice)
@consumer_offsets.write_at(pos, buf.to_slice)
end
rescue KeyError
write_new_ctag_to_file(consumer_tag, new_offset)
Expand All @@ -147,35 +142,58 @@ module LavinMQ
def write_new_ctag_to_file(consumer_tag, new_offset)
slice = consumer_tag.to_slice
consumer_tag_length = slice.size.to_u8
pos = consumer_offsets.size + slice.size + 1
pos = @consumer_offsets.size + slice.size + 1

length_buffer = uninitialized UInt8[1]
IO::ByteFormat::LittleEndian.encode(consumer_tag_length, length_buffer.to_slice)

offset_buffer = uninitialized UInt8[8]
IO::ByteFormat::LittleEndian.encode(new_offset.as(Int64), offset_buffer.to_slice)

consumer_offsets.write(length_buffer.to_slice + slice + offset_buffer.to_slice)
consumer_offset_positions[consumer_tag] = pos
@consumer_offsets.write(length_buffer.to_slice + slice + offset_buffer.to_slice)
@consumer_offset_positions[consumer_tag] = pos
end

def cleanup_consumer_offsets
offsets_to_save = Hash(String, Int64).new
lowest_offset_in_stream, _seg, _pos = offset_at(@segments.first_key, 4u32) # handle
@consumer_offset_positions.each do |ctag, pos|
offset = last_offset_by_consumer_tag(ctag).not_nil!
next if offset < lowest_offset_in_stream
# Other scenarios to remove?
offsets_to_save[ctag] = offset
end

delete_and_reopen_offsets_file
@consumer_offset_positions = Hash(String, Int64).new
offsets_to_save.each do |ctag, offset|
write_new_ctag_to_file(ctag, offset)
end
end

def remove_consumer_tag_from_file(consumer_tag)
@consumer_offset_positions = consumer_offset_positions.reject! { |k, _v| k == consumer_tag }
@consumer_offset_positions = @consumer_offset_positions.reject! { |k, _v| k == consumer_tag }

offsets_to_save = Hash(String, Int64).new
consumer_offset_positions.each do |ctag, _p|
@consumer_offset_positions.each do |ctag, _p|
offset = last_offset_by_consumer_tag(ctag)
next unless offset
offsets_to_save[ctag] = offset
end

consumer_offsets.close
consumer_offsets.delete
delete_and_reopen_offsets_file
offsets_to_save.each do |ctag, offset|
write_new_ctag_to_file(ctag, offset)
end
end

def delete_and_reopen_offsets_file
@consumer_offsets.close
@consumer_offsets.delete
path = File.join(@data_dir, "consumer_offsets")
@consumer_offsets = MFile.new(path, 5000) # TODO: size?
end

def shift?(consumer : Client::Channel::StreamConsumer) : Envelope?
raise ClosedError.new if @closed

Expand Down

0 comments on commit 13bd31e

Please sign in to comment.