Skip to content

Commit

Permalink
Iterator(Binding) instead of Iterator(Destination)
Browse files Browse the repository at this point in the history
  • Loading branch information
snichme committed Oct 10, 2024
1 parent 27f4268 commit 0e7cd21
Show file tree
Hide file tree
Showing 16 changed files with 123 additions and 110 deletions.
4 changes: 2 additions & 2 deletions spec/api/bindings_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ describe LavinMQ::HTTP::BindingsController do
response = http.post("/api/bindings/%2f/e/be1/q/bindings_q1", body: body)
response.status_code.should eq 201
response.headers["Location"].should eq "bindings_q1/rk"
s.vhosts["/"].exchanges["be1"].bindings_details.first.routing_key.should eq "rk"
s.vhosts["/"].exchanges["be1"].bindings.first.routing_key.should eq "rk"
end
end

Expand Down Expand Up @@ -127,7 +127,7 @@ describe LavinMQ::HTTP::BindingsController do
props = binding[0]["properties_key"].as_s
response = http.delete("/api/bindings/%2f/e/be1/q/bindings_q1/#{props}")
response.status_code.should eq 204
s.vhosts["/"].exchanges["be1"].bindings_details.empty?.should be_true
s.vhosts["/"].exchanges["be1"].bindings.empty?.should be_true
end
end
end
Expand Down
64 changes: 32 additions & 32 deletions spec/message_routing_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ require "./spec_helper"
module LavinMQ
class Exchange
# Monkey patch for backward compability and easier testing
def matches(routing_key, headers = nil) : Set(Queue | Exchange)
def spec_matches(routing_key, headers = nil) : Set(Queue | Exchange)
s = Set(Queue | Exchange).new
qs = Set(Queue).new
es = Set(Exchange).new
Expand Down Expand Up @@ -72,49 +72,49 @@ describe LavinMQ::TopicExchange do
it "matches prefixed star-wildcard" do
q1 = LavinMQ::Queue.new(vhost, "q1")
x.bind(q1, "*.test")
x.matches("rk2.test").should eq(Set{q1})
x.spec_matches("rk2.test").should eq(Set{q1})
x.unbind(q1, "*.test")
end

it "matches exact rk" do
q1 = LavinMQ::Queue.new(vhost, "q1")
x.bind(q1, "rk1")
x.matches("rk1", nil).should eq(Set{q1})
x.spec_matches("rk1", nil).should eq(Set{q1})
x.unbind(q1, "rk1")
end

it "matches star-wildcards" do
q2 = LavinMQ::Queue.new(vhost, "q2")
x.bind(q2, "*")
x.matches("rk2").should eq(Set{q2})
x.spec_matches("rk2").should eq(Set{q2})
x.unbind(q2, "*")
end

it "matches star-wildcards but not too much" do
q22 = LavinMQ::Queue.new(vhost, "q22")
x.bind(q22, "*")
x.matches("rk2.a").should be_empty
x.spec_matches("rk2.a").should be_empty
x.unbind(q22, "*")
end

it "should not match with too many star-wildcards" do
q3 = LavinMQ::Queue.new(vhost, "q3")
x.bind(q3, "a.*")
x.matches("b.c").should be_empty
x.spec_matches("b.c").should be_empty
x.unbind(q3, "a.*")
end

it "should match star-wildcards in the middle" do
q4 = LavinMQ::Queue.new(vhost, "q4")
x.bind(q4, "c.*.d")
x.matches("c.a.d").should eq(Set{q4})
x.spec_matches("c.a.d").should eq(Set{q4})
x.unbind(q4, "c.*.d")
end

it "should match catch-all" do
q5 = LavinMQ::Queue.new(vhost, "q5")
x.bind(q5, "d.#")
x.matches("d.a.d").should eq(Set{q5})
x.spec_matches("d.a.d").should eq(Set{q5})
x.unbind(q5, "d.#")
end

Expand All @@ -124,7 +124,7 @@ describe LavinMQ::TopicExchange do
ex = LavinMQ::TopicExchange.new(vhost, "t55", false, false, true)
ex.bind(q6, "rk")
ex.bind(q7, "rk")
ex.matches("rk").should eq(Set{q6, q7})
ex.spec_matches("rk").should eq(Set{q6, q7})
ex.unbind(q6, "rk")
ex.unbind(q7, "rk")
end
Expand All @@ -133,57 +133,57 @@ describe LavinMQ::TopicExchange do
q8 = LavinMQ::Queue.new(vhost, "q63")
ex = LavinMQ::TopicExchange.new(vhost, "t63", false, false, true)
ex.bind(q8, "rk63.rk63")
ex.matches("rk63").should be_empty
ex.spec_matches("rk63").should be_empty
ex.unbind(q8, "rk63.rk63")
end

it "# should consider what's comes after" do
q9 = LavinMQ::Queue.new(vhost, "q9")
x.bind(q9, "#.a")
x.matches("a.a.b").should be_empty
x.matches("a.a.a").should eq(Set{q9})
x.spec_matches("a.a.b").should be_empty
x.spec_matches("a.a.a").should eq(Set{q9})
x.unbind(q9, "#.a")
end

