Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use CRC32 for the replication protocol #897

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions spec/clustering/follower_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ require "../spec_helper"
require "lz4"

module FollowerSpec
def self.sha1(str : String)
sha1 = Digest::SHA1.new
hash = Bytes.new(sha1.digest_size)
sha1.update str.to_slice
sha1.final hash
sha1.reset
def self.checksum(str : String)
algo = Digest::CRC32.new
hash = Bytes.new(algo.digest_size)
algo.update str.to_slice
algo.final hash
algo.reset
hash
end

Expand All @@ -23,13 +23,13 @@ module FollowerSpec
def initialize(data_dir : String, files_with_hash : Hash(String, Bytes)? = nil,
@with_file : Hash(String, FileType) = DEFAULT_WITH_FILE)
@files_with_hash = files_with_hash || Hash(String, Bytes){
File.join(data_dir, "file1") => FollowerSpec.sha1("hash1"),
File.join(data_dir, "file2") => FollowerSpec.sha1("hash2"),
File.join(data_dir, "file3") => FollowerSpec.sha1("hash3"),
File.join(data_dir, "file1") => FollowerSpec.checksum("hash1"),
File.join(data_dir, "file2") => FollowerSpec.checksum("hash2"),
File.join(data_dir, "file3") => FollowerSpec.checksum("hash3"),
}
end

def files_with_hash(& : Tuple(String, Bytes) -> Nil)
def files_with_hash(algo : Digest, & : Tuple(String, Bytes) -> Nil)
@files_with_hash.each do |values|
yield values
end
Expand Down Expand Up @@ -113,6 +113,9 @@ module FollowerSpec
response.should eq 0u8
end
end

# Placeholder example: no assertions have been written yet. It is marked
# pending so the spec run reports it as outstanding work rather than as a
# passing test with an empty body.
# TODO: presumably this should open the connection with the older
# Start100 header and check that negotiation still succeeds — confirm the
# intended coverage with the protocol author.
pending "can negotiate older protocol version" do
end
end
end

Expand All @@ -132,7 +135,7 @@ module FollowerSpec
loop do
len = client_lz4.read_bytes Int32, IO::ByteFormat::LittleEndian
break if len == 0
hash = Bytes.new(20)
hash = Bytes.new(4)
path = client_lz4.read_string len
client_lz4.read_fully hash
file_list[path] = hash
Expand Down
3 changes: 2 additions & 1 deletion src/lavinmq/clustering.cr
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ require "digest/sha1"

module LavinMQ
module Clustering
Start = Bytes['R'.ord, 'E'.ord, 'P'.ord, 'L'.ord, 'I'.ord, 1, 0, 0]
Start100 = Bytes['R'.ord, 'E'.ord, 'P'.ord, 'L'.ord, 'I'.ord, 1, 0, 0]
Start = Bytes['R'.ord, 'E'.ord, 'P'.ord, 'L'.ord, 'I'.ord, 1, 0, 1]

class Error < Exception; end

Expand Down
47 changes: 34 additions & 13 deletions src/lavinmq/clustering/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ module LavinMQ
@unix_http_proxy : Proxy?
@socket : TCPSocket?
@streamed_bytes = 0_u64
@version : Bytes = Start

def initialize(@config : Config, @id : Int32, @password : String, proxy = true)
System.maximize_fd_limit
Expand Down Expand Up @@ -76,16 +77,24 @@ module LavinMQ
@socket = socket = TCPSocket.new(host, port)
socket.sync = true
socket.read_buffering = false
Log.info { "Connected" }
authenticate(socket)
Log.info { "Authenticated" }
set_socket_opts(socket)
lz4 = Compress::LZ4::Reader.new(socket)
sync(socket, lz4)
Log.info { "Streaming changes" }
stream_changes(socket, lz4)
rescue ex : ProtocolVersionError
Log.info { "Leader is running an older replication protocol: #{String.new(ex.version).dump}" }
@version = ex.version
rescue ex : IO::Error
lz4.try &.close
socket.try &.close
break if @closed
Log.info { "Disconnected from server #{host}:#{port} (#{ex}), retrying..." }
sleep 1.seconds
ensure
lz4.try &.close
socket.try &.close
end
end

Expand All @@ -104,10 +113,6 @@ module LavinMQ
end

private def sync(socket, lz4)
Log.info { "Connected" }
authenticate(socket)
Log.info { "Authenticated" }
set_socket_opts(socket)
sync_files(socket, lz4)
Log.info { "Bulk synchronised" }
sync_files(socket, lz4)
Expand All @@ -125,9 +130,9 @@ module LavinMQ

