Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make channel raise Connection::ClosedException #46

Merged
merged 6 commits into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Specify cacert/cafile/keyfile via URI parameters

### Fixed

- `Channel` methods didn't raise `Connection::ClosedException` if connection had been closed by server.

## [1.2.5] - 2024-06-17

### Fixed
Expand Down
45 changes: 39 additions & 6 deletions spec/amqp-client_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -278,16 +278,15 @@ describe AMQP::Client do
end
end

it "should set connection name" do
it "should set connection name", tags: "slow" do
AMQP::Client.start(name: "My Name") do |_|
names = Array(String).new
5.times do
HTTP::Client.get("http://guest:guest@#{AMQP::Client::AMQP_HOST}:15672/api/connections") do |resp|
conns = JSON.parse resp.body_io
names = conns.as_a.map &.dig("client_properties", "connection_name")
with_http_api do |api|
5.times do
names = api.connections.map &.dig("client_properties", "connection_name")
break if names.includes? "My name"
sleep 1
end
sleep 1
end
names.should contain "My Name"
end
Expand Down Expand Up @@ -412,4 +411,38 @@ describe AMQP::Client do
c.update_secret("foobar", "no reason")
end
end

describe "Channel raises Connection::ClosedException", tags: "slow" do
it "#basic_consume block=true" do
with_channel do |ch|
q = ch.queue
q.publish "foobar"
expect_raises(AMQP::Client::Connection::ClosedException) do
ch.basic_consume q.name, block: true do
with_http_api &.close_connections(1)
end
end
end
end

it "#basic_publish" do
with_channel do |ch|
with_http_api &.close_connections(1)
sleep 1 # Wait for connection to be closed
expect_raises(AMQP::Client::Connection::ClosedException) do
ch.basic_publish "", "", "foobar"
end
end
end

it "#basic_publish_confirm" do
with_channel do |ch|
with_http_api &.close_connections(1)
sleep 1 # Wait for connection to be closed
expect_raises(AMQP::Client::Connection::ClosedException) do
ch.basic_publish_confirm "", "", "foobar"
end
end
end
end
end
35 changes: 35 additions & 0 deletions spec/spec_helper.cr
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,41 @@ module TestHelpers
yield c.channel
end
end

struct ManagementApi
def initialize(uri : URI)
@http = HTTP::Client.new uri
username = uri.user || "guest"
password = uri.password || "guest"
@http.basic_auth username, password
end

def connections
get("/api/connections/") do |resp|
JSON.parse(resp.body_io).as_a
end
end

def close_connections(amount : Int)
loop do
conns = connections
conns.each do |conn|
name = conn["name"].as_s
delete("/api/connections/#{URI.encode_path_segment name}")
amount -= 1
end
break if amount <= 0
sleep 1
end
end

forward_missing_to @http
end

def with_http_api(&)
uri = URI.parse ENV.fetch("MGMT_URL", "http://guest:guest@#{AMQP::Client::AMQP_HOST}:15672")
yield ManagementApi.new(uri)
end
end

extend TestHelpers
42 changes: 20 additions & 22 deletions src/amqp-client/channel.cr
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ class AMQP::Client
def basic_publish(body : IO | Bytes, bytesize : Int, exchange : String, routing_key = "",
mandatory = false, immediate = false, props properties = Properties.new,
blk : Proc(Bool, Nil)? = nil) : UInt64
raise ClosedException.new(@closing_frame) if @closing_frame
raise_if_closed

@connection.with_lock(flush: !@tx) do |c|
if blk && !@confirm_mode
Expand Down Expand Up @@ -309,7 +309,7 @@ class AMQP::Client
waiting_fiber.enqueue
end
sleep
raise ClosedException.new(@closing_frame) if @closing_frame
raise_if_closed
confirmed
end

Expand All @@ -321,7 +321,7 @@ class AMQP::Client
waiting_fiber.enqueue
end
sleep
raise ClosedException.new(@closing_frame) if @closing_frame
raise_if_closed
confirmed
end

Expand All @@ -333,11 +333,8 @@ class AMQP::Client
return true if @unconfirmed_publishes.empty?
ok = @unconfirmed_empty.receive
unless ok
if frame = @closing_frame
raise ClosedException.new(frame)
else
raise Error.new("BUG: got nack without closing frame")
end
raise_if_closed
raise Error.new("BUG: got nack without closing frame")
end
ok
end
Expand Down Expand Up @@ -383,13 +380,8 @@ class AMQP::Client
write Frame::Basic::Get.new(@id, 0_u16, queue, no_ack)
@basic_get.receive
rescue ex : ::Channel::ClosedError
if cf = @connection.closing_frame
raise Connection::ClosedException.new(cf)
elsif cf = @closing_frame
raise ClosedException.new(cf, cause: ex)
else
raise ex
end
raise_if_closed(cause: ex)
raise ex
end

# :nodoc:
Expand Down Expand Up @@ -430,7 +422,7 @@ class AMQP::Client
raise ex
end
end
raise ClosedException.new(@closing_frame) if @closing_frame
raise_if_closed
else
done.close
end
Expand Down Expand Up @@ -681,25 +673,31 @@ class AMQP::Client
end

private def write(frame)
raise ClosedException.new(@closing_frame) if @closing_frame
raise_if_closed
@connection.write frame
end

private def next_frame : Frame
@reply_frames.receive
rescue ex : ::Channel::ClosedError
if conn_close = @connection.closing_frame
raise Connection::ClosedException.new(conn_close)
else
raise ClosedException.new(@closing_frame, cause: ex)
end
raise_if_closed(ex)
raise Error.new("BUG: Channel::ClosedError but not closing frame")
end

private macro expect(clz)
frame = next_frame
frame.as?({{ clz }}) || raise Error::UnexpectedFrame.new(frame)
end

private def raise_if_closed(cause ex : Exception? = nil)
if frame = @closing_frame
raise ClosedException.new frame, cause: ex
end
if frame = @connection.closing_frame
raise Connection::ClosedException.new frame, cause: ex
end
end

def inspect(io : IO) : Nil
io << "#<" << self.class.name << " @id=" << @id << '>'
end
Expand Down
4 changes: 2 additions & 2 deletions src/amqp-client/errors.cr
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ class AMQP::Client
super(message, cause)
end

def initialize(frame : Frame::Connection::Close)
super("#{frame.reply_text} (#{frame.reply_code})")
def initialize(frame : Frame::Connection::Close, cause : Exception? = nil)
super("#{frame.reply_text} (#{frame.reply_code})", cause)
end

def initialize(message, host, user, vhost)
Expand Down
Loading