Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replication lag improvement #652

Merged
merged 14 commits into from
May 10, 2024
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changed
- Follower lag is now based on bytes written to the action queue instead of bytes written to the socket [#652](https://github.com/cloudamqp/lavinmq/pull/652)

## [1.2.11] - 2024-04-26

### Fixed
Expand Down
59 changes: 56 additions & 3 deletions spec/replication/actions_spec.cr
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
require "../spec_helper"

def with_datadir_tempfile(&)
relative_path = Path.new "data.spec"
def with_datadir_tempfile(filename = "data.spec", &)
relative_path = Path.new filename
absolute_path = Path.new(LavinMQ::Config.instance.data_dir).join relative_path
yield relative_path.to_s, absolute_path.to_s
ensure
Expand Down Expand Up @@ -55,6 +55,16 @@ describe LavinMQ::Replication::Action do
end
end
end

describe "#lag_size" do
  it "should count filename and filesize" do
    with_datadir_tempfile("data") do |_relative, absolute|
      content = "foo"
      File.write absolute, content
      # Expected size: Int32 filename length + filename + Int64 data length + data.
      expected = sizeof(Int32) + "data".bytesize + sizeof(Int64) + content.bytesize
      action = LavinMQ::Replication::AddAction.new absolute
      action.lag_size.should eq(expected)
    end
  end
end
end

describe "with @mfile" do
Expand All @@ -72,6 +82,15 @@ describe LavinMQ::Replication::Action do
end
end
end
describe "#lag_size" do
  it "should count filename and filesize" do
    with_datadir_tempfile("data") do |_relative, absolute|
      File.write absolute, "foo"
      # Pass an MFile so the @mfile branch of AddAction#lag_size is the one
      # under test; without it this example only repeats the plain-file case.
      # NOTE(review): the MFile is not explicitly closed here — confirm whether
      # the spec should close it once the assertion has run.
      mfile = MFile.new(absolute)
      action = LavinMQ::Replication::AddAction.new absolute, mfile
      # Expected size: Int32 filename length + filename + Int64 data length + data.
      action.lag_size.should eq(sizeof(Int32) + "data".bytesize + sizeof(Int64) + "foo".bytesize)
    end
  end
end
end
end

Expand All @@ -90,9 +109,17 @@ describe LavinMQ::Replication::Action do
end
end
end
describe "#lag_size" do
  it "should count filename and size of Int32" do
    with_datadir_tempfile("data") do |_relative, absolute|
      # Expected size: Int32 filename length + filename + Int64 data length + Int32 payload.
      expected = sizeof(Int32) + "data".bytesize + sizeof(Int64) + sizeof(Int32)
      action = LavinMQ::Replication::AppendAction.new absolute, 123i32
      action.lag_size.should eq(expected)
    end
  end
end
end

describe "with Int32" do
describe "with UInt32" do
describe "#send" do
it "writes filename and data to IO" do
with_datadir_tempfile do |relative, absolute|
Expand All @@ -106,6 +133,14 @@ describe LavinMQ::Replication::Action do
end
end
end
describe "#lag_size" do
  it "should count filename and size of UInt32" do
    with_datadir_tempfile("data") do |_relative, absolute|
      # Expected size: Int32 filename length + filename + Int64 data length + UInt32 payload.
      expected = sizeof(Int32) + "data".bytesize + sizeof(Int64) + sizeof(UInt32)
      action = LavinMQ::Replication::AppendAction.new absolute, 123u32
      action.lag_size.should eq(expected)
    end
  end
end
end

describe "with Bytes" do
Expand All @@ -122,6 +157,14 @@ describe LavinMQ::Replication::Action do
end
end
end
describe "#lag_size" do
  it "should count filename and size of Bytes" do
    with_datadir_tempfile("data") do |_relative, absolute|
      payload = "foo".to_slice
      # Expected size: Int32 filename length + filename + Int64 data length + payload bytes.
      expected = sizeof(Int32) + "data".bytesize + sizeof(Int64) + payload.bytesize
      action = LavinMQ::Replication::AppendAction.new absolute, payload
      action.lag_size.should eq(expected)
    end
  end
end
end

describe "with FileRange" do
Expand All @@ -141,6 +184,16 @@ describe LavinMQ::Replication::Action do
end
end
end
describe "#lag_size" do
  it "should count filename and size of FileRange" do
    with_datadir_tempfile("data") do |_relative, absolute|
      File.write absolute, "foo bar baz"
      mfile = MFile.new(absolute)
      range = LavinMQ::Replication::FileRange.new(mfile, 4, 3)
      # Expected size: Int32 filename length + filename + Int64 data length + the
      # range's own length (not the size of the whole file).
      expected = sizeof(Int32) + "data".bytesize + sizeof(Int64) + range.len
      action = LavinMQ::Replication::AppendAction.new absolute, range
      action.lag_size.should eq(expected)
    end
  end
end
end
end
end
31 changes: 29 additions & 2 deletions spec/replication/follower_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -163,18 +163,21 @@ module FollowerSpec
client_lz4 = Compress::LZ4::Reader.new(client_socket)

spawn { follower.read_acks }
data_size = 0i64

ack_size = 0i64
FollowerSpec.with_datadir_tempfile("file1") do |_rel_path, abs_path|
File.write abs_path, "foo"
follower.add(abs_path)
filename_size = client_lz4.read_bytes(Int32, IO::ByteFormat::LittleEndian)
ack_size += filename_size + sizeof(Int32)
client_lz4.skip filename_size
data_size = client_lz4.read_bytes(Int64, IO::ByteFormat::LittleEndian)
client_lz4.skip data_size
ack_size += data_size + sizeof(Int64)
end
let_sync = Channel({LavinMQ::Replication::Follower, Bool}).new
spawn { follower.close(let_sync) }
spawn { client_socket.write_bytes data_size, IO::ByteFormat::LittleEndian }
spawn { client_socket.write_bytes ack_size, IO::ByteFormat::LittleEndian }

select
when res = let_sync.receive
Expand Down Expand Up @@ -220,4 +223,28 @@ module FollowerSpec
client_socket.try &.close
end
end

# Specs for Follower#lag: appended actions should raise the lag and acks
# read from the socket should lower it.
describe "#lag" do
it "should count bytes added to action queue" do
follower_socket, _client_socket = FakeSocket.pair
file_index = FakeFileIndex.new
follower = LavinMQ::Replication::Follower.new(follower_socket, file_index)
filename = "#{LavinMQ::Config.instance.data_dir}/file1"
# append returns the number of bytes the queued action contributes to the lag.
size = follower.append filename, Bytes.new(10)
# No ack has been read, so the lag should equal what was just appended.
follower.lag.should eq size
end

it "should subtract acked bytes from lag" do
follower_socket, client_socket = FakeSocket.pair
file_index = FakeFileIndex.new
follower = LavinMQ::Replication::Follower.new(follower_socket, file_index)
filename = "#{LavinMQ::Config.instance.data_dir}/file1"
size = follower.append filename, Bytes.new(10)
size2 = follower.append filename, Bytes.new(20)
# send ack for first message
client_socket.write_bytes size.to_i64, IO::ByteFormat::LittleEndian
# Consume exactly one ack; only the second append should remain unacked.
follower.read_ack
follower.lag.should eq size2
end
end
end
40 changes: 39 additions & 1 deletion src/lavinmq/replication/actions.cr
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,18 @@ module LavinMQ
def initialize(@path : String)
end

# Number of bytes this action contributes to a follower's replication lag.
abstract def lag_size : Int64
# Writes this action to the given socket.
# NOTE(review): the Int64 return value is presumably the number of payload
# bytes written — confirm against the implementations.
abstract def send(socket : IO) : Int64

# Byte size of the filename relative to the data dir, i.e. the same value
# that is written as the Int32 length prefix when the filename is sent.
# Previously this measured the full @path, which also counted the data dir
# prefix and so disagreed with what send_filename and lag_size use.
protected def filename_bytesize : Int32
  filename.bytesize.to_i32
end

# Path of the file relative to the data dir (the data dir prefix and the
# following separator are stripped). The result is memoized in @filename.
def filename
  # The prefix length is measured in bytes, so slice by bytes as well.
  # Indexing with @path[n..] counts characters and would cut into the
  # filename if the data dir contains multi-byte characters.
  @filename ||= @path.byte_slice(Config.instance.data_dir.bytesize + 1)
end

private def send_filename(socket : IO)
filename = @path[Config.instance.data_dir.bytesize + 1..]
socket.write_bytes filename.bytesize.to_i32, IO::ByteFormat::LittleEndian
socket.write filename.to_slice
end
Expand All @@ -25,6 +33,16 @@ module LavinMQ
def initialize(@path : String, @mfile : MFile? = nil)
end

# Lag contribution of adding a whole file: Int32 filename length, the
# filename, Int64 data length and the file's current size. The size is
# taken from @mfile when one was given, otherwise from the file at @path.
def lag_size : Int64
  header_bytes = 0i64 + sizeof(Int32) + filename.bytesize + sizeof(Int64)
  file_bytes = if mfile = @mfile
                 mfile.size.to_i64
               else
                 File.size(@path).to_i64
               end
  header_bytes + file_bytes
end

def send(socket) : Int64
Log.debug { "Add #{@path}" }
send_filename(socket)
Expand All @@ -48,6 +66,19 @@ module LavinMQ
def initialize(@path : String, @obj : Bytes | FileRange | UInt32 | Int32)
end

# Lag contribution of an append: Int32 filename length, the filename,
# Int64 data length and the size of the appended payload.
def lag_size : Int64
  payload_bytes = case payload = @obj
                  in Bytes         then payload.bytesize.to_i64
                  in FileRange     then payload.len.to_i64
                  in UInt32, Int32 then 4i64 # both integer kinds are 4 bytes wide
                  end
  header_bytes = 0i64 + sizeof(Int32) + filename.bytesize + sizeof(Int64)
  header_bytes + payload_bytes
end

def send(socket) : Int64
send_filename(socket)
len : Int64
Expand All @@ -71,6 +102,13 @@ module LavinMQ
end

struct DeleteAction < Action
# Lag contribution of a delete: Int32 filename length, the filename and
# the Int64 length field; there is no payload.
def lag_size : Int64
  # Maybe it would be ok to not include delete action in lag, because
  # the follower should have all info necessary to GC the file during
  # startup?
  header_bytes = sizeof(Int32) + filename.bytesize + sizeof(Int64)
  header_bytes.to_i64
end

def send(socket) : Int64
Log.debug { "Delete #{@path}" }
send_filename(socket)
Expand Down
3 changes: 2 additions & 1 deletion src/lavinmq/replication/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@ module LavinMQ
f.truncate
IO.copy(socket, f, len) == len || raise IO::EOFError.new
end
@socket.write_bytes len.abs, IO::ByteFormat::LittleEndian # ack
ack_value : Int64 = len.abs + sizeof(Int64) + filename_len + sizeof(Int32)
@socket.write_bytes ack_value, IO::ByteFormat::LittleEndian # ack
@socket.flush
end
end
Expand Down
32 changes: 21 additions & 11 deletions src/lavinmq/replication/follower.cr
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
require "../replication"
require "./actions"
require "./file_index"

Expand Down Expand Up @@ -46,17 +45,21 @@ module LavinMQ
def read_acks(socket = @socket) : Nil
spawn action_loop, name: "Follower#action_loop"
loop do
len = socket.read_bytes(Int64, IO::ByteFormat::LittleEndian)
@acked_bytes += len
read_ack(socket)
end
rescue IO::Error
end

# Reads one little-endian Int64 ack from the socket and adds it to the
# running count of acked bytes.
def read_ack(socket = @socket) : Nil
  acked = socket.read_bytes(Int64, IO::ByteFormat::LittleEndian)
  @acked_bytes += acked
end

private def action_loop(socket = @lz4)
while action = @actions.receive?
@sent_bytes += action.send(socket)
action.send(socket)
while action2 = @actions.try_receive?
@sent_bytes += action2.send(socket)
action2.send(socket)
end
socket.flush
end
Expand Down Expand Up @@ -129,16 +132,23 @@ module LavinMQ
end
end

def add(path, mfile : MFile? = nil)
@actions.send AddAction.new(path, mfile)
def add(path, mfile : MFile? = nil) : Int64
send_action AddAction.new(path, mfile)
end

def append(path, obj) : Int64
send_action AppendAction.new(path, obj)
end

def append(path, obj)
@actions.send AppendAction.new(path, obj)
def delete(path) : Int64
send_action DeleteAction.new(path)
end

def delete(path)
@actions.send DeleteAction.new(path)
# Queues the action for this follower and adds its lag_size to the count of
# sent bytes. Returns the action's lag_size.
private def send_action(action : Action) : Int64
lag_size = action.lag_size
# Counted before the action is put on the queue.
# NOTE(review): presumably so an ack can never be accounted for before the
# bytes it acknowledges have been counted — confirm.
@sent_bytes += lag_size
@actions.send action
lag_size
end

def close(synced_close : Channel({Follower, Bool})? = nil)
Expand Down
Loading