it "# can be followed by *" do
q0 = LavinMQ::Queue.new(vhost, "q0")
x.bind(q0, "#.*.d")
x.matches("a.d.a").should be_empty
x.matches("a.a.d").should eq(Set{q0})
x.spec_matches("a.d.a").should be_empty
x.spec_matches("a.a.d").should eq(Set{q0})
x.unbind(q0, "#.*.d")
end

it "can handle multiple #" do
q11 = LavinMQ::Queue.new(vhost, "q11")
x.bind(q11, "#.a.#")
x.matches("a.b.a").should be_empty
x.matches("b.b.a.b.b").should eq(Set{q11})
x.spec_matches("a.b.a").should be_empty
x.spec_matches("b.b.a.b.b").should eq(Set{q11})
x.unbind(q11, "#.a.#")
end

it "should match double star-wildcards" do
q12 = LavinMQ::Queue.new(vhost, "q12")
x.bind(q12, "c.*.*")
x.matches("c.a.d").should eq(Set{q12})
x.spec_matches("c.a.d").should eq(Set{q12})
x.unbind(q12, "c.*.*")
end

it "should match triple star-wildcards" do
q13 = LavinMQ::Queue.new(vhost, "q13")
x.bind(q13, "c.*.*.*")
x.matches("c.a.d.e").should eq(Set{q13})
x.spec_matches("c.a.d.e").should eq(Set{q13})
x.unbind(q13, "c.*.*.*")
end

it "can differentiate a.b.c from a.b" do
q = LavinMQ::Queue.new(vhost, "")
x.bind(q, "a.b.c")
x.matches("a.b.c").should eq(Set{q})
x.matches("a.b").should be_empty
x.spec_matches("a.b.c").should eq(Set{q})
x.spec_matches("a.b").should be_empty
x.unbind(q, "a.b.c")
x.bind(q, "a.b")
x.matches("a.b").should eq(Set{q})
x.matches("a.b.c").should be_empty
x.spec_matches("a.b").should eq(Set{q})
x.spec_matches("a.b.c").should be_empty
end
end
end
Expand Down Expand Up @@ -214,7 +214,7 @@ describe LavinMQ::HeadersExchange do
x = LavinMQ::HeadersExchange.new(vhost, "h", false, false, true)
q6 = LavinMQ::Queue.new(vhost, "q6")
x.bind(q6, "", hdrs_all)
x.matches("", hdrs_all).should eq(Set{q6})
x.spec_matches("", hdrs_all).should eq(Set{q6})
end

it "should not match if not all args are the same" do
Expand All @@ -224,7 +224,7 @@ describe LavinMQ::HeadersExchange do
msg_hdrs = hdrs_all.dup
msg_hdrs.delete "x-match"
msg_hdrs["org"] = "google"
x.matches("", msg_hdrs).size.should eq 0
x.spec_matches("", msg_hdrs).size.should eq 0
end
end

Expand All @@ -236,7 +236,7 @@ describe LavinMQ::HeadersExchange do
msg_hdrs = hdrs_any.dup
msg_hdrs.delete "x-match"
msg_hdrs["org"] = "google"
x.matches("", msg_hdrs).should eq(Set{q8})
x.spec_matches("", msg_hdrs).should eq(Set{q8})
end

it "should not match if no args are the same" do
Expand All @@ -247,7 +247,7 @@ describe LavinMQ::HeadersExchange do
msg_hdrs.delete "x-match"
msg_hdrs["org"] = "google"
msg_hdrs["user"] = "hest"
x.matches("", msg_hdrs).size.should eq 0
x.spec_matches("", msg_hdrs).size.should eq 0
end

it "should match nestled amq-protocol tables" do
Expand All @@ -260,7 +260,7 @@ describe LavinMQ::HeadersExchange do
x.bind(q10, "", bind_hdrs) # to_h because that's what's done in VHost
msg_hdrs = bind_hdrs.clone
msg_hdrs.delete("x-match")
x.matches("", msg_hdrs).size.should eq 1
x.spec_matches("", msg_hdrs).size.should eq 1
end
end

Expand All @@ -274,8 +274,8 @@ describe LavinMQ::HeadersExchange do
x.bind(q10, "", hdrs2)
hdrs1.delete "x-match"
hdrs2.delete "x-match"
x.matches("", hdrs1).should eq Set{q10}
x.matches("", hdrs2).should eq Set{q10}
x.spec_matches("", hdrs1).should eq Set{q10}
x.spec_matches("", hdrs2).should eq Set{q10}
end

it "should handle all Field types" do
Expand All @@ -292,7 +292,7 @@ describe LavinMQ::HeadersExchange do
"x-match" => "all",
})
x.bind(q11, "", hdrs)
x.matches("", hdrs).should eq Set{q11}
x.spec_matches("", hdrs).should eq Set{q11}
end

