From 3ec181625a602161cff80fff4a5c33aaefd52439 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Wed, 25 Dec 2024 00:50:29 +0100 Subject: [PATCH 1/3] Use CRC32 for the replication protocol It's about 7 times faster than SHA1, and has similar or better characteristics for identifying file changes. --- src/lavinmq/clustering.cr | 3 ++- src/lavinmq/clustering/client.cr | 12 ++++++------ src/lavinmq/clustering/server.cr | 14 +++++++------- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/src/lavinmq/clustering.cr b/src/lavinmq/clustering.cr index 5d038246d3..7fac75f535 100644 --- a/src/lavinmq/clustering.cr +++ b/src/lavinmq/clustering.cr @@ -4,7 +4,8 @@ require "digest/sha1" module LavinMQ module Clustering - Start = Bytes['R'.ord, 'E'.ord, 'P'.ord, 'L'.ord, 'I'.ord, 1, 0, 0] + Start100 = Bytes['R'.ord, 'E'.ord, 'P'.ord, 'L'.ord, 'I'.ord, 1, 0, 0] + Start = Bytes['R'.ord, 'E'.ord, 'P'.ord, 'L'.ord, 'I'.ord, 1, 0, 1] class Error < Exception; end diff --git a/src/lavinmq/clustering/client.cr b/src/lavinmq/clustering/client.cr index 6e5e88c092..f76017fc62 100644 --- a/src/lavinmq/clustering/client.cr +++ b/src/lavinmq/clustering/client.cr @@ -125,9 +125,9 @@ module LavinMQ private def sync_files(socket, lz4) Log.info { "Waiting for list of files" } - sha1 = Digest::SHA1.new - remote_hash = Bytes.new(sha1.digest_size) - local_hash = Bytes.new(sha1.digest_size) + crc32 = Digest::CRC32.new + remote_hash = Bytes.new(crc32.digest_size) + local_hash = Bytes.new(crc32.digest_size) files_to_delete = ls_r(@data_dir) missing_files = Array(String).new loop do @@ -139,9 +139,9 @@ module LavinMQ path = File.join(@data_dir, filename) files_to_delete.delete(path) if File.exists? path - sha1.file(path) - sha1.final(local_hash) - sha1.reset + crc32.file(path) + crc32.final(local_hash) + crc32.reset if local_hash != remote_hash Log.info { "Mismatching hash: #{path}" } move_to_backup path diff --git a/src/lavinmq/clustering/server.cr b/src/lavinmq/clustering/server.cr index 7834b28cfe..f97a2cc3fe 100644 --- a/src/lavinmq/clustering/server.cr +++ b/src/lavinmq/clustering/server.cr @@ -14,7 +14,7 @@ module LavinMQ # When a follower connects: # It sends a static header (wrong header disconnects the client) # It sends its password (servers closes the connection if the password is wrong) - # Server sends a list of files in its data directory and the sha1 hash of those files + # Server sends a list of files in its data directory and the crc32 hash of those files # Client requests files that is missing or has mismatching checksums of # In the meantime the server queues up changes (all publishes/consumes are paused) # When client doesn't request more files starts to stream changes @@ -72,12 +72,12 @@ module LavinMQ end def files_with_hash(& : Tuple(String, Bytes) -> Nil) - sha1 = Digest::SHA1.new - hash = Bytes.new(sha1.digest_size) - @files.each_key do |path| - sha1.file path - sha1.final hash - sha1.reset + crc32 = Digest::CRC32.new + hash = Bytes.new(crc32.digest_size) + @files.each do |path, mfile| + crc32.file path + crc32.final hash + crc32.reset yield({path, hash}) end end From 04281c027b4788783533d3bc081433766e32ff12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Fri, 3 Jan 2025 12:00:05 +0100 Subject: [PATCH 2/3] Backwards compabilit with replication protocl version 1.0.0 --- spec/clustering/follower_spec.cr | 22 ++++++------- src/lavinmq/clustering/client.cr | 47 ++++++++++++++++++++-------- src/lavinmq/clustering/file_index.cr | 2 +- src/lavinmq/clustering/follower.cr | 18 ++++++++--- src/lavinmq/clustering/server.cr | 13 ++++---- 5 files changed, 66 insertions(+), 36 deletions(-) diff --git a/spec/clustering/follower_spec.cr b/spec/clustering/follower_spec.cr index 20b73a39c0..942aa2ab1d 100644 --- a/spec/clustering/follower_spec.cr +++ b/spec/clustering/follower_spec.cr @@ -2,12 +2,12 @@ require "../spec_helper" require "lz4" module FollowerSpec - def self.sha1(str : String) - sha1 = Digest::SHA1.new - hash = Bytes.new(sha1.digest_size) - sha1.update str.to_slice - sha1.final hash - sha1.reset + def self.checksum(str : String) + algo = Digest::CRC32.new + hash = Bytes.new(algo.digest_size) + algo.update str.to_slice + algo.final hash + algo.reset hash end @@ -23,13 +23,13 @@ module FollowerSpec def initialize(data_dir : String, files_with_hash : Hash(String, Bytes)? = nil, @with_file : Hash(String, FileType) = DEFAULT_WITH_FILE) @files_with_hash = files_with_hash || Hash(String, Bytes){ - File.join(data_dir, "file1") => FollowerSpec.sha1("hash1"), - File.join(data_dir, "file2") => FollowerSpec.sha1("hash2"), - File.join(data_dir, "file3") => FollowerSpec.sha1("hash3"), + File.join(data_dir, "file1") => FollowerSpec.checksum("hash1"), + File.join(data_dir, "file2") => FollowerSpec.checksum("hash2"), + File.join(data_dir, "file3") => FollowerSpec.checksum("hash3"), } end - def files_with_hash(& : Tuple(String, Bytes) -> Nil) + def files_with_hash(algo : Digest, & : Tuple(String, Bytes) -> Nil) @files_with_hash.each do |values| yield values end @@ -132,7 +132,7 @@ module FollowerSpec loop do len = client_lz4.read_bytes Int32, IO::ByteFormat::LittleEndian break if len == 0 - hash = Bytes.new(20) + hash = Bytes.new(4) path = client_lz4.read_string len client_lz4.read_fully hash file_list[path] = hash diff --git a/src/lavinmq/clustering/client.cr b/src/lavinmq/clustering/client.cr index f76017fc62..6e96eb9ec1 100644 --- a/src/lavinmq/clustering/client.cr +++ b/src/lavinmq/clustering/client.cr @@ -15,6 +15,7 @@ module LavinMQ @unix_http_proxy : Proxy? @socket : TCPSocket? @streamed_bytes = 0_u64 + @version : Bytes = Start def initialize(@config : Config, @id : Int32, @password : String, proxy = true) System.maximize_fd_limit @@ -76,16 +77,24 @@ module LavinMQ @socket = socket = TCPSocket.new(host, port) socket.sync = true socket.read_buffering = false + Log.info { "Connected" } + authenticate(socket) + Log.info { "Authenticated" } + set_socket_opts(socket) lz4 = Compress::LZ4::Reader.new(socket) sync(socket, lz4) Log.info { "Streaming changes" } stream_changes(socket, lz4) + rescue ex : ProtocolVersionError + Log.info { "Leader is running an older replication protocol: #{String.new(ex.version).dump}" } + @version = ex.version rescue ex : IO::Error - lz4.try &.close - socket.try &.close break if @closed Log.info { "Disconnected from server #{host}:#{port} (#{ex}), retrying..." } sleep 1.seconds + ensure + lz4.try &.close + socket.try &.close end end @@ -104,10 +113,6 @@ module LavinMQ end private def sync(socket, lz4) - Log.info { "Connected" } - authenticate(socket) - Log.info { "Authenticated" } - set_socket_opts(socket) sync_files(socket, lz4) Log.info { "Bulk synchronised" } sync_files(socket, lz4) @@ -125,9 +130,9 @@ module LavinMQ private def sync_files(socket, lz4) Log.info { "Waiting for list of files" } - crc32 = Digest::CRC32.new - remote_hash = Bytes.new(crc32.digest_size) - local_hash = Bytes.new(crc32.digest_size) + checksum_algo = @version == Start ? Digest::CRC32.new : Digest::SHA1.new + remote_hash = Bytes.new(checksum_algo.digest_size) + local_hash = Bytes.new(checksum_algo.digest_size) files_to_delete = ls_r(@data_dir) missing_files = Array(String).new loop do @@ -139,9 +144,9 @@ module LavinMQ path = File.join(@data_dir, filename) files_to_delete.delete(path) if File.exists? path - crc32.file(path) - crc32.final(local_hash) - crc32.reset + checksum_algo.file(path) + checksum_algo.final(local_hash) + checksum_algo.reset if local_hash != remote_hash Log.info { "Mismatching hash: #{path}" } move_to_backup path @@ -268,13 +273,21 @@ module LavinMQ end private def authenticate(socket) - socket.write Start + socket.write @version socket.write_bytes @password.bytesize.to_u8, IO::ByteFormat::LittleEndian socket.write @password.to_slice case socket.read_byte when 0 # ok when 1 then raise AuthenticationError.new when nil then raise IO::EOFError.new + when 'R'.ord + buf = uninitialized UInt8[7] + socket.read(buf.to_slice) + if buf == Start100[1, -1] + raise ProtocolVersionError.new(Start100) + else + raise Error.new("Unknown response from authentication: #{String.new(buf.to_slice).dump}") + end else raise Error.new("Unknown response from authentication") end @@ -299,6 +312,14 @@ module LavinMQ super("Authentication error") end end + + class ProtocolVersionError < Error + getter version + + def initialize(@version : Bytes) + super("Protocol version: #{String.new(@version).dump}") + end + end end end end diff --git a/src/lavinmq/clustering/file_index.cr b/src/lavinmq/clustering/file_index.cr index e3db418bed..d475cc382e 100644 --- a/src/lavinmq/clustering/file_index.cr +++ b/src/lavinmq/clustering/file_index.cr @@ -1,7 +1,7 @@ module LavinMQ module Clustering module FileIndex - abstract def files_with_hash(& : Tuple(String, Bytes) -> Nil) + abstract def files_with_hash(algo : Digest, & : Tuple(String, Bytes) -> Nil) abstract def with_file(filename : String, & : MFile | File | Nil -> Nil) : Nil end end diff --git a/src/lavinmq/clustering/follower.cr b/src/lavinmq/clustering/follower.cr index e06a831fdb..cf37bf8220 100644 --- a/src/lavinmq/clustering/follower.cr +++ b/src/lavinmq/clustering/follower.cr @@ -3,6 +3,9 @@ require "./file_index" require "../config" require "socket" require "wait_group" +require "lz4" +require "digest/crc32" +require "digest/sha1" module LavinMQ module Clustering @@ -23,8 +26,10 @@ module LavinMQ @lz4 = Compress::LZ4::Writer.new(@socket, Compress::LZ4::CompressOptions.new(auto_flush: false, block_mode_linked: true)) end + @checksum_algo : Digest = Digest::CRC32.new + def negotiate!(password) : Nil - validate_header! + @checksum_algo = validate_header! authenticate!(password) @id = @socket.read_bytes Int32, IO::ByteFormat::LittleEndian @socket.tcp_nodelay = true @@ -74,11 +79,16 @@ module LavinMQ len end - private def validate_header! : Nil + private def validate_header! : Digest buf = uninitialized UInt8[8] slice = buf.to_slice @socket.read_fully(slice) - if slice != Start + case slice + when Start + Digest::CRC32.new + when Start100 + Digest::SHA1.new + else @socket.write(Start) raise InvalidStartHeaderError.new(slice) end @@ -99,7 +109,7 @@ module LavinMQ private def send_file_list(socket = @lz4) count = 0 - @file_index.files_with_hash do |path, hash| + @file_index.files_with_hash(@checksum_algo) do |path, hash| count &+= 1 path = path[@data_dir.bytesize + 1..] socket.write_bytes path.bytesize.to_i32, IO::ByteFormat::LittleEndian diff --git a/src/lavinmq/clustering/server.cr b/src/lavinmq/clustering/server.cr index f97a2cc3fe..b0b99245e6 100644 --- a/src/lavinmq/clustering/server.cr +++ b/src/lavinmq/clustering/server.cr @@ -71,13 +71,12 @@ module LavinMQ each_follower &.delete(path) end - def files_with_hash(& : Tuple(String, Bytes) -> Nil) - crc32 = Digest::CRC32.new - hash = Bytes.new(crc32.digest_size) - @files.each do |path, mfile| - crc32.file path - crc32.final hash - crc32.reset + def files_with_hash(algo : Digest, & : Tuple(String, Bytes) -> Nil) + hash = Bytes.new(algo.digest_size) + @files.each_key do |path| + algo.file path + algo.final hash + algo.reset yield({path, hash}) end end From 733425c0b349c8ba91cfd4937281e84cce420afa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Tue, 7 Jan 2025 21:45:21 +0100 Subject: [PATCH 3/3] spec --- spec/clustering/follower_spec.cr | 3 +++ 1 file changed, 3 insertions(+) diff --git a/spec/clustering/follower_spec.cr b/spec/clustering/follower_spec.cr index 942aa2ab1d..915990ffd3 100644 --- a/spec/clustering/follower_spec.cr +++ b/spec/clustering/follower_spec.cr @@ -113,6 +113,9 @@ module FollowerSpec response.should eq 0u8 end end + + it "can negotiate older protocol version" do + end end end