diff --git a/spec/clustering/follower_spec.cr b/spec/clustering/follower_spec.cr index 20b73a39c0..915990ffd3 100644 --- a/spec/clustering/follower_spec.cr +++ b/spec/clustering/follower_spec.cr @@ -2,12 +2,12 @@ require "../spec_helper" require "lz4" module FollowerSpec - def self.sha1(str : String) - sha1 = Digest::SHA1.new - hash = Bytes.new(sha1.digest_size) - sha1.update str.to_slice - sha1.final hash - sha1.reset + def self.checksum(str : String) + algo = Digest::CRC32.new + hash = Bytes.new(algo.digest_size) + algo.update str.to_slice + algo.final hash + algo.reset hash end @@ -23,13 +23,13 @@ module FollowerSpec def initialize(data_dir : String, files_with_hash : Hash(String, Bytes)? = nil, @with_file : Hash(String, FileType) = DEFAULT_WITH_FILE) @files_with_hash = files_with_hash || Hash(String, Bytes){ - File.join(data_dir, "file1") => FollowerSpec.sha1("hash1"), - File.join(data_dir, "file2") => FollowerSpec.sha1("hash2"), - File.join(data_dir, "file3") => FollowerSpec.sha1("hash3"), + File.join(data_dir, "file1") => FollowerSpec.checksum("hash1"), + File.join(data_dir, "file2") => FollowerSpec.checksum("hash2"), + File.join(data_dir, "file3") => FollowerSpec.checksum("hash3"), } end - def files_with_hash(& : Tuple(String, Bytes) -> Nil) + def files_with_hash(algo : Digest, & : Tuple(String, Bytes) -> Nil) @files_with_hash.each do |values| yield values end @@ -113,6 +113,9 @@ module FollowerSpec response.should eq 0u8 end end + + it "can negotiate older protocol version" do + end end end @@ -132,7 +135,7 @@ module FollowerSpec loop do len = client_lz4.read_bytes Int32, IO::ByteFormat::LittleEndian break if len == 0 - hash = Bytes.new(20) + hash = Bytes.new(4) path = client_lz4.read_string len client_lz4.read_fully hash file_list[path] = hash diff --git a/src/lavinmq/clustering.cr b/src/lavinmq/clustering.cr index 5d038246d3..7fac75f535 100644 --- a/src/lavinmq/clustering.cr +++ b/src/lavinmq/clustering.cr @@ -4,7 +4,8 @@ require "digest/sha1" module LavinMQ module Clustering - Start = Bytes['R'.ord, 'E'.ord, 'P'.ord, 'L'.ord, 'I'.ord, 1, 0, 0] + Start100 = Bytes['R'.ord, 'E'.ord, 'P'.ord, 'L'.ord, 'I'.ord, 1, 0, 0] + Start = Bytes['R'.ord, 'E'.ord, 'P'.ord, 'L'.ord, 'I'.ord, 1, 0, 1] class Error < Exception; end diff --git a/src/lavinmq/clustering/client.cr b/src/lavinmq/clustering/client.cr index 6e5e88c092..6e96eb9ec1 100644 --- a/src/lavinmq/clustering/client.cr +++ b/src/lavinmq/clustering/client.cr @@ -15,6 +15,7 @@ module LavinMQ @unix_http_proxy : Proxy? @socket : TCPSocket? @streamed_bytes = 0_u64 + @version : Bytes = Start def initialize(@config : Config, @id : Int32, @password : String, proxy = true) System.maximize_fd_limit @@ -76,16 +77,24 @@ module LavinMQ @socket = socket = TCPSocket.new(host, port) socket.sync = true socket.read_buffering = false + Log.info { "Connected" } + authenticate(socket) + Log.info { "Authenticated" } + set_socket_opts(socket) lz4 = Compress::LZ4::Reader.new(socket) sync(socket, lz4) Log.info { "Streaming changes" } stream_changes(socket, lz4) + rescue ex : ProtocolVersionError + Log.info { "Leader is running an older replication protocol: #{String.new(ex.version).dump}" } + @version = ex.version rescue ex : IO::Error - lz4.try &.close - socket.try &.close break if @closed Log.info { "Disconnected from server #{host}:#{port} (#{ex}), retrying..." } sleep 1.seconds + ensure + lz4.try &.close + socket.try &.close end end @@ -104,10 +113,6 @@ module LavinMQ end private def sync(socket, lz4) - Log.info { "Connected" } - authenticate(socket) - Log.info { "Authenticated" } - set_socket_opts(socket) sync_files(socket, lz4) Log.info { "Bulk synchronised" } sync_files(socket, lz4) @@ -125,9 +130,9 @@ module LavinMQ private def sync_files(socket, lz4) Log.info { "Waiting for list of files" } - sha1 = Digest::SHA1.new - remote_hash = Bytes.new(sha1.digest_size) - local_hash = Bytes.new(sha1.digest_size) + checksum_algo = @version == Start ? Digest::CRC32.new : Digest::SHA1.new + remote_hash = Bytes.new(checksum_algo.digest_size) + local_hash = Bytes.new(checksum_algo.digest_size) files_to_delete = ls_r(@data_dir) missing_files = Array(String).new loop do @@ -139,9 +144,9 @@ module LavinMQ path = File.join(@data_dir, filename) files_to_delete.delete(path) if File.exists? path - sha1.file(path) - sha1.final(local_hash) - sha1.reset + checksum_algo.file(path) + checksum_algo.final(local_hash) + checksum_algo.reset if local_hash != remote_hash Log.info { "Mismatching hash: #{path}" } move_to_backup path @@ -268,13 +273,21 @@ module LavinMQ end private def authenticate(socket) - socket.write Start + socket.write @version socket.write_bytes @password.bytesize.to_u8, IO::ByteFormat::LittleEndian socket.write @password.to_slice case socket.read_byte when 0 # ok when 1 then raise AuthenticationError.new when nil then raise IO::EOFError.new + when 'R'.ord + buf = uninitialized UInt8[7] + socket.read(buf.to_slice) + if buf == Start100[1, -1] + raise ProtocolVersionError.new(Start100) + else + raise Error.new("Unknown response from authentication: #{String.new(buf.to_slice).dump}") + end else raise Error.new("Unknown response from authentication") end @@ -299,6 +312,14 @@ module LavinMQ super("Authentication error") end end + + class ProtocolVersionError < Error + getter version + + def initialize(@version : Bytes) + super("Protocol version: #{String.new(@version).dump}") + end + end end end end diff --git a/src/lavinmq/clustering/file_index.cr b/src/lavinmq/clustering/file_index.cr index e3db418bed..d475cc382e 100644 --- a/src/lavinmq/clustering/file_index.cr +++ b/src/lavinmq/clustering/file_index.cr @@ -1,7 +1,7 @@ module LavinMQ module Clustering module FileIndex - abstract def files_with_hash(& : Tuple(String, Bytes) -> Nil) + abstract def files_with_hash(algo : Digest, & : Tuple(String, Bytes) -> Nil) abstract def with_file(filename : String, & : MFile | File | Nil -> Nil) : Nil end end diff --git a/src/lavinmq/clustering/follower.cr b/src/lavinmq/clustering/follower.cr index e06a831fdb..cf37bf8220 100644 --- a/src/lavinmq/clustering/follower.cr +++ b/src/lavinmq/clustering/follower.cr @@ -3,6 +3,9 @@ require "./file_index" require "../config" require "socket" require "wait_group" +require "lz4" +require "digest/crc32" +require "digest/sha1" module LavinMQ module Clustering @@ -23,8 +26,10 @@ module LavinMQ @lz4 = Compress::LZ4::Writer.new(@socket, Compress::LZ4::CompressOptions.new(auto_flush: false, block_mode_linked: true)) end + @checksum_algo : Digest = Digest::CRC32.new + def negotiate!(password) : Nil - validate_header! + @checksum_algo = validate_header! authenticate!(password) @id = @socket.read_bytes Int32, IO::ByteFormat::LittleEndian @socket.tcp_nodelay = true @@ -74,11 +79,16 @@ module LavinMQ len end - private def validate_header! : Nil + private def validate_header! : Digest buf = uninitialized UInt8[8] slice = buf.to_slice @socket.read_fully(slice) - if slice != Start + case slice + when Start + Digest::CRC32.new + when Start100 + Digest::SHA1.new + else @socket.write(Start) raise InvalidStartHeaderError.new(slice) end @@ -99,7 +109,7 @@ module LavinMQ private def send_file_list(socket = @lz4) count = 0 - @file_index.files_with_hash do |path, hash| + @file_index.files_with_hash(@checksum_algo) do |path, hash| count &+= 1 path = path[@data_dir.bytesize + 1..] socket.write_bytes path.bytesize.to_i32, IO::ByteFormat::LittleEndian diff --git a/src/lavinmq/clustering/server.cr b/src/lavinmq/clustering/server.cr index 7834b28cfe..b0b99245e6 100644 --- a/src/lavinmq/clustering/server.cr +++ b/src/lavinmq/clustering/server.cr @@ -14,7 +14,7 @@ module LavinMQ # When a follower connects: # It sends a static header (wrong header disconnects the client) # It sends its password (servers closes the connection if the password is wrong) - # Server sends a list of files in its data directory and the sha1 hash of those files + # Server sends a list of files in its data directory and the crc32 hash of those files # Client requests files that is missing or has mismatching checksums of # In the meantime the server queues up changes (all publishes/consumes are paused) # When client doesn't request more files starts to stream changes @@ -71,13 +71,12 @@ module LavinMQ each_follower &.delete(path) end - def files_with_hash(& : Tuple(String, Bytes) -> Nil) - sha1 = Digest::SHA1.new - hash = Bytes.new(sha1.digest_size) + def files_with_hash(algo : Digest, & : Tuple(String, Bytes) -> Nil) + hash = Bytes.new(algo.digest_size) @files.each_key do |path| - sha1.file path - sha1.final hash - sha1.reset + algo.file path + algo.final hash + algo.reset yield({path, hash}) end end