Skip to content

Commit

Permalink
Msgs specs (#515)
Browse files Browse the repository at this point in the history
* Add spec - expires messages with TTL on requeue

* Add spec for - can publish and consume messages larger than 128kb

* Refactor spec for  - can publish and consume messages larger than 128kb

* Add spec for  - does not requeue messages on queue close

* Refactor: remove duplicate lines from spec

* Remove blocking operation with sleep

Co-authored-by: Carl Hörberg <[email protected]>

* Lint code with tool format

* Replace unused arguments with underscore

---------

Co-authored-by: Carl Hörberg <[email protected]>
  • Loading branch information
nyior and carlhoerberg authored Jul 12, 2023
1 parent f9d29b3 commit 4feeda0
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 11 deletions.
55 changes: 44 additions & 11 deletions spec/server_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -187,19 +187,52 @@ describe LavinMQ::Server do

it "expires messages with message TTL on queue declaration" do
with_channel do |ch|
args = AMQP::Client::Arguments.new
args["x-message-ttl"] = 1
args["x-dead-letter-exchange"] = ""
args["x-dead-letter-routing-key"] = "dlq"
q = ch.queue("ttl", args: args)
dlq = ch.queue("dlq")
q.publish_confirm "queue dlx"
q, dlq = create_ttl_and_dl_queues(ch)
ttl_msg = "queue dlx"
q.publish_confirm ttl_msg
msg = wait_for { dlq.get(no_ack: true) }
msg.not_nil!.body_io.to_s.should eq(ttl_msg)
Server.vhosts["/"].queues[q.name].empty?.should be_true
q.publish_confirm ttl_msg
msg = wait_for { dlq.get(no_ack: true) }
msg.not_nil!.body_io.to_s.should eq("queue dlx")
Server.vhosts["/"].queues["ttl"].empty?.should be_true
q.publish_confirm "queue dlx"
msg.not_nil!.body_io.to_s.should eq(ttl_msg)
end
end

it "expires messages with TTL on requeue" do
with_channel do |ch|
q, dlq = create_ttl_and_dl_queues(ch, queue_ttl: 500)
r_msg = "requeue msg"
q.publish_confirm r_msg
msg = q.get(no_ack: false).not_nil!
msg.reject(requeue: true)
msg = wait_for { dlq.get(no_ack: true) }
msg.not_nil!.body_io.to_s.should eq("queue dlx")
msg.not_nil!.body_io.to_s.should eq(r_msg)
Server.vhosts["/"].queues[q.name].empty?.should be_true
end
end

it "can publish and consume messages larger than 128kb" do
with_channel do |ch|
lmsg = "a" * 5_00_000
q = ch.queue "lmsg_q"
q.publish_confirm lmsg
q.subscribe(no_ack: true) do |msg|
msg.should_not be_nil
msg.body_io.to_s.should eq(lmsg)
end
end
end

it "does not requeue messages on consumer close" do
with_channel do |ch|
q = ch.queue "msg_q"
q.publish_confirm "no requeue"
done = Channel(Nil).new
tag = q.subscribe(no_ack: false) { |_| done.send nil }
done.receive
q.unsubscribe(tag)
Server.vhosts["/"].queues["msg_q"].empty?.should be_true
end
end

Expand Down
11 changes: 11 additions & 0 deletions spec/spec_helper.cr
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,17 @@ def start_http_server
h
end

# Helper function for creating a queue with ttl and an associated dlq
def create_ttl_and_dl_queues(channel, queue_ttl = 1)
args = AMQP::Client::Arguments.new
args["x-message-ttl"] = queue_ttl
args["x-dead-letter-exchange"] = ""
args["x-dead-letter-routing-key"] = "dlq"
q = channel.queue("ttl", args: args)
dlq = channel.queue("dlq")
{q, dlq}
end

def get(path, headers = nil)
HTTP::Client.get("#{BASE_URL}#{path}", headers: test_headers(headers))
end
Expand Down

0 comments on commit 4feeda0

Please sign in to comment.