Skip to content

Commit

Permalink
temp/wip auth chain
Browse files Browse the repository at this point in the history
  • Loading branch information
kickster97 committed Jan 14, 2025
1 parent fea68a6 commit 354d8f0
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 68 deletions.
12 changes: 6 additions & 6 deletions src/lavinmq/amqp/connection_factory.cr
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,21 @@ require "./client"
require "../client/connection_factory"
require "../auth/handlers/basic_auth"
require "../auth/handlers/oauth2"
require "../auth/handlers/http_auth"

module LavinMQ
module AMQP
class ConnectionFactory < LavinMQ::ConnectionFactory
Log = LavinMQ::Log.for "amqp.connection_factory"

def start(socket, connection_info, vhosts, users) : Client?
def start(socket, connection_info, vhosts, users, auth_chain) : Client?
remote_address = connection_info.src
socket.read_timeout = 15.seconds
metadata = ::Log::Metadata.build({address: remote_address.to_s})
logger = Logger.new(Log, metadata)
if confirm_header(socket, logger)
if start_ok = start(socket, logger)
if user = authenticate(socket, remote_address, users, start_ok, logger)
if user = authenticate(socket, remote_address, users, start_ok, logger, auth_chain)
if tune_ok = tune(socket, logger)
if vhost = open(socket, vhosts, user, logger)
socket.read_timeout = heartbeat_timeout(tune_ok)
Expand Down Expand Up @@ -49,7 +50,7 @@ module LavinMQ
elsif proto != AMQP::PROTOCOL_START_0_9_1 && proto != AMQP::PROTOCOL_START_0_9
socket.write AMQP::PROTOCOL_START_0_9_1.to_slice
socket.flush
log.warn { "Unexpected protocol #{String.new(proto.to_unsafe, count).inspect}, closing socket" }
log.warn { "Unexpected protocol '#{String.new(proto.to_slice)}', closing socket" }
false
else
true
Expand Down Expand Up @@ -102,11 +103,10 @@ module LavinMQ
end
end

def authenticate(socket, remote_address, users, start_ok, log)
def authenticate(socket, remote_address, users, start_ok, log, auth_chain)
username, password = credentials(start_ok)
# TODO: only initialize handler for the configured auth methods
auth_handler = LavinMQ::BasicAuthHandler.new(LavinMQ::OAuth2Handler.new)
return auth_handler.authenticate(username, password, users, remote_address)
return auth_chain.authenticate(username, password) #(username, password, users, remote_address)
props = start_ok.client_properties
if capabilities = props["capabilities"]?.try &.as?(AMQP::Table)
if capabilities["authentication_failure_close"]?.try &.as?(Bool)
Expand Down
112 changes: 56 additions & 56 deletions src/lavinmq/auth/auth_cache.cr
Original file line number Diff line number Diff line change
@@ -1,66 +1,66 @@
module LavinMQ
class CacheEntry(T)
getter value : T
getter expires_at : Time
# module LavinMQ
# class CacheEntry(T)
# getter value : T
# getter expires_at : Time

def initialize(@value : T, ttl : Time::Span)
@expires_at = Time.utc + ttl
end
# def initialize(@value : T, ttl : Time::Span)
# @expires_at = Time.utc + ttl
# end

def expired? : Bool
Time.utc > @expires_at
end
end
# def expired? : Bool
# Time.utc > @expires_at
# end
# end

class Cache(K, V)
def initialize(@default_ttl : Time::Span = 1.hour)
@mutex = Mutex.new
@data = Hash(K, CacheEntry(V)).new
end
# class Cache(K, V)
# def initialize(@default_ttl : Time::Span = 1.hour)
# @mutex = Mutex.new
# @data = Hash(K, CacheEntry(V)).new
# end

def set(key : K, value : V, ttl : Time::Span = @default_ttl) : V
@mutex.synchronize do
@data[key] = CacheEntry.new(value, ttl)
value
end
end
# def set(key : K, value : V, ttl : Time::Span = @default_ttl) : V
# @mutex.synchronize do
# @data[key] = CacheEntry.new(value, ttl)
# value
# end
# end

def get?(key : K) : V?
@mutex.synchronize do
entry = @data[key]?
return nil unless entry
# def get?(key : K) : V?
# @mutex.synchronize do
# entry = @data[key]?
# return nil unless entry

if entry.expired?
@data.delete(key)
nil
else
entry.value
end
end
end
# if entry.expired?
# @data.delete(key)
# nil
# else
# entry.value
# end
# end
# end

def delete(key : K) : Bool
@mutex.synchronize do
@data.delete(key) ? true : false
end
end
# def delete(key : K) : Bool
# @mutex.synchronize do
# @data.delete(key) ? true : false
# end
# end

def cleanup
@mutex.synchronize do
@data.reject! { |_, entry| entry.expired? }
end
end
# def cleanup
# @mutex.synchronize do
# @data.reject! { |_, entry| entry.expired? }
# end
# end

def clear
@mutex.synchronize do
@data.clear
end
end
# def clear
# @mutex.synchronize do
# @data.clear
# end
# end

def size : Int32
@mutex.synchronize do
@data.size
end
end
end
end
# def size : Int32
# @mutex.synchronize do
# @data.size
# end
# end
# end
# end
47 changes: 47 additions & 0 deletions src/lavinmq/auth/auth_chain.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
module LavinMQ
class AuthChain
@first_handler : AuthHandler?