private def sync_files(socket, lz4)
Log.info { "Waiting for list of files" }
sha1 = Digest::SHA1.new
remote_hash = Bytes.new(sha1.digest_size)
local_hash = Bytes.new(sha1.digest_size)
checksum_algo = @version == Start ? Digest::CRC32.new : Digest::SHA1.new
remote_hash = Bytes.new(checksum_algo.digest_size)
local_hash = Bytes.new(checksum_algo.digest_size)
files_to_delete = ls_r(@data_dir)
missing_files = Array(String).new
loop do
Expand All @@ -139,9 +144,9 @@ module LavinMQ
path = File.join(@data_dir, filename)
files_to_delete.delete(path)
if File.exists? path
sha1.file(path)
sha1.final(local_hash)
sha1.reset
checksum_algo.file(path)
checksum_algo.final(local_hash)
checksum_algo.reset
if local_hash != remote_hash
Log.info { "Mismatching hash: #{path}" }
move_to_backup path
Expand Down Expand Up @@ -268,13 +273,21 @@ module LavinMQ
end

private def authenticate(socket)
socket.write Start
socket.write @version
socket.write_bytes @password.bytesize.to_u8, IO::ByteFormat::LittleEndian
socket.write @password.to_slice
case socket.read_byte
when 0 # ok
when 1 then raise AuthenticationError.new
when nil then raise IO::EOFError.new
when 'R'.ord
buf = uninitialized UInt8[7]
socket.read(buf.to_slice)
if buf == Start100[1, -1]
raise ProtocolVersionError.new(Start100)
else
raise Error.new("Unknown response from authentication: #{String.new(buf.to_slice).dump}")
end
else
raise Error.new("Unknown response from authentication")
end
Expand All @@ -299,6 +312,14 @@ module LavinMQ
super("Authentication error")
end
end

# Error that carries a replication protocol header.
#
# `version` exposes the header bytes supplied at construction time; the
# exception message embeds the escaped (`dump`) string form of those bytes.
class ProtocolVersionError < Error
  getter version : Bytes

  def initialize(version : Bytes)
    @version = version
    super("Protocol version: #{String.new(version).dump}")
  end
end
end
end
end
2 changes: 1 addition & 1 deletion src/lavinmq/clustering/file_index.cr
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module LavinMQ
module Clustering
module FileIndex
abstract def files_with_hash(& : Tuple(String, Bytes) -> Nil)
abstract def files_with_hash(algo : Digest, & : Tuple(String, Bytes) -> Nil)
abstract def with_file(filename : String, & : MFile | File | Nil -> Nil) : Nil
end
end
Expand Down
18 changes: 14 additions & 4 deletions src/lavinmq/clustering/follower.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ require "./file_index"
require "../config"
require "socket"
require "wait_group"
require "lz4"
require "digest/crc32"
require "digest/sha1"

module LavinMQ
module Clustering
Expand All @@ -23,8 +26,10 @@ module LavinMQ
@lz4 = Compress::LZ4::Writer.new(@socket, Compress::LZ4::CompressOptions.new(auto_flush: false, block_mode_linked: true))
end

@checksum_algo : Digest = Digest::CRC32.new

def negotiate!(password) : Nil
validate_header!
@checksum_algo = validate_header!
authenticate!(password)
@id = @socket.read_bytes Int32, IO::ByteFormat::LittleEndian
@socket.tcp_nodelay = true
Expand Down Expand Up @@ -74,11 +79,16 @@ module LavinMQ
len
end

private def validate_header! : Nil
private def validate_header! : Digest
buf = uninitialized UInt8[8]
slice = buf.to_slice
@socket.read_fully(slice)
if slice != Start
case slice
when Start
Digest::CRC32.new
when Start100
Digest::SHA1.new
else
@socket.write(Start)
raise InvalidStartHeaderError.new(slice)
end
Expand All @@ -99,7 +109,7 @@ module LavinMQ

private def send_file_list(socket = @lz4)
count = 0
@file_index.files_with_hash do |path, hash|
@file_index.files_with_hash(@checksum_algo) do |path, hash|
count &+= 1
path = path[@data_dir.bytesize + 1..]
socket.write_bytes path.bytesize.to_i32, IO::ByteFormat::LittleEndian
Expand Down
13 changes: 6 additions & 7 deletions src/lavinmq/clustering/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ module LavinMQ
# When a follower connects:
# It sends a static header (wrong header disconnects the client)
# It sends its password (server closes the connection if the password is wrong)
# Server sends a list of files in its data directory and the sha1 hash of those files
# Server sends a list of files in its data directory and the crc32 hash of those files
# Client requests files that are missing or have mismatching checksums
# In the meantime the server queues up changes (all publishes/consumes are paused)
# When the client doesn't request more files, the server starts to stream changes
Expand Down Expand Up @@ -71,13 +71,12 @@ module LavinMQ
each_follower &.delete(path)
end

def files_with_hash(& : Tuple(String, Bytes) -> Nil)
sha1 = Digest::SHA1.new
hash = Bytes.new(sha1.digest_size)
def files_with_hash(algo : Digest, & : Tuple(String, Bytes) -> Nil)
hash = Bytes.new(algo.digest_size)
@files.each_key do |path|
sha1.file path
sha1.final hash
sha1.reset
algo.file path
algo.final hash
algo.reset
yield({path, hash})
end
end
Expand Down
Loading