it "should handle unbind" do
Expand All @@ -306,15 +306,15 @@ describe LavinMQ::HeadersExchange do
})
x.bind(q12, "", hdrs1)
x.unbind(q12, "", hdrs2)
x.matches("", hdrs1).size.should eq 0
x.spec_matches("", hdrs1).size.should eq 0
end

describe "match empty" do
it "should match if both args and headers are empty" do
x = LavinMQ::HeadersExchange.new(vhost, "h", false, false, true)
q13 = LavinMQ::Queue.new(vhost, "q13")
x.bind(q13, "", nil)
x.matches("", nil).size.should eq 1
x.spec_matches("", nil).size.should eq 1
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion spec/vhost_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ describe LavinMQ::VHost do
v.declare_queue("q", true, false)
s.vhosts["test"].bind_queue("q", "e", "q")
s.restart
s.vhosts["test"].exchanges["e"].bindings_details.first.destination.name.should eq "q"
s.vhosts["test"].exchanges["e"].bindings.first.destination.name.should eq "q"
end
end

Expand Down
15 changes: 8 additions & 7 deletions src/lavinmq/exchange/consistent_hash.cr
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ module LavinMQ
"x-consistent-hash"
end

def bindings_details : Iterator(BindingDetails)
def bindings : Iterator(Binding)
@bindings.each.map do |d, w|
binding_key = BindingKey.new(w, @arguments)
BindingDetails.new(name, vhost.name, binding_key, d)
Binding.new(name, vhost.name, binding_key, d)
end
end

Expand All @@ -23,7 +23,7 @@ module LavinMQ
w = weight(routing_key)
@hasher.add(destination.name, w, destination)
binding_key = BindingKey.new(routing_key, @arguments)
data = BindingDetails.new(name, vhost.name, binding_key, destination)
data = Binding.new(name, vhost.name, binding_key, destination)
notify_observers(ExchangeEvent::Bind, data)
true
end
Expand All @@ -34,19 +34,20 @@ module LavinMQ
@hasher.remove(destination.name, w)

binding_key = BindingKey.new(routing_key, @arguments)
data = BindingDetails.new(name, vhost.name, binding_key, destination)
data = Binding.new(name, vhost.name, binding_key, destination)
notify_observers(ExchangeEvent::Unbind, data)

delete if @auto_delete && @bindings.empty?
true
end

protected def bindings(routing_key, headers) : Iterator(Destination)
protected def matches(routing_key, headers) : Iterator(Binding)
key = hash_key(routing_key, headers)
if d = @hasher.get(key)
{d}.each
binding_key = BindingKey.new(key, @arguments)
{Binding.new(name, vhost.name, binding_key, d)}.each
else
Iterator(Destination).empty
Iterator(Binding).empty
end
end

Expand Down
11 changes: 6 additions & 5 deletions src/lavinmq/exchange/default.cr
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ module LavinMQ
"direct"
end

def bindings_details : Iterator(BindingDetails)
Iterator(BindingDetails).empty
def bindings : Iterator(Binding)
Iterator(Binding).empty
end

protected def bindings(routing_key, headers) : Iterator(Destination)
protected def matches(routing_key, headers) : Iterator(Binding)
if q = @vhost.queues[routing_key]?
Tuple(Destination).new(q).each
binding_key = BindingKey.new(routing_key)
Tuple(Binding).new(Binding.new(name, vhost.name, binding_key, q)).each
else
Iterator(Destination).empty
Iterator(Binding).empty
end
end

Expand Down
15 changes: 9 additions & 6 deletions src/lavinmq/exchange/direct.cr
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,19 @@ module LavinMQ
"direct"
end

def bindings_details : Iterator(BindingDetails)
def bindings : Iterator(Binding)
@bindings.each.flat_map do |key, ds|
ds.each.map do |d|
binding_key = BindingKey.new(key)
BindingDetails.new(name, vhost.name, binding_key, d)
Binding.new(name, vhost.name, binding_key, d)
end
end
end

def bind(destination : Destination, routing_key, headers = nil) : Bool
return false unless @bindings[routing_key].add? destination
binding_key = BindingKey.new(routing_key)
data = BindingDetails.new(name, vhost.name, binding_key, destination)
data = Binding.new(name, vhost.name, binding_key, destination)
notify_observers(ExchangeEvent::Bind, data)
true
end
Expand All @@ -33,15 +33,18 @@ module LavinMQ
@bindings.delete routing_key if rk_bindings.empty?

binding_key = BindingKey.new(routing_key)
data = BindingDetails.new(name, vhost.name, binding_key, destination)
data = Binding.new(name, vhost.name, binding_key, destination)
notify_observers(ExchangeEvent::Unbind, data)

delete if @auto_delete && @bindings.each_value.all?(&.empty?)
true
end

protected def bindings(routing_key, headers) : Iterator(Destination)
@bindings[routing_key].each
protected def matches(routing_key, headers) : Iterator(Binding)
@bindings[routing_key].each.map do |d|
binding_key = BindingKey.new(routing_key)
Binding.new(name, vhost.name, binding_key, d)
end
end
end
end
Loading

0 comments on commit 0e7cd21

Please sign in to comment.