def initialize
backends = Config.instance.auth_backends
if backends.empty?
add_handler(BasicAuthHandler.new)
else
# need to add initializers to these in order to only send in username & password in auth
backends.each do |backend|
case backend
when "oauth"
add_handler(OAuth2Handler.new)
when "http"
add_handler(HTTPAuthHandler.new)
when "basic"
add_handler(BasicAuthHandler.new)
else
raise "Unsupported authentication backend: #{backend}"
end
end
end
end

def add_handler(handler : AuthHandler)
pp "Adding handler #{handler}"
if first = @first_handler
current = first
while next_handler = current.@successor
current = next_handler
end
current.then(handler)
else
@first_handler = handler
end
self
end

def authenticate(username : String, password : String)
pp "hello #{username} #{password}"
pp @first_handler

@first_handler.try &.authenticate(username, password)
end
end
end
5 changes: 5 additions & 0 deletions src/lavinmq/auth/auth_handler.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@ module LavinMQ
@successor : AuthHandler?

def initialize(successor : AuthHandler? = nil)
#don't init this here, it is set in then and can be a property.
@successor = successor
end

def then(handler : AuthHandler) : AuthHandler
@successor = handler
end

def authenticate(username : String, password : String)
end
end
Expand Down
1 change: 1 addition & 0 deletions src/lavinmq/auth/handlers/basic_auth.cr
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ module LavinMQ
class BasicAuthHandler < LavinMQ::AuthHandler
def authenticate(username : String, password : String, users : UserStore, remote_address : Socket::Address)
user = users[username]
pp "USER: #{user}"
# TODO: do not do authentication check if the user is not in the userstore, instead pass directly to the next handler
return user if user && user.password && user.password.not_nil!.verify(password) &&
guest_only_loopback?(remote_address, user)
Expand Down
11 changes: 6 additions & 5 deletions src/lavinmq/auth/handlers/http_auth.cr
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
require "http/client"
require "json"
require "./auth_service"
require "../auth_handler"

module LavinMQ
class HttpAuthHandler < AuthHandler
class HTTPAuthHandler < AuthHandler
def authenticate(username : String, password : String)
payload = {
"username" => username,
"password" => password
}.to_json

success = ::HTTP::Client.post(@user_path,
user_path = Config.instance.http_auth_url || ""
success = ::HTTP::Client.post(user_path,
headers: ::HTTP::Headers{"Content-Type" => "application/json"},
body: payload).success?
if success
"allow"
puts "HTTP authentication successful"
else
return @successor ? @successor.try &.authenticate(username, password, ) : nil
try_next(username, password)
end
end
Expand Down
21 changes: 21 additions & 0 deletions src/lavinmq/config.cr
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ module LavinMQ
property default_consumer_prefetch = UInt16::MAX
property yield_each_received_bytes = 131_072 # max number of bytes to read from a client connection without letting other tasks in the server do any work
property yield_each_delivered_bytes = 1_048_576 # max number of bytes sent to a client without tending to other tasks in the server
property http_auth_url : String? = ""
property oauth_url : String? = ""
property auth_backends = [] of String
@@instance : Config = self.new

def self.instance : LavinMQ::Config
Expand Down Expand Up @@ -145,6 +148,12 @@ module LavinMQ
p.on("--default-consumer-prefetch=NUMBER", "Default consumer prefetch (default 65535)") do |v|
@default_consumer_prefetch = v.to_u16
end
# p.on("--http_auth_url=URL", "URL to authenticate HTTP clients") do |v|
# @http_auth_url = v
# end
# p.on("--oauth_url=URL", "URL to authenticate OAuth2 clients") do |v|
# @oauth_url = v
# end
p.invalid_option { |arg| abort "Invalid argument: #{arg}" }
end
parser.parse(ARGV.dup) # only parse args to get config_file
Expand Down Expand Up @@ -295,6 +304,18 @@ module LavinMQ
end
end

private def parse_auth(settings)
settings.each do |config, v|
case config
when "http" then @http_auth_url = v
when "oauth" then @oauth_url = v
when "auth_backends" then @auth_backends = v.split(",")
else
STDERR.puts "WARNING: Unrecognized configuration 'auth/#{config}'"
end
end
end

private def parse_experimental(settings)
settings.each do |config, v|
case config
Expand Down
4 changes: 3 additions & 1 deletion src/lavinmq/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require "./proxy_protocol"
require "./client/client"
require "./client/connection_factory"
require "./amqp/connection_factory"
require "./auth/auth_chain"
require "./stats"

module LavinMQ
Expand All @@ -37,6 +38,7 @@ module LavinMQ
@users = UserStore.new(@data_dir, @replicator)
@vhosts = VHostStore.new(@data_dir, @users, @replicator)
@parameters = ParameterStore(Parameter).new(@data_dir, "parameters.json", @replicator)
@auth_chain = LavinMQ::AuthChain.new
@amqp_connection_factory = LavinMQ::AMQP::ConnectionFactory.new
apply_parameter
spawn stats_loop, name: "Server#stats_loop"
Expand Down Expand Up @@ -245,7 +247,7 @@ module LavinMQ
end

def handle_connection(socket, connection_info)
client = @amqp_connection_factory.start(socket, connection_info, @vhosts, @users)
client = @amqp_connection_factory.start(socket, connection_info, @vhosts, @users, @auth_chain)
ensure
socket.close if client.nil?
end
Expand Down

0 comments on commit 354d8f0

Please sign in to comment.