Skip to content

Commit

Permalink
Use MFiles in follower
Browse files Browse the repository at this point in the history
Close and unmap files that haven't been accessed for a moment
  • Loading branch information
spuun committed Feb 7, 2025
1 parent 4467795 commit 9ee24e6
Showing 1 changed file with 63 additions and 2 deletions.
65 changes: 63 additions & 2 deletions src/lavinmq/clustering/client.cr
Original file line number Diff line number Diff line change
@@ -1,10 +1,62 @@
require "../clustering"
require "../clustering/proxy"
require "../data_dir_lock"
require "../mfile"
require "lz4"

module LavinMQ
module Clustering
class ReplicatedFile
getter last_access = Time.monotonic
getter path : String

@file : MFile

def initialize(@path : String, @capacity : Int64)
# Ensure the file exists
@file = MFile.new(@path, @capacity)
end

def write(data : Bytes)
@last_access = Time.monotonic
resize_to_fit(data.size)
@file.write data
end

def resize_to_fit(bytes_to_fit)
new_capacity = size + bytes_to_fit
if new_capacity > @capacity
@file.resize new_capacity
@capacity = new_capacity
end
end

def size
@file.try &.size || File.size(@path)
end

def rename(new_filename)
@last_access = Time.monotonic
File.rename @path, new_filename
@path = new_filename
end

def delete
@last_access = Time.monotonic
File.delete @path
end

def close
@last_access = Time.monotonic
@file.close
end

private def with_file(&)
@last_access = Time.monotonic
yield (@file ||= MFile.new(@path, @capacity))
end
end

class Client
Log = LavinMQ::Log.for "clustering.client"
@data_dir_lock : DataDirLock
Expand All @@ -19,10 +71,18 @@ module LavinMQ
def initialize(@config : Config, @id : Int32, @password : String, proxy = true)
System.maximize_fd_limit
@data_dir = config.data_dir
@files = Hash(String, File).new do |h, k|
@files = Hash(String, ReplicatedFile).new do |h, k|
path = File.join(@data_dir, k)
Dir.mkdir_p File.dirname(path)
h[k] = File.open(path, "a").tap &.sync = true
h[k] = ReplicatedFile.new(path, Config.instance.segment_size)
h.reject! do |_, f|
if f.last_access < (Time.monotonic - 10.seconds)
f.close
true
end
false
end
h[k]
end
Dir.mkdir_p @data_dir
@data_dir_lock = DataDirLock.new(@data_dir).tap &.acquire
Expand Down Expand Up @@ -213,6 +273,7 @@ module LavinMQ
when .negative? # append bytes to file
Log.debug { "Appending #{len.abs} bytes to #{filename}" }
f = @files[filename]
f.resize_to_fit(len.abs)
IO.copy(lz4, f, len.abs) == len.abs || raise IO::EOFError.new("Full append not received")
when .zero? # file is deleted
Log.debug { "Deleting #{filename}" }
Expand Down

0 comments on commit 9ee24e6

Please sign in to comment.