Skip to content

Commit

Permalink
Backwards compabilit with replication protocl version 1.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
carlhoerberg committed Jan 3, 2025
1 parent 3758af1 commit e3d3e34
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 42 deletions.
22 changes: 11 additions & 11 deletions spec/clustering/follower_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ require "../spec_helper"
require "lz4"

module FollowerSpec
def self.sha1(str : String)
sha1 = Digest::SHA1.new
hash = Bytes.new(sha1.digest_size)
sha1.update str.to_slice
sha1.final hash
sha1.reset
def self.checksum(str : String)
algo = Digest::CRC32.new
hash = Bytes.new(algo.digest_size)
algo.update str.to_slice
algo.final hash
algo.reset
hash
end

Expand All @@ -23,13 +23,13 @@ module FollowerSpec
def initialize(data_dir : String, files_with_hash : Hash(String, Bytes)? = nil,
@with_file : Hash(String, FileType) = DEFAULT_WITH_FILE)
@files_with_hash = files_with_hash || Hash(String, Bytes){
File.join(data_dir, "file1") => FollowerSpec.sha1("hash1"),
File.join(data_dir, "file2") => FollowerSpec.sha1("hash2"),
File.join(data_dir, "file3") => FollowerSpec.sha1("hash3"),
File.join(data_dir, "file1") => FollowerSpec.checksum("hash1"),
File.join(data_dir, "file2") => FollowerSpec.checksum("hash2"),
File.join(data_dir, "file3") => FollowerSpec.checksum("hash3"),
}
end

def files_with_hash(& : Tuple(String, Bytes) -> Nil)
def files_with_hash(algo : Digest, & : Tuple(String, Bytes) -> Nil)
@files_with_hash.each do |values|
yield values
end
Expand Down Expand Up @@ -132,7 +132,7 @@ module FollowerSpec
loop do
len = client_lz4.read_bytes Int32, IO::ByteFormat::LittleEndian
break if len == 0
hash = Bytes.new(20)
hash = Bytes.new(4)
path = client_lz4.read_string len
client_lz4.read_fully hash
file_list[path] = hash
Expand Down
47 changes: 34 additions & 13 deletions src/lavinmq/clustering/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ module LavinMQ
@unix_http_proxy : Proxy?
@socket : TCPSocket?
@streamed_bytes = 0_u64
@version : Bytes = Start

def initialize(@config : Config, @id : Int32, @password : String, proxy = true)
System.maximize_fd_limit
Expand Down Expand Up @@ -75,16 +76,24 @@ module LavinMQ
@socket = socket = TCPSocket.new(host, port)
socket.sync = true
socket.read_buffering = false
Log.info { "Connected" }
authenticate(socket)
Log.info { "Authenticated" }
set_socket_opts(socket)
lz4 = Compress::LZ4::Reader.new(socket)
sync(socket, lz4)
Log.info { "Streaming changes" }
stream_changes(socket, lz4)
rescue ex : ProtocolVersionError
Log.info { "Leader is running an older replication protocol: #{String.new(ex.version).dump}" }
@version = ex.version
rescue ex : IO::Error
lz4.try &.close
socket.try &.close
break if @closed
Log.info { "Disconnected from server #{host}:#{port} (#{ex}), retrying..." }
sleep 1.seconds
ensure
lz4.try &.close
socket.try &.close
end
end

Expand All @@ -103,10 +112,6 @@ module LavinMQ
end

private def sync(socket, lz4)
Log.info { "Connected" }
authenticate(socket)
Log.info { "Authenticated" }
set_socket_opts(socket)
sync_files(socket, lz4)
Log.info { "Bulk synchronised" }
sync_files(socket, lz4)
Expand All @@ -124,9 +129,9 @@ module LavinMQ

private def sync_files(socket, lz4)
Log.info { "Waiting for list of files" }
crc32 = Digest::CRC32.new
remote_hash = Bytes.new(crc32.digest_size)
local_hash = Bytes.new(crc32.digest_size)
checksum_algo = @version == Start ? Digest::CRC32.new : Digest::SHA1.new
remote_hash = Bytes.new(checksum_algo.digest_size)
local_hash = Bytes.new(checksum_algo.digest_size)
files_to_delete = ls_r(@data_dir)
missing_files = Array(String).new
loop do
Expand All @@ -138,9 +143,9 @@ module LavinMQ
path = File.join(@data_dir, filename)
files_to_delete.delete(path)
if File.exists? path
crc32.file(path)
crc32.final(local_hash)
crc32.reset
checksum_algo.file(path)
checksum_algo.final(local_hash)
checksum_algo.reset
if local_hash != remote_hash
Log.info { "Mismatching hash: #{path}" }
move_to_backup path
Expand Down Expand Up @@ -267,13 +272,21 @@ module LavinMQ
end

