From 2c8cc69b27cf6ab648c42428df76c2548cc8f9a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Mon, 11 Mar 2024 10:15:45 +0100 Subject: [PATCH 01/14] Count bytes added to action queue as sent --- src/lavinmq/replication/actions.cr | 40 ++++++++++++++++++++++++++++- src/lavinmq/replication/follower.cr | 12 ++++++--- 2 files changed, 48 insertions(+), 4 deletions(-) diff --git a/src/lavinmq/replication/actions.cr b/src/lavinmq/replication/actions.cr index f2897d3cc8..ca126d8445 100644 --- a/src/lavinmq/replication/actions.cr +++ b/src/lavinmq/replication/actions.cr @@ -12,10 +12,18 @@ module LavinMQ def initialize(@path : String) end + abstract def bytesize : Int32 abstract def send(socket : IO) : Int64 + protected def filename_bytesize : Int32 + @path.bytesize.to_i32 + end + + def filename + @filename ||= @path[Config.instance.data_dir.bytesize + 1..] + end + private def send_filename(socket : IO) - filename = @path[Config.instance.data_dir.bytesize + 1..] socket.write_bytes filename.bytesize.to_i32, IO::ByteFormat::LittleEndian socket.write filename.to_slice end @@ -25,6 +33,16 @@ module LavinMQ def initialize(@path : String, @mfile : MFile? = nil) end + def bytesize : Int32 + if mfile = @mfile + sizeof(Int32) + filename.bytesize + + sizeof(Int64) + mfile.size.to_i64 + else + sizeof(Int32) + filename.bytesize + + sizeof(Int64) + File.size(@path).to_i64 + end + end + def send(socket) : Int64 Log.debug { "Add #{@path}" } send_filename(socket) @@ -48,6 +66,19 @@ module LavinMQ def initialize(@path : String, @obj : Bytes | FileRange | UInt32 | Int32) end + def bytesize : Int32 + datasize = case obj = @obj + in Bytes + obj.bytesize.to_i64 + in FileRange + obj.len.to_i64 + in UInt32, Int32 + 4i64 + end + sizeof(Int32) + filename.bytesize + + sizeof(Int64) + datasize + end + def send(socket) : Int64 send_filename(socket) len : Int64 @@ -71,6 +102,13 @@ module LavinMQ end struct DeleteAction < Action + def bytesize : Int32 + # Maybe it would be ok to not include delete action in lag, because + # the follower should have all info necessary to GC the file during + # startup? + sizeof(Int32) + filename.bytesize + sizeof(Int64) + end + def send(socket) : Int64 Log.debug { "Delete #{@path}" } send_filename(socket) diff --git a/src/lavinmq/replication/follower.cr b/src/lavinmq/replication/follower.cr index 3244557a79..f20d450957 100644 --- a/src/lavinmq/replication/follower.cr +++ b/src/lavinmq/replication/follower.cr @@ -130,15 +130,21 @@ module LavinMQ end def add(path, mfile : MFile? = nil) - @actions.send AddAction.new(path, mfile) + action = AddAction.new(path, mfile) + @sent_bytes += action.bytesize + @actions.send action end def append(path, obj) - @actions.send AppendAction.new(path, obj) + action = AppendAction.new(path, obj) + @sent_bytes += action.bytesize + @actions.send action end def delete(path) - @actions.send DeleteAction.new(path) + action = DeleteAction.new(path) + @sent_bytes += action.bytesize + @actions.send action end def close(synced_close : Channel({Follower, Bool})? = nil) From a65d7302427d77985af4873bcec02fb3ee88fd8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Thu, 28 Mar 2024 14:20:06 +0100 Subject: [PATCH 02/14] Ack all bytes not only data --- src/lavinmq/replication/client.cr | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/lavinmq/replication/client.cr b/src/lavinmq/replication/client.cr index 8021e047af..55185abb16 100644 --- a/src/lavinmq/replication/client.cr +++ b/src/lavinmq/replication/client.cr @@ -195,7 +195,8 @@ module LavinMQ f.truncate IO.copy(socket, f, len) == len || raise IO::EOFError.new end - @socket.write_bytes len.abs, IO::ByteFormat::LittleEndian # ack + ack_value = sizeof(Int32) + filename_len + sizeof(Int64) + len.abs + @socket.write_bytes ack_value, IO::ByteFormat::LittleEndian # ack @socket.flush end end From cd00296000a46768afe21f6b2651985a0e81fa2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Wed, 3 Apr 2024 11:10:35 +0200 Subject: [PATCH 03/14] Count bytes in one place --- src/lavinmq/replication/follower.cr | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/lavinmq/replication/follower.cr b/src/lavinmq/replication/follower.cr index f20d450957..733b0d238e 100644 --- a/src/lavinmq/replication/follower.cr +++ b/src/lavinmq/replication/follower.cr @@ -130,19 +130,18 @@ module LavinMQ end def add(path, mfile : MFile? = nil) - action = AddAction.new(path, mfile) - @sent_bytes += action.bytesize - @actions.send action + send_action AddAction.new(path, mfile) end def append(path, obj) - action = AppendAction.new(path, obj) - @sent_bytes += action.bytesize - @actions.send action + send_action AppendAction.new(path, obj) end def delete(path) - action = DeleteAction.new(path) + send_action DeleteAction.new(path) + end + + private def send_action(action : Action) : Nil @sent_bytes += action.bytesize @actions.send action end From d416d1d620c2fc194b997c317758f6ea29f91c51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Wed, 3 Apr 2024 15:23:14 +0200 Subject: [PATCH 04/14] Make it possible to read single acks (for specs) --- src/lavinmq/replication/follower.cr | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/lavinmq/replication/follower.cr b/src/lavinmq/replication/follower.cr index 733b0d238e..bbc30eb3fa 100644 --- a/src/lavinmq/replication/follower.cr +++ b/src/lavinmq/replication/follower.cr @@ -46,12 +46,16 @@ module LavinMQ def read_acks(socket = @socket) : Nil spawn action_loop, name: "Follower#action_loop" loop do - len = socket.read_bytes(Int64, IO::ByteFormat::LittleEndian) - @acked_bytes += len + read_ack(socket) end rescue IO::Error end + def read_ack(socket = @socket) : Nil + len = socket.read_bytes(Int64, IO::ByteFormat::LittleEndian) + @acked_bytes += len + end + private def action_loop(socket = @lz4) while action = @actions.receive? @sent_bytes += action.send(socket) From 3a499fc474661390729191a2b884bf59067aefff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Wed, 3 Apr 2024 15:23:52 +0200 Subject: [PATCH 05/14] Return the size of the sent action --- src/lavinmq/replication/follower.cr | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/lavinmq/replication/follower.cr b/src/lavinmq/replication/follower.cr index bbc30eb3fa..714660ea7f 100644 --- a/src/lavinmq/replication/follower.cr +++ b/src/lavinmq/replication/follower.cr @@ -133,21 +133,23 @@ module LavinMQ end end - def add(path, mfile : MFile? = nil) + def add(path, mfile : MFile? = nil) : Int32 send_action AddAction.new(path, mfile) end - def append(path, obj) + def append(path, obj) : Int32 send_action AppendAction.new(path, obj) end - def delete(path) + def delete(path) : Int32 send_action DeleteAction.new(path) end - private def send_action(action : Action) : Nil - @sent_bytes += action.bytesize + private def send_action(action : Action) : Int32 + action_size = action.bytesize + @sent_bytes += action_size @actions.send action + action_size end def close(synced_close : Channel({Follower, Bool})? = nil) From 46c748d463239c1636bcd789d3ad07894f756660 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Thu, 4 Apr 2024 07:57:08 +0200 Subject: [PATCH 06/14] Make Action#bytesize Int64 --- src/lavinmq/replication/actions.cr | 16 ++++++++-------- src/lavinmq/replication/follower.cr | 8 ++++---- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/lavinmq/replication/actions.cr b/src/lavinmq/replication/actions.cr index ca126d8445..9b94c937e5 100644 --- a/src/lavinmq/replication/actions.cr +++ b/src/lavinmq/replication/actions.cr @@ -12,7 +12,7 @@ module LavinMQ def initialize(@path : String) end - abstract def bytesize : Int32 + abstract def bytesize : Int64 abstract def send(socket : IO) : Int64 protected def filename_bytesize : Int32 @@ -33,12 +33,12 @@ module LavinMQ def initialize(@path : String, @mfile : MFile? = nil) end - def bytesize : Int32 + def bytesize : Int64 if mfile = @mfile - sizeof(Int32) + filename.bytesize + + 0i64 + sizeof(Int32) + filename.bytesize + sizeof(Int64) + mfile.size.to_i64 else - sizeof(Int32) + filename.bytesize + + 0i64 + sizeof(Int32) + filename.bytesize + sizeof(Int64) + File.size(@path).to_i64 end end @@ -66,7 +66,7 @@ module LavinMQ def initialize(@path : String, @obj : Bytes | FileRange | UInt32 | Int32) end - def bytesize : Int32 + def bytesize : Int64 datasize = case obj = @obj in Bytes obj.bytesize.to_i64 @@ -75,7 +75,7 @@ module LavinMQ in UInt32, Int32 4i64 end - sizeof(Int32) + filename.bytesize + + 0i64 + sizeof(Int32) + filename.bytesize + sizeof(Int64) + datasize end @@ -102,11 +102,11 @@ module LavinMQ end struct DeleteAction < Action - def bytesize : Int32 + def bytesize : Int64 # Maybe it would be ok to not include delete action in lag, because # the follower should have all info necessary to GC the file during # startup? - sizeof(Int32) + filename.bytesize + sizeof(Int64) + (sizeof(Int32) + filename.bytesize + sizeof(Int64)).to_i64 end def send(socket) : Int64 diff --git a/src/lavinmq/replication/follower.cr b/src/lavinmq/replication/follower.cr index 714660ea7f..6696c4caf7 100644 --- a/src/lavinmq/replication/follower.cr +++ b/src/lavinmq/replication/follower.cr @@ -133,19 +133,19 @@ module LavinMQ end end - def add(path, mfile : MFile? = nil) : Int32 + def add(path, mfile : MFile? = nil) : Int64 send_action AddAction.new(path, mfile) end - def append(path, obj) : Int32 + def append(path, obj) : Int64 send_action AppendAction.new(path, obj) end - def delete(path) : Int32 + def delete(path) : Int64 send_action DeleteAction.new(path) end - private def send_action(action : Action) : Int32 + private def send_action(action : Action) : Int64 action_size = action.bytesize @sent_bytes += action_size @actions.send action From 4c4c54d2bbf4601c8b2ad08231028ba7f02d7acf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Thu, 4 Apr 2024 08:08:06 +0200 Subject: [PATCH 07/14] Add specs for Action#bytesize --- spec/replication/actions_spec.cr | 59 ++++++++++++++++++++++++++++++-- 1 file changed, 56 insertions(+), 3 deletions(-) diff --git a/spec/replication/actions_spec.cr b/spec/replication/actions_spec.cr index d6eaa99ff9..fa5b26fdfe 100644 --- a/spec/replication/actions_spec.cr +++ b/spec/replication/actions_spec.cr @@ -1,7 +1,7 @@ require "../spec_helper" -def with_datadir_tempfile(&) - relative_path = Path.new "data.spec" +def with_datadir_tempfile(filename = "data.spec", &) + relative_path = Path.new filename absolute_path = Path.new(LavinMQ::Config.instance.data_dir).join relative_path yield relative_path.to_s, absolute_path.to_s ensure @@ -55,6 +55,16 @@ describe LavinMQ::Replication::Action do end end end + + describe "#bytesize" do + it "should count filename and filesize" do + with_datadir_tempfile("data") do |_relative, absolute| + File.write absolute, "foo" + action = LavinMQ::Replication::AddAction.new absolute + action.bytesize.should eq(sizeof(Int32) + "data".bytesize + sizeof(Int64) + "foo".bytesize) + end + end + end end describe "with @mfile" do @@ -72,6 +82,15 @@ describe LavinMQ::Replication::Action do end end end + describe "#bytesize" do + it "should count filename and filesize" do + with_datadir_tempfile("data") do |_relative, absolute| + File.write absolute, "foo" + action = LavinMQ::Replication::AddAction.new absolute + action.bytesize.should eq(sizeof(Int32) + "data".bytesize + sizeof(Int64) + "foo".bytesize) + end + end + end end end @@ -90,9 +109,17 @@ describe LavinMQ::Replication::Action do end end end + describe "#bytesize" do + it "should count filename and size of Int32" do + with_datadir_tempfile("data") do |_relative, absolute| + action = LavinMQ::Replication::AppendAction.new absolute, 123i32 + action.bytesize.should eq(sizeof(Int32) + "data".bytesize + sizeof(Int64) + sizeof(Int32)) + end + end + end end - describe "with Int32" do + describe "with UInt32" do describe "#send" do it "writes filename and data to IO" do with_datadir_tempfile do |relative, absolute| @@ -106,6 +133,14 @@ describe LavinMQ::Replication::Action do end end end + describe "#bytesize" do + it "should count filename and size of UInt32" do + with_datadir_tempfile("data") do |_relative, absolute| + action = LavinMQ::Replication::AppendAction.new absolute, 123u32 + action.bytesize.should eq(sizeof(Int32) + "data".bytesize + sizeof(Int64) + sizeof(UInt32)) + end + end + end end describe "with Bytes" do @@ -122,6 +157,14 @@ describe LavinMQ::Replication::Action do end end end + describe "#bytesize" do + it "should count filename and size of Bytes" do + with_datadir_tempfile("data") do |_relative, absolute| + action = LavinMQ::Replication::AppendAction.new absolute, "foo".to_slice + action.bytesize.should eq(sizeof(Int32) + "data".bytesize + sizeof(Int64) + "foo".to_slice.bytesize) + end + end + end end describe "with FileRange" do @@ -141,6 +184,16 @@ describe LavinMQ::Replication::Action do end end end + describe "#bytesize" do + it "should count filename and size of FileRange" do + with_datadir_tempfile("data") do |_relative, absolute| + File.write absolute, "foo bar baz" + range = LavinMQ::Replication::FileRange.new(MFile.new(absolute), 4, 3) + action = LavinMQ::Replication::AppendAction.new absolute, range + action.bytesize.should eq(sizeof(Int32) + "data".bytesize + sizeof(Int64) + range.len) + end + end + end end end end From 34e1d81ba42746924abe7996e81a8fab4ee332a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Thu, 4 Apr 2024 08:33:25 +0200 Subject: [PATCH 08/14] Add specs for Follower#lag --- spec/replication/follower_spec.cr | 25 +++++++++++++++++++++++++ src/lavinmq/replication/follower.cr | 1 - 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/spec/replication/follower_spec.cr b/spec/replication/follower_spec.cr index 6147a88ead..824fa0dd6d 100644 --- a/spec/replication/follower_spec.cr +++ b/spec/replication/follower_spec.cr @@ -163,6 +163,7 @@ module FollowerSpec client_lz4 = Compress::LZ4::Reader.new(client_socket) spawn { follower.read_acks } + data_size = 0i64 FollowerSpec.with_datadir_tempfile("file1") do |_rel_path, abs_path| File.write abs_path, "foo" @@ -220,4 +221,28 @@ module FollowerSpec client_socket.try &.close end end + + describe "#lag" do + it "should count bytes added to action queue" do + follower_socket, _client_socket = FakeSocket.pair + file_index = FakeFileIndex.new + follower = LavinMQ::Replication::Follower.new(follower_socket, file_index) + filename = "#{LavinMQ::Config.instance.data_dir}/file1" + size = follower.append filename, Bytes.new(10) + follower.lag.should eq size + end + + it "should subtract acked bytes from lag" do + follower_socket, client_socket = FakeSocket.pair + file_index = FakeFileIndex.new + follower = LavinMQ::Replication::Follower.new(follower_socket, file_index) + filename = "#{LavinMQ::Config.instance.data_dir}/file1" + size = follower.append filename, Bytes.new(10) + size2 = follower.append filename, Bytes.new(20) + # send ack for first message + client_socket.write_bytes size.to_i64, IO::ByteFormat::LittleEndian + follower.read_ack + follower.lag.should eq size2 + end + end end diff --git a/src/lavinmq/replication/follower.cr b/src/lavinmq/replication/follower.cr index 6696c4caf7..88508074f8 100644 --- a/src/lavinmq/replication/follower.cr +++ b/src/lavinmq/replication/follower.cr @@ -1,4 +1,3 @@ -require "../replication" require "./actions" require "./file_index" From da8cfa694814a74f8d3e10b236830880cadbe866 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Tue, 9 Apr 2024 08:54:31 +0200 Subject: [PATCH 09/14] Rename Action#bytesize to #lag_size It may not always represent the entire Action size sent over the socket. --- spec/replication/actions_spec.cr | 24 ++++++++++++------------ src/lavinmq/replication/actions.cr | 8 ++++---- src/lavinmq/replication/follower.cr | 6 +++--- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/spec/replication/actions_spec.cr b/spec/replication/actions_spec.cr index fa5b26fdfe..b975260e0f 100644 --- a/spec/replication/actions_spec.cr +++ b/spec/replication/actions_spec.cr @@ -56,12 +56,12 @@ describe LavinMQ::Replication::Action do end end - describe "#bytesize" do + describe "#lag_size" do it "should count filename and filesize" do with_datadir_tempfile("data") do |_relative, absolute| File.write absolute, "foo" action = LavinMQ::Replication::AddAction.new absolute - action.bytesize.should eq(sizeof(Int32) + "data".bytesize + sizeof(Int64) + "foo".bytesize) + action.lag_size.should eq(sizeof(Int32) + "data".bytesize + sizeof(Int64) + "foo".bytesize) end end end @@ -82,12 +82,12 @@ describe LavinMQ::Replication::Action do end end end - describe "#bytesize" do + describe "#lag_size" do it "should count filename and filesize" do with_datadir_tempfile("data") do |_relative, absolute| File.write absolute, "foo" action = LavinMQ::Replication::AddAction.new absolute - action.bytesize.should eq(sizeof(Int32) + "data".bytesize + sizeof(Int64) + "foo".bytesize) + action.lag_size.should eq(sizeof(Int32) + "data".bytesize + sizeof(Int64) + "foo".bytesize) end end end @@ -109,11 +109,11 @@ describe LavinMQ::Replication::Action do end end end - describe "#bytesize" do + describe "#lag_size" do it "should count filename and size of Int32" do with_datadir_tempfile("data") do |_relative, absolute| action = LavinMQ::Replication::AppendAction.new absolute, 123i32 - action.bytesize.should eq(sizeof(Int32) + "data".bytesize + sizeof(Int64) + sizeof(Int32)) + action.lag_size.should eq(sizeof(Int32) + "data".bytesize + sizeof(Int64) + sizeof(Int32)) end end end @@ -133,11 +133,11 @@ describe LavinMQ::Replication::Action do end end end - describe "#bytesize" do + describe "#lag_size" do it "should count filename and size of UInt32" do with_datadir_tempfile("data") do |_relative, absolute| action = LavinMQ::Replication::AppendAction.new absolute, 123u32 - action.bytesize.should eq(sizeof(Int32) + "data".bytesize + sizeof(Int64) + sizeof(UInt32)) + action.lag_size.should eq(sizeof(Int32) + "data".bytesize + sizeof(Int64) + sizeof(UInt32)) end end end @@ -157,11 +157,11 @@ describe LavinMQ::Replication::Action do end end end - describe "#bytesize" do + describe "#lag_size" do it "should count filename and size of Bytes" do with_datadir_tempfile("data") do |_relative, absolute| action = LavinMQ::Replication::AppendAction.new absolute, "foo".to_slice - action.bytesize.should eq(sizeof(Int32) + "data".bytesize + sizeof(Int64) + "foo".to_slice.bytesize) + action.lag_size.should eq(sizeof(Int32) + "data".bytesize + sizeof(Int64) + "foo".to_slice.bytesize) end end end @@ -184,13 +184,13 @@ describe LavinMQ::Replication::Action do end end end - describe "#bytesize" do + describe "#lag_size" do it "should count filename and size of FileRange" do with_datadir_tempfile("data") do |_relative, absolute| File.write absolute, "foo bar baz" range = LavinMQ::Replication::FileRange.new(MFile.new(absolute), 4, 3) action = LavinMQ::Replication::AppendAction.new absolute, range - action.bytesize.should eq(sizeof(Int32) + "data".bytesize + sizeof(Int64) + range.len) + action.lag_size.should eq(sizeof(Int32) + "data".bytesize + sizeof(Int64) + range.len) end end end diff --git a/src/lavinmq/replication/actions.cr b/src/lavinmq/replication/actions.cr index 9b94c937e5..8ccb180659 100644 --- a/src/lavinmq/replication/actions.cr +++ b/src/lavinmq/replication/actions.cr @@ -12,7 +12,7 @@ module LavinMQ def initialize(@path : String) end - abstract def bytesize : Int64 + abstract def lag_size : Int64 abstract def send(socket : IO) : Int64 protected def filename_bytesize : Int32 @@ -33,7 +33,7 @@ module LavinMQ def initialize(@path : String, @mfile : MFile? = nil) end - def bytesize : Int64 + def lag_size : Int64 if mfile = @mfile 0i64 + sizeof(Int32) + filename.bytesize + sizeof(Int64) + mfile.size.to_i64 @@ -66,7 +66,7 @@ module LavinMQ def initialize(@path : String, @obj : Bytes | FileRange | UInt32 | Int32) end - def bytesize : Int64 + def lag_size : Int64 datasize = case obj = @obj in Bytes obj.bytesize.to_i64 @@ -102,7 +102,7 @@ module LavinMQ end struct DeleteAction < Action - def bytesize : Int64 + def lag_size : Int64 # Maybe it would be ok to not include delete action in lag, because # the follower should have all info necessary to GC the file during # startup? diff --git a/src/lavinmq/replication/follower.cr b/src/lavinmq/replication/follower.cr index 88508074f8..d9316a91fe 100644 --- a/src/lavinmq/replication/follower.cr +++ b/src/lavinmq/replication/follower.cr @@ -145,10 +145,10 @@ module LavinMQ end private def send_action(action : Action) : Int64 - action_size = action.bytesize - @sent_bytes += action_size + lag_size = action.lag_size + @sent_bytes += lag_size @actions.send action - action_size + lag_size end def close(synced_close : Channel({Follower, Bool})? = nil) From 44510968ccde034a486fcc4279e119e313d805a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Fri, 10 May 2024 09:31:28 +0200 Subject: [PATCH 10/14] Only count bytes when actions are added to queue --- src/lavinmq/replication/follower.cr | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lavinmq/replication/follower.cr b/src/lavinmq/replication/follower.cr index d9316a91fe..0c35da4fd6 100644 --- a/src/lavinmq/replication/follower.cr +++ b/src/lavinmq/replication/follower.cr @@ -57,9 +57,9 @@ module LavinMQ private def action_loop(socket = @lz4) while action = @actions.receive? - @sent_bytes += action.send(socket) + action.send(socket) while action2 = @actions.try_receive? - @sent_bytes += action2.send(socket) + action2.send(socket) end socket.flush end From 202eca8211a5f7b1776d1317b93036e279f04be8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Fri, 10 May 2024 09:44:12 +0200 Subject: [PATCH 11/14] Type ack value as int64 --- src/lavinmq/replication/client.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lavinmq/replication/client.cr b/src/lavinmq/replication/client.cr index 55185abb16..10e53b96c2 100644 --- a/src/lavinmq/replication/client.cr +++ b/src/lavinmq/replication/client.cr @@ -195,7 +195,7 @@ module LavinMQ f.truncate IO.copy(socket, f, len) == len || raise IO::EOFError.new end - ack_value = sizeof(Int32) + filename_len + sizeof(Int64) + len.abs + ack_value : Int64 = sizeof(Int32) + filename_len + sizeof(Int64) + len.abs @socket.write_bytes ack_value, IO::ByteFormat::LittleEndian # ack @socket.flush end From 8f30d5efbd4e2d74dcd93ee6344326a043b41e45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Fri, 10 May 2024 09:44:36 +0200 Subject: [PATCH 12/14] All data is counted in lag --- spec/replication/follower_spec.cr | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/spec/replication/follower_spec.cr b/spec/replication/follower_spec.cr index 824fa0dd6d..6d2a3a7ca1 100644 --- a/spec/replication/follower_spec.cr +++ b/spec/replication/follower_spec.cr @@ -164,18 +164,20 @@ module FollowerSpec spawn { follower.read_acks } - data_size = 0i64 + ack_size = 0i64 FollowerSpec.with_datadir_tempfile("file1") do |_rel_path, abs_path| File.write abs_path, "foo" follower.add(abs_path) filename_size = client_lz4.read_bytes(Int32, IO::ByteFormat::LittleEndian) + ack_size += filename_size + sizeof(Int32) client_lz4.skip filename_size data_size = client_lz4.read_bytes(Int64, IO::ByteFormat::LittleEndian) client_lz4.skip data_size + ack_size += data_size + sizeof(Int64) end let_sync = Channel({LavinMQ::Replication::Follower, Bool}).new spawn { follower.close(let_sync) } - spawn { client_socket.write_bytes data_size, IO::ByteFormat::LittleEndian } + spawn { client_socket.write_bytes ack_size, IO::ByteFormat::LittleEndian } select when res = let_sync.receive From 805eba3b86d533175de817019e01e3d633d07c67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Fri, 10 May 2024 10:06:38 +0200 Subject: [PATCH 13/14] fixup! Type ack value as int64 --- src/lavinmq/replication/client.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lavinmq/replication/client.cr b/src/lavinmq/replication/client.cr index 10e53b96c2..67881cba70 100644 --- a/src/lavinmq/replication/client.cr +++ b/src/lavinmq/replication/client.cr @@ -195,7 +195,7 @@ module LavinMQ f.truncate IO.copy(socket, f, len) == len || raise IO::EOFError.new end - ack_value : Int64 = sizeof(Int32) + filename_len + sizeof(Int64) + len.abs + ack_value : Int64 = len.abs + sizeof(Int64) + filename_len + sizeof(Int32) @socket.write_bytes ack_value, IO::ByteFormat::LittleEndian # ack @socket.flush end From 7d5c75296051a2a4ff3a3b47b769b484a6164f92 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Fri, 10 May 2024 10:26:03 +0200 Subject: [PATCH 14/14] Update changelog --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f881e32cfc..bc8f2617e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed +- Follower lag is now based on bytes written to action queue instead of socket [#652](https://github.com/cloudamqp/lavinmq/pull/652) + ## [1.2.11] - 2024-04-26 ### Fixed