diff --git a/shard.lock b/shard.lock index 182e0db6a9..5f5a121e91 100644 --- a/shard.lock +++ b/shard.lock @@ -12,10 +12,22 @@ shards: git: https://github.com/cloudamqp/amqp-client.cr.git version: 1.2.8 + bindata: + git: https://github.com/spider-gazelle/bindata.git + version: 2.1.0 + + jwt: + git: https://github.com/crystal-community/jwt.git + version: 1.6.1 + lz4: git: https://github.com/84codes/lz4.cr.git version: 1.0.0+git.commit.96d714f7593c66ca7425872fd26c7b1286806d3d + openssl_ext: + git: https://github.com/spider-gazelle/openssl_ext.git + version: 2.4.4 + systemd: git: https://github.com/84codes/systemd.cr.git version: 2.0.0 diff --git a/shard.yml b/shard.yml index 6e2eb48079..a0c00f52f7 100644 --- a/shard.yml +++ b/shard.yml @@ -32,6 +32,8 @@ dependencies: github: 84codes/systemd.cr lz4: github: 84codes/lz4.cr + jwt: + github: crystal-community/jwt development_dependencies: ameba: diff --git a/spec/auth_sepc.cr b/spec/auth_sepc.cr new file mode 100644 index 0000000000..0de0c5ab97 --- /dev/null +++ b/spec/auth_sepc.cr @@ -0,0 +1,6 @@ +require "./spec_helper" + +describe LavinMQ::AuthHandler do + + +end diff --git a/spec/cache_spec.cr b/spec/cache_spec.cr new file mode 100644 index 0000000000..42385caeb4 --- /dev/null +++ b/spec/cache_spec.cr @@ -0,0 +1,36 @@ +# require "./spec_helper" + +# describe LavinMQ::Cache do +# cache = LavinMQ::Cache(String, String).new(1.seconds) + +# it "set key" do +# cache.set("key1", "allow").should eq "allow" +# end + +# it "get key" do +# cache.set("keyget", "deny") +# cache.get?("keyget").should eq "deny" +# end + +# it "invalid cache after 10 second" do +# cache.set("keyinvalid", "expired") +# sleep(2.seconds) +# cache.get?("keyinvalid").should be_nil +# end + +# it "delete key" do +# cache.set("keydelete", "deleted") +# cache.delete("keydelete") +# cache.get?("keydelete").should be_nil +# end + +# it "cleanup expired entry" do +# cache.set("clean1", "expired1") +# cache.set("clean2", "expired2") +# cache.set("clean3", "valid", 10.seconds) +# sleep(2.seconds) +# cache.get?("clean1").should be_nil +# cache.get?("clean2").should be_nil +# cache.get?("clean3").should eq "valid" +# end +# end diff --git a/src/lavinmq/amqp/connection_factory.cr b/src/lavinmq/amqp/connection_factory.cr index a9774da4ae..b7300507ab 100644 --- a/src/lavinmq/amqp/connection_factory.cr +++ b/src/lavinmq/amqp/connection_factory.cr @@ -2,20 +2,23 @@ require "../version" require "../logger" require "./client" require "../client/connection_factory" +require "../auth/handlers/basic_auth" +require "../auth/handlers/oauth2" +require "../auth/handlers/http" module LavinMQ module AMQP class ConnectionFactory < LavinMQ::ConnectionFactory Log = LavinMQ::Log.for "amqp.connection_factory" - def start(socket, connection_info, vhosts, users) : Client? + def start(socket, connection_info, vhosts, users, auth_chain) : Client? remote_address = connection_info.src socket.read_timeout = 15.seconds metadata = ::Log::Metadata.build({address: remote_address.to_s}) logger = Logger.new(Log, metadata) if confirm_header(socket, logger) if start_ok = start(socket, logger) - if user = authenticate(socket, remote_address, users, start_ok, logger) + if user = authenticate(socket, remote_address, users, start_ok, logger, auth_chain) if tune_ok = tune(socket, logger) if vhost = open(socket, vhosts, user, logger) socket.read_timeout = heartbeat_timeout(tune_ok) @@ -47,7 +50,7 @@ module LavinMQ elsif proto != AMQP::PROTOCOL_START_0_9_1 && proto != AMQP::PROTOCOL_START_0_9 socket.write AMQP::PROTOCOL_START_0_9_1.to_slice socket.flush - log.warn { "Unexpected protocol #{String.new(proto.to_unsafe, count).inspect}, closing socket" } + log.warn { "Unexpected protocol '#{String.new(proto.to_slice)}', closing socket" } false else true @@ -100,17 +103,10 @@ module LavinMQ end end - def authenticate(socket, remote_address, users, start_ok, log) + def authenticate(socket, remote_address, users, start_ok, log, auth_chain) username, password = credentials(start_ok) user = users[username]? - return user if user && user.password && user.password.not_nil!.verify(password) && - guest_only_loopback?(remote_address, user) - - if user.nil? - log.warn { "User \"#{username}\" not found" } - else - log.warn { "Authentication failure for user \"#{username}\"" } - end + return user if user && auth_chain.authenticate(username, password) && guest_only_loopback?(remote_address, user) props = start_ok.client_properties if capabilities = props["capabilities"]?.try &.as?(AMQP::Table) if capabilities["authentication_failure_close"]?.try &.as?(Bool) diff --git a/src/lavinmq/auth/auth_cache.cr b/src/lavinmq/auth/auth_cache.cr new file mode 100644 index 0000000000..0a71690a7f --- /dev/null +++ b/src/lavinmq/auth/auth_cache.cr @@ -0,0 +1,66 @@ +# module LavinMQ +# class CacheEntry(T) +# getter value : T +# getter expires_at : Time + +# def initialize(@value : T, ttl : Time::Span) +# @expires_at = Time.utc + ttl +# end + +# def expired? : Bool +# Time.utc > @expires_at +# end +# end + +# class Cache(K, V) +# def initialize(@default_ttl : Time::Span = 1.hour) +# @mutex = Mutex.new +# @data = Hash(K, CacheEntry(V)).new +# end + +# def set(key : K, value : V, ttl : Time::Span = @default_ttl) : V +# @mutex.synchronize do +# @data[key] = CacheEntry.new(value, ttl) +# value +# end +# end + +# def get?(key : K) : V? +# @mutex.synchronize do +# entry = @data[key]? +# return nil unless entry + +# if entry.expired? +# @data.delete(key) +# nil +# else +# entry.value +# end +# end +# end + +# def delete(key : K) : Bool +# @mutex.synchronize do +# @data.delete(key) ? true : false +# end +# end + +# def cleanup +# @mutex.synchronize do +# @data.reject! { |_, entry| entry.expired? } +# end +# end + +# def clear +# @mutex.synchronize do +# @data.clear +# end +# end + +# def size : Int32 +# @mutex.synchronize do +# @data.size +# end +# end +# end +# end diff --git a/src/lavinmq/auth/auth_chain.cr b/src/lavinmq/auth/auth_chain.cr new file mode 100644 index 0000000000..fa72f3bc41 --- /dev/null +++ b/src/lavinmq/auth/auth_chain.cr @@ -0,0 +1,49 @@ +require "./auth_cache" + +module LavinMQ + class AuthChain + @first_handler : AuthHandler? + + def initialize(users : UserStore) + backends = Config.instance.auth_backends + if backends.nil? || backends.size == 0 + add_handler(BasicAuthHandler.new(users)) + else + # TODO: gather config for http and oauth and send into handlers + backends.each do |backend| + case backend + when "oauth" + add_handler(OAuth2Handler.new(users)) + when "http" + add_handler(HTTPAuthHandler.new(users)) + when "basic" + add_handler(BasicAuthHandler.new(users)) + else + raise "Unsupported authentication backend: #{backend}" + end + end + end + end + + def add_handler(handler : AuthHandler) + if first = @first_handler + current = first + while next_handler = current.@successor + current = next_handler + end + current.set_successor(handler) + else + @first_handler = handler + end + self + end + + def authenticate(username : String, password : String) + # TODO: Cache the authorized users, and call authenticate from cache class + # if authorized = @auth_cache.get?(username) + # return authorized + # end + @first_handler.try &.authenticate(username, password) + end + end +end diff --git a/src/lavinmq/auth/auth_handler.cr b/src/lavinmq/auth/auth_handler.cr new file mode 100644 index 0000000000..66a2d590d6 --- /dev/null +++ b/src/lavinmq/auth/auth_handler.cr @@ -0,0 +1,22 @@ +module LavinMQ + abstract class AuthHandler + Log = LavinMQ::Log.for "auth.handler" + property successor : AuthHandler? + @log = Logger.new(Log) + + abstract def authenticate(username : String, password : String) + + def set_successor(service : AuthHandler) : AuthHandler + @successor = service + service + end + + def try_next(username : String, password : String) + if successor = @successor + successor.authenticate(username, password) + else + nil + end + end + end +end diff --git a/src/lavinmq/auth/handlers/basic_auth.cr b/src/lavinmq/auth/handlers/basic_auth.cr new file mode 100644 index 0000000000..cd4cdecc6b --- /dev/null +++ b/src/lavinmq/auth/handlers/basic_auth.cr @@ -0,0 +1,16 @@ +require "../auth_handler" +require "../../server" + +module LavinMQ + class BasicAuthHandler < LavinMQ::AuthHandler + def initialize(@users : UserStore) + end + + def authenticate(username : String, password : String) + user = @users[username] + return user if user && user.password && user.password.not_nil!.verify(password) + @log.warn { "Basic authentication failed" } + try_next(username, password) + end + end +end diff --git a/src/lavinmq/auth/handlers/http.cr b/src/lavinmq/auth/handlers/http.cr new file mode 100644 index 0000000000..50ae2490c9 --- /dev/null +++ b/src/lavinmq/auth/handlers/http.cr @@ -0,0 +1,21 @@ +require "http/client" +require "json" +require "../auth_handler" + +module LavinMQ + class HTTPAuthHandler < AuthHandler + def initialize(@users : UserStore) + end + + def authenticate(username : String, password : String) + # TODO: implement the HTTP authentication logic and permissions parser here + if password.starts_with?("http") + @log.warn { "HTTP authentication successful" } + return @users[username] + else + @log.warn { "HTTP authentication failed" } + return try_next(username, password) + end + end + end +end diff --git a/src/lavinmq/auth/handlers/oauth2.cr b/src/lavinmq/auth/handlers/oauth2.cr new file mode 100644 index 0000000000..bcfc9f9321 --- /dev/null +++ b/src/lavinmq/auth/handlers/oauth2.cr @@ -0,0 +1,47 @@ +require "../auth_handler" +require "jwt" +require "../../config" +require "http/client" + + +module LavinMQ + class OAuth2Handler < LavinMQ::AuthHandler + def initialize(@users : UserStore) + end + + # Temporary for tests + @token : String = LavinMQ::Config.instance.token + @public_key : String = LavinMQ::Config.instance.public_key + + + def authenticate(username : String, password : String) + begin + fetch_jwks_token + payload, header = JWT.decode(@token, key: @public_key, algorithm: JWT::Algorithm::RS256, verify: true) + + pp payload + pp header + oauth_user + rescue ex : JWT::DecodeError + @log.warn { "OAuth2 authentication failed, could not decode token: #{ex}" } + try_next(username, password) + rescue ex : JWT::UnsupportedAlgorithmError + @log.warn { "OAuth2 authentication failed, unsupported algortihm: #{ex}" } + try_next(username, password) + rescue ex + @log.warn { "OAuth2 authentication failed: #{ex}" } + try_next(username, password) + end + end + + private def fetch_jwks_token + end + + def oauth_user + # Discuss ow to do this? + # TODO: Create a uset that will be deleted when it disconnects, but also cannot be authorised with basic auth. + # introduce the needed configs for validation, and parse the payload to get the user details + user = @users.create("oauth_user", "password") + end + end +end diff --git a/src/lavinmq/config.cr b/src/lavinmq/config.cr index 0f2a347d63..fb8b0a622f 100644 --- a/src/lavinmq/config.cr +++ b/src/lavinmq/config.cr @@ -62,6 +62,31 @@ module LavinMQ property default_consumer_prefetch = UInt16::MAX property yield_each_received_bytes = 131_072 # max number of bytes to read from a client connection without letting other tasks in the server do any work property yield_each_delivered_bytes = 1_048_576 # max number of bytes sent to a client without tending to other tasks in the server + property auth_cache_ttl = 1.hour + property jwks_uri : String = "https://demos-test.criipto.id/.well-known/jwks" + property iss : String? = "" + property aud : String? = "" + property sub : String? = "" + property algorithm : JWT::Algorithm? = JWT::Algorithm::RS256 + property token_expiration_tolerance : Int32 = 60 + property token_cache_duration : Int32 = 60 + property auth_backends : Array(String)? = ["basic", "oauth", "http"] + + # ---- FOR TESTING PURPOSES ONLY ---- + #this will be fetched from an jwks endpoint + property public_key = "-----BEGIN PUBLIC KEY----- +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAi63Nh2tY0KZOvy1wEknC +1iUC75g+kuAueaph4TH4BXOdIspCmM6z47G5aCEMY6esTdq/skR9LwgwF6jHkwsj +PPE0wBv8AFprD8ib2u4VIdm4Sy94wruZnDVzE0YcIadptp9MD2sFLHmwF3wJ5rmw +CSWRBWqcpFCYha40C2qHokudzMusHV2AMQHzuAnk0WxgO+OCtyHzPBRq4DbuGSBM +9vqP0mvPCtM3pWnTO0LIJzUwbhNd3bWSKe3ItlhfLu9GXaZqYYwhw9hjvlkmEZsR +aB+LOn//FBtJhDmrrA/zmHwA39oALdynhU6BCXzEG/z/4JdA4gC7Ad64dVuN+bHQ +uQIDAQAB +-----END PUBLIC KEY----- +" + # this will come from the connect packet + property token = "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6Im9hdXRoIiwicGVybWlzc2lvbnMiOnsidXNlciI6InUxIiwidmhvc3QiOiIvIiwiY29uZmlndXJlIjoiYyIsIndyaXRlIjoidyIsInJlYWQiOiJyIn0sImlhdCI6MTczNzExNjI1NSwiZXhwIjoxNzM3MTE5ODU1fQ.POlh6o99cDQgfliDpOWAS-BNvTXtWI1myp7sVA9Y25lqlUCx4M5LkA1wuPSlENTU43bi2aAGOaSyH_dHRF2XVsMIn9t2UX735PMCgWSciv0pypAH56ake1kLkbM-JnzcHyAtbAo3sK5rzDtvI2Gj23jtSn8LkiSASa1Xs3DLQVGwDYeBgQfdYr5fjhBHTK8wv8KYW1cHH0A_s-oUeCDI0ps-rNNGTOyBqn55WDAfs_eOCJ3TeLymntndBf6ySdFumi2y04N7MVBAAngtKo6c7ej-J_MoOdwwm9UXNyIgsQogiVtr9QuM4a1fNPaj5T2PC-bqg9Jlce7T2EW7JWNs6Q" + # ---- FOR TESTING PURPOSES ONLY ---- @@instance : Config = self.new def self.instance : LavinMQ::Config @@ -145,6 +170,12 @@ module LavinMQ p.on("--default-consumer-prefetch=NUMBER", "Default consumer prefetch (default 65535)") do |v| @default_consumer_prefetch = v.to_u16 end + # p.on("--http_auth_url=URL", "URL to authenticate HTTP clients") do |v| + # @http_auth_url = v + # end + # p.on("--oauth_url=URL", "URL to authenticate OAuth2 clients") do |v| + # @oauth_url = v + # end p.invalid_option { |arg| abort "Invalid argument: #{arg}" } end parser.parse(ARGV.dup) # only parse args to get config_file @@ -295,6 +326,18 @@ module LavinMQ end end + private def parse_auth(settings) + settings.each do |config, v| + case config + when "http" then @http_auth_url = v + when "oauth" then @oauth_url = v + when "auth_backends" then @auth_backends = v.split(",") + else + STDERR.puts "WARNING: Unrecognized configuration 'auth/#{config}'" + end + end + end + private def parse_experimental(settings) settings.each do |config, v| case config diff --git a/src/lavinmq/server.cr b/src/lavinmq/server.cr index 4d9e39cfb4..675fce321e 100644 --- a/src/lavinmq/server.cr +++ b/src/lavinmq/server.cr @@ -15,6 +15,7 @@ require "./proxy_protocol" require "./client/client" require "./client/connection_factory" require "./amqp/connection_factory" +require "./auth/auth_chain" require "./stats" module LavinMQ @@ -37,6 +38,7 @@ module LavinMQ @users = UserStore.new(@data_dir, @replicator) @vhosts = VHostStore.new(@data_dir, @users, @replicator) @parameters = ParameterStore(Parameter).new(@data_dir, "parameters.json", @replicator) + @auth_chain = LavinMQ::AuthChain.new(@users) @amqp_connection_factory = LavinMQ::AMQP::ConnectionFactory.new apply_parameter spawn stats_loop, name: "Server#stats_loop" @@ -245,7 +247,7 @@ module LavinMQ end def handle_connection(socket, connection_info) - client = @amqp_connection_factory.start(socket, connection_info, @vhosts, @users) + client = @amqp_connection_factory.start(socket, connection_info, @vhosts, @users, @auth_chain) ensure socket.close if client.nil? end diff --git a/src/lavinmq/user.cr b/src/lavinmq/user.cr index 115e39d9a6..a74235f2ca 100644 --- a/src/lavinmq/user.cr +++ b/src/lavinmq/user.cr @@ -3,6 +3,7 @@ require "./password" require "./sortable_json" module LavinMQ + # needs to be extracted to own file (steg 1) enum Tag Administrator Monitoring @@ -15,6 +16,7 @@ module LavinMQ end end + # maybe have multiple types of users, let User be absracts and inherit class User include SortableJSON getter name, password, permissions