private def authenticate(socket)
socket.write Start
socket.write @version
socket.write_bytes @password.bytesize.to_u8, IO::ByteFormat::LittleEndian
socket.write @password.to_slice
case socket.read_byte
when 0 # ok
when 1 then raise AuthenticationError.new
when nil then raise IO::EOFError.new
when 'R'.ord
buf = uninitialized UInt8[7]
socket.read(buf.to_slice)
if buf == Start100[1, -1]
raise ProtocolVersionError.new(Start100)
else
raise Error.new("Unknown response from authentication: #{String.new(buf.to_slice).dump}")
end
else
raise Error.new("Unknown response from authentication")
end
Expand All @@ -298,6 +311,14 @@ module LavinMQ
super("Authentication error")
end
end

class ProtocolVersionError < Error
getter version

def initialize(@version : Bytes)
super("Protocol version: #{String.new(@version).dump}")
end
end
end
end
end
2 changes: 1 addition & 1 deletion src/lavinmq/clustering/file_index.cr
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module LavinMQ
module Clustering
module FileIndex
abstract def files_with_hash(& : Tuple(String, Bytes) -> Nil)
abstract def files_with_hash(algo : Digest, & : Tuple(String, Bytes) -> Nil)
abstract def with_file(filename : String, & : MFile | File | Nil -> Nil) : Nil
end
end
Expand Down
18 changes: 14 additions & 4 deletions src/lavinmq/clustering/follower.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ require "./actions"
require "./file_index"
require "../config"
require "socket"
require "lz4"
require "digest/crc32"
require "digest/sha1"

module LavinMQ
module Clustering
Expand All @@ -22,8 +25,10 @@ module LavinMQ
@lz4 = Compress::LZ4::Writer.new(@socket, Compress::LZ4::CompressOptions.new(auto_flush: false, block_mode_linked: true))
end

@checksum_algo : Digest = Digest::CRC32.new

def negotiate!(password) : Nil
validate_header!
@checksum_algo = validate_header!
authenticate!(password)
@id = @socket.read_bytes Int32, IO::ByteFormat::LittleEndian
@socket.read_timeout = nil
Expand Down Expand Up @@ -81,11 +86,16 @@ module LavinMQ
len
end

private def validate_header! : Nil
private def validate_header! : Digest
buf = uninitialized UInt8[8]
slice = buf.to_slice
@socket.read_fully(slice)
if slice != Start
case slice
when Start
Digest::CRC32.new
when Start100
Digest::SHA1.new
else
@socket.write(Start)
raise InvalidStartHeaderError.new(slice)
end
Expand All @@ -106,7 +116,7 @@ module LavinMQ

private def send_file_list(socket = @lz4)
count = 0
@file_index.files_with_hash do |path, hash|
@file_index.files_with_hash(@checksum_algo) do |path, hash|
count &+= 1
path = path[@data_dir.bytesize + 1..]
socket.write_bytes path.bytesize.to_i32, IO::ByteFormat::LittleEndian
Expand Down
19 changes: 6 additions & 13 deletions src/lavinmq/clustering/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,12 @@ module LavinMQ
each_follower &.delete(path)
end

def files_with_hash(& : Tuple(String, Bytes) -> Nil)
crc32 = Digest::CRC32.new
hash = Bytes.new(crc32.digest_size)
@files.each do |path, mfile|
if mfile
was_unmapped = mfile.unmapped?
crc32.update mfile.to_slice
mfile.unmap if was_unmapped
else
crc32.file path
end
crc32.final hash
crc32.reset
def files_with_hash(algo : Digest, & : Tuple(String, Bytes) -> Nil)
hash = Bytes.new(algo.digest_size)
@files.each_key do |path|
algo.file path
algo.final hash
algo.reset
yield({path, hash})
end
end
Expand Down

0 comments on commit e3d3e34

Please sign in to comment.