Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keep Etcd dependencies to Clustering::Controller #913

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions spec/clustering_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,14 @@ describe LavinMQ::Clustering::Client do
end

it "can stream changes" do
replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new("localhost:12379"), 0)
controller = LavinMQ::Clustering::Controller.new(
LavinMQ::Config.instance,
LavinMQ::Etcd.new("localhost:12379"))
replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, controller)
tcp_server = TCPServer.new("localhost", 0)
spawn(replicator.listen(tcp_server), name: "repli server spec")
config = LavinMQ::Config.new.tap &.data_dir = follower_data_dir
repli = LavinMQ::Clustering::Client.new(config, 1, replicator.password, proxy: false)
repli = LavinMQ::Clustering::Client.new(config, 1, controller.password, proxy: false)
done = Channel(Nil).new
spawn(name: "follow spec") do
repli.follow("localhost", tcp_server.local_address.port)
Expand Down Expand Up @@ -79,11 +82,14 @@ describe LavinMQ::Clustering::Client do
end

it "can stream full file" do
replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new("localhost:12379"), 0)
controller = LavinMQ::Clustering::Controller.new(
LavinMQ::Config.instance,
LavinMQ::Etcd.new("localhost:12379"))
replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, controller)
tcp_server = TCPServer.new("localhost", 0)
spawn(replicator.listen(tcp_server), name: "repli server spec")
config = LavinMQ::Config.new.tap &.data_dir = follower_data_dir
repli = LavinMQ::Clustering::Client.new(config, 1, replicator.password, proxy: false)
repli = LavinMQ::Clustering::Client.new(config, 1, controller.password, proxy: false)
done = Channel(Nil).new
spawn(name: "follow spec") do
repli.follow("localhost", tcp_server.local_address.port)
Expand Down Expand Up @@ -113,7 +119,9 @@ describe LavinMQ::Clustering::Client do
config1.clustering_port = 5681
config1.amqp_port = 5671
config1.http_port = 15671
controller1 = LavinMQ::Clustering::Controller.new(config1)
controller1 = LavinMQ::Clustering::Controller.new(
config1,
LavinMQ::Etcd.new(config1.clustering_etcd_endpoints))

config2 = LavinMQ::Config.new
config2.data_dir = "/tmp/failover2"
Expand All @@ -122,7 +130,9 @@ describe LavinMQ::Clustering::Client do
config2.clustering_port = 5682
config2.amqp_port = 5672
config2.http_port = 15672
controller2 = LavinMQ::Clustering::Controller.new(config2)
controller2 = LavinMQ::Clustering::Controller.new(
config2,
LavinMQ::Etcd.new(config2.clustering_etcd_endpoints))

listen = Channel(String).new
spawn(name: "etcd elect leader spec") do
Expand Down
16 changes: 16 additions & 0 deletions src/lavinmq/clustering/controller.cr
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,22 @@ class LavinMQ::Clustering::Controller
@id = clustering_id
@advertised_uri = @config.clustering_advertised_uri ||
"tcp://#{System.hostname}:#{@config.clustering_port}"
@isr_key = "#{@config.clustering_etcd_prefix}/isr"
end

def password : String
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can/will only be called by the leader? otherwise we have a racecondition in this method.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. The Clustering::Client is created by the Clustering::Controller and the password is passed as constructor argument. Clustering::Client doesn't know about neither Etcd nor Clustering::Controller.

Maybe Clustering::Server should be created by Clustering::Controller too. Then #password wont have to be public and Clustering::Controller handles it all.

key = "#{@config.clustering_etcd_prefix}/clustering_secret"
@etcd.get(key) ||
begin
Log.info { "Generating new clustering secret" }
secret = Random::Secure.base64(32)
@etcd.put(key, secret)
secret
end
end

def update_isr(value)
@etcd.put(@isr_key, value)
end

# This method is called by the Launcher#run.
Expand Down
1 change: 0 additions & 1 deletion src/lavinmq/clustering/replicator.cr
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ module LavinMQ
abstract def close
abstract def listen(server : TCPServer)
abstract def clear
abstract def password : String
end
end
end
36 changes: 12 additions & 24 deletions src/lavinmq/clustering/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require "../message"
require "../mfile"
require "crypto/subtle"
require "lz4"
require "../etcd"
require "./controller"

module LavinMQ
module Clustering
Expand All @@ -34,11 +34,12 @@ module LavinMQ
@id : Int32
@config : Config

def initialize(config : Config, @etcd : Etcd, @id : Int32)
def initialize(config : Config, @controller : Controller)
@id = @controller.id
Log.info { "ID: #{@id.to_s(36)}" }
@config = config
@data_dir = @config.data_dir
@password = password
@password = @controller.password
end

def clear
Expand Down Expand Up @@ -104,17 +105,6 @@ module LavinMQ
end
end

def password : String
key = "#{@config.clustering_etcd_prefix}/clustering_secret"
@etcd.get(key) ||
begin
Log.info { "Generating new clustering secret" }
secret = Random::Secure.base64(32)
@etcd.put(key, secret)
secret
end
end

@listeners = Array(TCPServer).new(1)

def listen(server : TCPServer)
Expand Down Expand Up @@ -165,13 +155,15 @@ module LavinMQ
end

private def update_isr
isr_key = "#{@config.clustering_etcd_prefix}/isr"
ids = String.build do |str|
@followers.each { |f| f.id.to_s(str, 36); str << "," }
@id.to_s(str, 36)
isr = String.build do |str|
@followers.each do |follower|
follower.id.to_s(str, 36)
str << ","
end
@controller.id.to_s(str, 36)
end
Log.info { "In-sync replicas: #{ids}" }
@etcd.put(isr_key, ids)
@controller.update_isr isr
Log.info { "ISR updated: #{isr}" }
@dirty_isr = false
end

Expand Down Expand Up @@ -229,10 +221,6 @@ module LavinMQ

def clear
end

def password : String
""
end
end
end
end
2 changes: 1 addition & 1 deletion src/lavinmq/launcher.cr
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ module LavinMQ
if @config.clustering?
etcd = Etcd.new(@config.clustering_etcd_endpoints)
@runner = controller = Clustering::Controller.new(@config, etcd)
@replicator = Clustering::Server.new(@config, etcd, controller.id)
@replicator = Clustering::Server.new(@config, controller)
else
@replicator = Clustering::NoopServer.new
@runner = StandaloneRunner.new
Expand Down
Loading