From ea5570fbd410d267c8cf8b749f089c10a692be87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Thu, 30 Jan 2025 18:50:03 +0100 Subject: [PATCH 1/3] Make Exchange#bindings accept a block instead of returning iterator --- src/lavinmq/amqp/exchange/consistent_hash.cr | 6 ++-- src/lavinmq/amqp/exchange/default.cr | 6 ++-- src/lavinmq/amqp/exchange/direct.cr | 6 ++-- src/lavinmq/amqp/exchange/exchange.cr | 2 +- src/lavinmq/amqp/exchange/fanout.cr | 6 ++-- src/lavinmq/amqp/exchange/headers.cr | 31 +++++++++++++------- src/lavinmq/amqp/exchange/topic.cr | 24 ++++++++++----- src/lavinmq/mqtt/exchange.cr | 3 +- 8 files changed, 50 insertions(+), 34 deletions(-) diff --git a/src/lavinmq/amqp/exchange/consistent_hash.cr b/src/lavinmq/amqp/exchange/consistent_hash.cr index 8c856664d0..0fd85b0359 100644 --- a/src/lavinmq/amqp/exchange/consistent_hash.cr +++ b/src/lavinmq/amqp/exchange/consistent_hash.cr @@ -43,12 +43,10 @@ module LavinMQ true end - protected def bindings(routing_key, headers) : Iterator(Destination) + def bindings(routing_key : String, headers : AMQP::Table?, &) key = hash_key(routing_key, headers) if d = @hasher.get(key) - {d}.each - else - Iterator(Destination).empty + yield d end end diff --git a/src/lavinmq/amqp/exchange/default.cr b/src/lavinmq/amqp/exchange/default.cr index b39600d6bd..cb959eb575 100644 --- a/src/lavinmq/amqp/exchange/default.cr +++ b/src/lavinmq/amqp/exchange/default.cr @@ -11,11 +11,9 @@ module LavinMQ Iterator(BindingDetails).empty end - protected def bindings(routing_key, headers) : Iterator(LavinMQ::Destination) + protected def bindings(routing_key, headers, &) if q = @vhost.queues[routing_key]? - Tuple(LavinMQ::Destination).new(q).each - else - Iterator(LavinMQ::Destination).empty + yield q end end diff --git a/src/lavinmq/amqp/exchange/direct.cr b/src/lavinmq/amqp/exchange/direct.cr index 86c93a12d4..caa2b2c077 100644 --- a/src/lavinmq/amqp/exchange/direct.cr +++ b/src/lavinmq/amqp/exchange/direct.cr @@ -41,8 +41,10 @@ module LavinMQ true end - protected def bindings(routing_key, headers) : Iterator(Destination) - @bindings[routing_key].each + protected def bindings(routing_key, headers, &) + @bindings[routing_key].each do |destination| + yield destination + end end end end diff --git a/src/lavinmq/amqp/exchange/exchange.cr b/src/lavinmq/amqp/exchange/exchange.cr index 5f140d2d30..3809e4d9c7 100644 --- a/src/lavinmq/amqp/exchange/exchange.cr +++ b/src/lavinmq/amqp/exchange/exchange.cr @@ -232,7 +232,7 @@ module LavinMQ queues : Set(LavinMQ::Queue) = Set(LavinMQ::Queue).new, exchanges : Set(LavinMQ::Exchange) = Set(LavinMQ::Exchange).new) : Nil return unless exchanges.add? self - bindings(routing_key, headers).each do |d| + bindings(routing_key, headers) do |d| case d in LavinMQ::Queue queues.add(d) diff --git a/src/lavinmq/amqp/exchange/fanout.cr b/src/lavinmq/amqp/exchange/fanout.cr index 5ef9e9573f..e14f76f17c 100644 --- a/src/lavinmq/amqp/exchange/fanout.cr +++ b/src/lavinmq/amqp/exchange/fanout.cr @@ -33,8 +33,10 @@ module LavinMQ true end - protected def bindings(routing_key, headers) : Iterator(Destination) - @bindings.each + protected def bindings(routing_key, headers, &) + @bindings.each do |destination| + yield destination + end end end end diff --git a/src/lavinmq/amqp/exchange/headers.cr b/src/lavinmq/amqp/exchange/headers.cr index 22bbf22ee4..acf2f83174 100644 --- a/src/lavinmq/amqp/exchange/headers.cr +++ b/src/lavinmq/amqp/exchange/headers.cr @@ -51,8 +51,10 @@ module LavinMQ true end - protected def bindings(routing_key, headers) : Iterator(Destination) - matches(headers).each + protected def bindings(routing_key, headers, &) + matches(headers) do |destination| + yield destination + end end private def validate!(headers) : Nil @@ -65,19 +67,26 @@ module LavinMQ end end - private def matches(headers) : Iterator(Destination) - @bindings.each.select do |args, _| + private def matches(headers, &) + @bindings.each do |args, destinations| if headers.nil? || headers.empty? - args.empty? + next unless args.empty? + destinations.each do |destination| + yield destination + end else - case args["x-match"]? - when "any" - args.any? { |k, v| !k.starts_with?("x-") && (headers.has_key?(k) && headers[k] == v) } - else - args.all? { |k, v| k.starts_with?("x-") || (headers.has_key?(k) && headers[k] == v) } + is_match = case args["x-match"]? + when "any" + args.any? { |k, v| !k.starts_with?("x-") && (headers.has_key?(k) && headers[k] == v) } + else + args.all? { |k, v| k.starts_with?("x-") || (headers.has_key?(k) && headers[k] == v) } + end + next unless is_match + destinations.each do |destination| + yield destination end end - end.flat_map { |_, v| v.each } + end end end end diff --git a/src/lavinmq/amqp/exchange/topic.cr b/src/lavinmq/amqp/exchange/topic.cr index 0762e49171..6223618ec3 100644 --- a/src/lavinmq/amqp/exchange/topic.cr +++ b/src/lavinmq/amqp/exchange/topic.cr @@ -42,28 +42,32 @@ module LavinMQ true end - protected def bindings(routing_key, headers) : Iterator(LavinMQ::Destination) - select_matches(routing_key).each + protected def bindings(routing_key : String, headers : AMQP::Table?, &) + select_matches(routing_key) do |destination| + yield destination + end end # ameba:disable Metrics/CyclomaticComplexity - private def select_matches(routing_key) : Iterator(LavinMQ::Destination) + private def select_matches(routing_key, &) binding_keys = @bindings - return Iterator(LavinMQ::Destination).empty if binding_keys.empty? + return if binding_keys.empty? # optimize the case where the only binding key is '#' if binding_keys.size == 1 bk, qs = binding_keys.first if bk.size == 1 if bk.first == "#" - return qs.each + qs.each do |q| + yield q + end end end end rk_parts = routing_key.split(".") - binding_keys.each.select do |bks, _| + binding_keys.each do |bks, destinations| ok = false prev_hash = false size = bks.size # binding keys can max be 256 chars long anyway @@ -120,8 +124,12 @@ module LavinMQ break unless ok i += 1 end - ok - end.flat_map { |_, v| v.each } + if ok + destinations.each do |destination| + yield destination + end + end + end end end end diff --git a/src/lavinmq/mqtt/exchange.cr b/src/lavinmq/mqtt/exchange.cr index df0a04c4c4..ef88c8515c 100644 --- a/src/lavinmq/mqtt/exchange.cr +++ b/src/lavinmq/mqtt/exchange.cr @@ -85,8 +85,7 @@ module LavinMQ end # Only here to make superclass happy - protected def bindings(routing_key, headers) : Iterator(LavinMQ::Destination) - Iterator(LavinMQ::Destination).empty + protected def bindings(routing_key, headers, &) end def bind(destination : MQTT::Session, routing_key : String, headers = nil) : Bool From 5d851643e1f10966bb6a891ef3a601fb1dcebbd3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Thu, 30 Jan 2025 19:15:54 +0100 Subject: [PATCH 2/3] Rename Exchange#bindings to Exchange#each_destination --- src/lavinmq/amqp/exchange/consistent_hash.cr | 2 +- src/lavinmq/amqp/exchange/default.cr | 2 +- src/lavinmq/amqp/exchange/direct.cr | 2 +- src/lavinmq/amqp/exchange/exchange.cr | 3 ++- src/lavinmq/amqp/exchange/fanout.cr | 2 +- src/lavinmq/amqp/exchange/headers.cr | 8 +------- src/lavinmq/amqp/exchange/topic.cr | 8 +------- 7 files changed, 8 insertions(+), 19 deletions(-) diff --git a/src/lavinmq/amqp/exchange/consistent_hash.cr b/src/lavinmq/amqp/exchange/consistent_hash.cr index 0fd85b0359..d610a37378 100644 --- a/src/lavinmq/amqp/exchange/consistent_hash.cr +++ b/src/lavinmq/amqp/exchange/consistent_hash.cr @@ -43,7 +43,7 @@ module LavinMQ true end - def bindings(routing_key : String, headers : AMQP::Table?, &) + def each_destination(routing_key : String, headers : AMQP::Table?, & : Destination ->) key = hash_key(routing_key, headers) if d = @hasher.get(key) yield d diff --git a/src/lavinmq/amqp/exchange/default.cr b/src/lavinmq/amqp/exchange/default.cr index cb959eb575..e0a8b5d1c5 100644 --- a/src/lavinmq/amqp/exchange/default.cr +++ b/src/lavinmq/amqp/exchange/default.cr @@ -11,7 +11,7 @@ module LavinMQ Iterator(BindingDetails).empty end - protected def bindings(routing_key, headers, &) + protected def each_destination(routing_key : String, _headers : AMQP::Table?, & : Destination ->) if q = @vhost.queues[routing_key]? yield q end diff --git a/src/lavinmq/amqp/exchange/direct.cr b/src/lavinmq/amqp/exchange/direct.cr index caa2b2c077..d80cd27f19 100644 --- a/src/lavinmq/amqp/exchange/direct.cr +++ b/src/lavinmq/amqp/exchange/direct.cr @@ -41,7 +41,7 @@ module LavinMQ true end - protected def bindings(routing_key, headers, &) + protected def each_destination(routing_key : String, _headers : AMQP::Table?, & : Destination ->) @bindings[routing_key].each do |destination| yield destination end diff --git a/src/lavinmq/amqp/exchange/exchange.cr b/src/lavinmq/amqp/exchange/exchange.cr index 3809e4d9c7..a8a8ba7995 100644 --- a/src/lavinmq/amqp/exchange/exchange.cr +++ b/src/lavinmq/amqp/exchange/exchange.cr @@ -184,6 +184,7 @@ module LavinMQ abstract def bind(destination : AMQP::Destination, routing_key : String, headers : AMQP::Table?) abstract def unbind(destination : AMQP::Destination, routing_key : String, headers : AMQP::Table?) abstract def bindings_details : Iterator(BindingDetails) + abstract def each_destination(routing_key : String, headers : AMQP::Table?, & : Destination ->) def publish(msg : Message, immediate : Bool, queues : Set(LavinMQ::Queue) = Set(LavinMQ::Queue).new, @@ -232,7 +233,7 @@ module LavinMQ queues : Set(LavinMQ::Queue) = Set(LavinMQ::Queue).new, exchanges : Set(LavinMQ::Exchange) = Set(LavinMQ::Exchange).new) : Nil return unless exchanges.add? self - bindings(routing_key, headers) do |d| + each_destination(routing_key, headers) do |d| case d in LavinMQ::Queue queues.add(d) diff --git a/src/lavinmq/amqp/exchange/fanout.cr b/src/lavinmq/amqp/exchange/fanout.cr index e14f76f17c..57fd7d7288 100644 --- a/src/lavinmq/amqp/exchange/fanout.cr +++ b/src/lavinmq/amqp/exchange/fanout.cr @@ -33,7 +33,7 @@ module LavinMQ true end - protected def bindings(routing_key, headers, &) + protected def each_destination(_routing_key : String, _headers : AMQP::Table?, & : Destination ->) @bindings.each do |destination| yield destination end diff --git a/src/lavinmq/amqp/exchange/headers.cr b/src/lavinmq/amqp/exchange/headers.cr index acf2f83174..0ab825cbdb 100644 --- a/src/lavinmq/amqp/exchange/headers.cr +++ b/src/lavinmq/amqp/exchange/headers.cr @@ -51,12 +51,6 @@ module LavinMQ true end - protected def bindings(routing_key, headers, &) - matches(headers) do |destination| - yield destination - end - end - private def validate!(headers) : Nil if h = headers if match = h["x-match"]? @@ -67,7 +61,7 @@ module LavinMQ end end - private def matches(headers, &) + protected def each_destination(_routing_key : String, headers : AMQP::Table?, & : Destination ->) @bindings.each do |args, destinations| if headers.nil? || headers.empty? next unless args.empty? diff --git a/src/lavinmq/amqp/exchange/topic.cr b/src/lavinmq/amqp/exchange/topic.cr index 6223618ec3..151975c8f9 100644 --- a/src/lavinmq/amqp/exchange/topic.cr +++ b/src/lavinmq/amqp/exchange/topic.cr @@ -42,14 +42,8 @@ module LavinMQ true end - protected def bindings(routing_key : String, headers : AMQP::Table?, &) - select_matches(routing_key) do |destination| - yield destination - end - end - # ameba:disable Metrics/CyclomaticComplexity - private def select_matches(routing_key, &) + protected def each_destination(routing_key : String, headers : AMQP::Table?, & : Destination ->) binding_keys = @bindings return if binding_keys.empty? From c1139d9aecf12e24e89001e1ce0696da5b9cc7d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Thu, 13 Feb 2025 15:25:12 +0100 Subject: [PATCH 3/3] Fixes after rebase --- src/lavinmq/amqp/exchange/consistent_hash.cr | 2 +- src/lavinmq/amqp/exchange/default.cr | 2 +- src/lavinmq/amqp/exchange/direct.cr | 2 +- src/lavinmq/amqp/exchange/exchange.cr | 2 +- src/lavinmq/amqp/exchange/fanout.cr | 2 +- src/lavinmq/amqp/exchange/headers.cr | 2 +- src/lavinmq/amqp/exchange/topic.cr | 6 +++--- src/lavinmq/mqtt/exchange.cr | 2 +- 8 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/lavinmq/amqp/exchange/consistent_hash.cr b/src/lavinmq/amqp/exchange/consistent_hash.cr index d610a37378..eb27c4a8b7 100644 --- a/src/lavinmq/amqp/exchange/consistent_hash.cr +++ b/src/lavinmq/amqp/exchange/consistent_hash.cr @@ -43,7 +43,7 @@ module LavinMQ true end - def each_destination(routing_key : String, headers : AMQP::Table?, & : Destination ->) + def each_destination(routing_key : String, headers : AMQP::Table?, & : LavinMQ::Destination ->) key = hash_key(routing_key, headers) if d = @hasher.get(key) yield d diff --git a/src/lavinmq/amqp/exchange/default.cr b/src/lavinmq/amqp/exchange/default.cr index e0a8b5d1c5..81936b6005 100644 --- a/src/lavinmq/amqp/exchange/default.cr +++ b/src/lavinmq/amqp/exchange/default.cr @@ -11,7 +11,7 @@ module LavinMQ Iterator(BindingDetails).empty end - protected def each_destination(routing_key : String, _headers : AMQP::Table?, & : Destination ->) + protected def each_destination(routing_key : String, headers : AMQP::Table?, & : LavinMQ::Destination ->) if q = @vhost.queues[routing_key]? yield q end diff --git a/src/lavinmq/amqp/exchange/direct.cr b/src/lavinmq/amqp/exchange/direct.cr index d80cd27f19..c00b043e16 100644 --- a/src/lavinmq/amqp/exchange/direct.cr +++ b/src/lavinmq/amqp/exchange/direct.cr @@ -41,7 +41,7 @@ module LavinMQ true end - protected def each_destination(routing_key : String, _headers : AMQP::Table?, & : Destination ->) + protected def each_destination(routing_key : String, headers : AMQP::Table?, & : LavinMQ::Destination ->) @bindings[routing_key].each do |destination| yield destination end diff --git a/src/lavinmq/amqp/exchange/exchange.cr b/src/lavinmq/amqp/exchange/exchange.cr index a8a8ba7995..c5acf8d5a8 100644 --- a/src/lavinmq/amqp/exchange/exchange.cr +++ b/src/lavinmq/amqp/exchange/exchange.cr @@ -184,7 +184,7 @@ module LavinMQ abstract def bind(destination : AMQP::Destination, routing_key : String, headers : AMQP::Table?) abstract def unbind(destination : AMQP::Destination, routing_key : String, headers : AMQP::Table?) abstract def bindings_details : Iterator(BindingDetails) - abstract def each_destination(routing_key : String, headers : AMQP::Table?, & : Destination ->) + abstract def each_destination(routing_key : String, headers : AMQP::Table?, & : LavinMQ::Destination ->) def publish(msg : Message, immediate : Bool, queues : Set(LavinMQ::Queue) = Set(LavinMQ::Queue).new, diff --git a/src/lavinmq/amqp/exchange/fanout.cr b/src/lavinmq/amqp/exchange/fanout.cr index 57fd7d7288..73ca9a5cd1 100644 --- a/src/lavinmq/amqp/exchange/fanout.cr +++ b/src/lavinmq/amqp/exchange/fanout.cr @@ -33,7 +33,7 @@ module LavinMQ true end - protected def each_destination(_routing_key : String, _headers : AMQP::Table?, & : Destination ->) + protected def each_destination(routing_key : String, headers : AMQP::Table?, & : LavinMQ::Destination ->) @bindings.each do |destination| yield destination end diff --git a/src/lavinmq/amqp/exchange/headers.cr b/src/lavinmq/amqp/exchange/headers.cr index 0ab825cbdb..1cf11e04e7 100644 --- a/src/lavinmq/amqp/exchange/headers.cr +++ b/src/lavinmq/amqp/exchange/headers.cr @@ -61,7 +61,7 @@ module LavinMQ end end - protected def each_destination(_routing_key : String, headers : AMQP::Table?, & : Destination ->) + protected def each_destination(routing_key : String, headers : AMQP::Table?, & : LavinMQ::Destination ->) @bindings.each do |args, destinations| if headers.nil? || headers.empty? next unless args.empty? diff --git a/src/lavinmq/amqp/exchange/topic.cr b/src/lavinmq/amqp/exchange/topic.cr index 151975c8f9..40bbfb75ad 100644 --- a/src/lavinmq/amqp/exchange/topic.cr +++ b/src/lavinmq/amqp/exchange/topic.cr @@ -3,8 +3,8 @@ require "./exchange" module LavinMQ module AMQP class TopicExchange < Exchange - @bindings = Hash(Array(String), Set(LavinMQ::Destination)).new do |h, k| - h[k] = Set(LavinMQ::Destination).new + @bindings = Hash(Array(String), Set(AMQP::Destination)).new do |h, k| + h[k] = Set(AMQP::Destination).new end def type : String @@ -43,7 +43,7 @@ module LavinMQ end # ameba:disable Metrics/CyclomaticComplexity - protected def each_destination(routing_key : String, headers : AMQP::Table?, & : Destination ->) + protected def each_destination(routing_key : String, headers : AMQP::Table?, & : LavinMQ::Destination ->) binding_keys = @bindings return if binding_keys.empty? diff --git a/src/lavinmq/mqtt/exchange.cr b/src/lavinmq/mqtt/exchange.cr index ef88c8515c..8213a1dc8c 100644 --- a/src/lavinmq/mqtt/exchange.cr +++ b/src/lavinmq/mqtt/exchange.cr @@ -85,7 +85,7 @@ module LavinMQ end # Only here to make superclass happy - protected def bindings(routing_key, headers, &) + protected def each_destination(routing_key : String, headers : AMQP::Table?, & : LavinMQ::Destination ->) end def bind(destination : MQTT::Session, routing_key : String, headers = nil) : Bool