Skip to content

Commit

Permalink
Buffer deliveries on blocked consumer chans
Browse files Browse the repository at this point in the history
Rely on Channel.Qos so not to infinitely fill the buffer on dead
consumers.

Update the examples to ensure buffered chans are used for NotifyConfirm
so not to produce a deadlock when issuing another RPC call on the
Connection from a blocked NotifyConfirm receiver.

Fixes streadway#48
  • Loading branch information
Sean Treadway committed Feb 27, 2013
1 parent 1c56f9a commit f0b18b5
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 24 deletions.
37 changes: 24 additions & 13 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,17 +474,22 @@ func (me *Channel) NotifyReturn(c chan Return) chan Return {
/*
NotifyConfirm registers a listener chan for reliable publishing to receive
basic.ack and basic.nack messages. These messages will be sent by the server
for every publish after Channel.Confirm has been called. The value sent
on these channels are the sequence number of the publishing. It is up to
client of this channel to maintain the sequence number and handle resends.
for every publish after Channel.Confirm has been called. The value sent on
these channels is the sequence number of the publishing. It is up to client of
this channel to maintain the sequence number of each publishing and handle
resends on basic.nack.
There will be either at most one Ack or Nack delivered for every Publishing.
The Ack/Nack may arrive in a different order than the publishing's sequence.
The order of acknowledgments is not bound to the order of deliveries.
The order of acknowledgments is not bound to the order of publishings.
The capacity of the ack and nack channels must be at least as large as the
number of outstanding publishings. Not having enough buffered chans will
create a deadlock if you attempt to perform other operations on the Connection
or Channel while confirms are in-flight.
It's advisable to wait for all acks or nacks to arrive before closing the
channel on completion.
It's advisable to wait for all acks or nacks to arrive before calling
Channel.Close().
*/
func (me *Channel) NotifyConfirm(ack, nack chan uint64) (chan uint64, chan uint64) {
Expand Down Expand Up @@ -569,9 +574,9 @@ acknowledgments from the consumers. This option is ignored when consumers are
started with noAck.
When global is true, these Qos settings apply to all existing and future
consumers on all channels on the same connection. When false, the Qos settings
will apply to all existing and future consumers on this channel. RabbitMQ does
not implement the global flag.
consumers on all channels on the same connection. When false, the Channel.Qos
settings will apply to all existing and future consumers on this channel.
RabbitMQ does not implement the global flag.
To get round-robin behavior between consumers consuming from the same queue on
different connections, set the prefetch count to 1, and the next available
Expand Down Expand Up @@ -906,7 +911,7 @@ the same non-empty idenfitier in Channel.Cancel. An empty string will cause
the library to generate a unique identity. The consumer identity will be
included in every Delivery in the ConsumerTag field
When autoAck (also known as noAck) is true the server will acknowledge
When autoAck (also known as noAck) is true, the server will acknowledge
deliveries to this consumer prior to writing the delivery to the network. When
autoAck is true, the consumer should not call Delivery.Ack. Automatically
acknowledging deliveries means that some deliveries may get lost if the
Expand All @@ -931,6 +936,10 @@ or server.
When the channel or connection closes, all delivery chans will also close.
Deliveries on the returned chan will be buffered indefinitely. To limit memory
of this buffer, use the Channel.Qos method to limit the amount of
unacknowledged/buffered deliveries the server will deliver on this Channel.
*/
func (me *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error) {
// When we return from me.call, there may be a delivery already for the
Expand Down Expand Up @@ -1319,9 +1328,11 @@ tag set to a 1 based incrementing index corresponding to every publishing
received after the this method returns.
Add a listener to Channel.NotifyConfirm to respond to the acknowledgments and
negative acknowledgments.
negative acknowledgments before publishing. If Channel.NotifyConfirm is not
called, the Ack/Nacks will be silently ignored.
The order of acknowledgments is not bound to the order of deliveries.
The order of acknowledgments is not related to the order of deliveries and all
Ack and Nack confirmations will arrive at some point in the future.
Unroutable mandatory or immediate messages are acknowledged immediately after
Expand Down
51 changes: 42 additions & 9 deletions consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,62 @@ func uniqueConsumerTag() string {
return fmt.Sprintf("ctag-%s-%d", os.Args[0], atomic.AddUint64(&consumerSeq, 1))
}

type consumerChannels map[string]chan Delivery
type consumerBuffers map[string]chan *Delivery

// Concurrent type that manages the consumerTag ->
// consumerChannel mapping
// ingress consumerBuffer mapping
type consumers struct {
sync.Mutex
chans consumerChannels
chans consumerBuffers
}

func makeConsumers() consumers {
return consumers{chans: make(consumerChannels)}
return consumers{chans: make(consumerBuffers)}
}

func bufferDeliveries(in chan *Delivery, out chan Delivery) {
var queue []*Delivery
var queueIn = in

for delivery := range in {
select {
case out <- *delivery:
// delivered immediately while the consumer chan can receive
default:
queue = append(queue, delivery)
}

for len(queue) > 0 {
select {
case out <- *queue[0]:
queue = queue[1:]
case delivery, open := <-queueIn:
if open {
queue = append(queue, delivery)
} else {
// stop receiving to drain the queue
queueIn = nil
}
}
}
}

close(out)
}

// On key conflict, close the previous channel.
func (me *consumers) add(tag string, ch chan Delivery) {
func (me *consumers) add(tag string, consumer chan Delivery) {
me.Lock()
defer me.Unlock()

if prev, found := me.chans[tag]; found {
close(prev)
}

me.chans[tag] = ch
in := make(chan *Delivery)
go bufferDeliveries(in, consumer)

me.chans[tag] = in
}

func (me *consumers) close(tag string) (found bool) {
Expand All @@ -65,7 +98,7 @@ func (me *consumers) closeAll() {
close(ch)
}

me.chans = make(consumerChannels)
me.chans = make(consumerBuffers)
}

// Sends a delivery to a the consumer identified by `tag`.
Expand All @@ -76,9 +109,9 @@ func (me *consumers) send(tag string, msg *Delivery) bool {
me.Lock()
defer me.Unlock()

ch, found := me.chans[tag]
buffer, found := me.chans[tag]
if found {
ch <- *msg
buffer <- msg
}

return found
Expand Down
2 changes: 1 addition & 1 deletion examples/simple-producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func publish(amqpURI, exchange, exchangeType, routingKey, body string, reliable
return fmt.Errorf("Channel could not be put into confirm mode: %s", err)
}

ack, nack := channel.NotifyConfirm(make(chan uint64), make(chan uint64))
ack, nack := channel.NotifyConfirm(make(chan uint64, 1), make(chan uint64, 1))

defer confirmOne(ack, nack)
}
Expand Down
3 changes: 2 additions & 1 deletion examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ func ExampleChannel_Confirm_bridge() {
log.Fatalf("exchange.declare destination: %s", err)
}

pubAcks, pubNacks := chd.NotifyConfirm(make(chan uint64), make(chan uint64))
// Buffer of 1 for our single outstanding publishing
pubAcks, pubNacks := chd.NotifyConfirm(make(chan uint64, 1), make(chan uint64, 1))

if err := chd.Confirm(false); err != nil {
log.Fatalf("confirm.select destination: %s", err)
Expand Down
64 changes: 64 additions & 0 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,70 @@ func TestIntegrationConfirm(t *testing.T) {
}
}

// https://github.com/streadway/amqp/issues/48
func TestDeadlockConsumerIssue48(t *testing.T) {
if conn := integrationConnection(t, "issue48"); conn != nil {
defer conn.Close()

deadline := make(chan bool)
go func() {
select {
case <-time.After(5 * time.Second):
panic("expected to receive 2 deliveries while in an RPC, got a deadlock")
case <-deadline:
// pass
}
}()

ch, err := conn.Channel()
if err != nil {
t.Fatalf("got error on channel.open: %v", err)
}

queue := "test-issue48"

if _, err := ch.QueueDeclare(queue, false, true, false, false, nil); err != nil {
t.Fatalf("expected to declare a queue", err)
}

if err := ch.Confirm(false); err != nil {
t.Fatalf("got error on confirm: %v", err)
}

ack, nack := make(chan uint64, 2), make(chan uint64, 2)
ch.NotifyConfirm(ack, nack)

for i := 0; i < cap(ack); i++ {
// Fill the queue with some new or remaining publishings
ch.Publish("", queue, false, false, Publishing{Body: []byte("")})
}

for i := 0; i < cap(ack); i++ {
// Wait for them to land on the queue so they'll be delivered on consume
<-ack
}

// Consuming should send them all on the wire
msgs, err := ch.Consume(queue, "", false, false, false, false, nil)
if err != nil {
t.Fatalf("got error on consume: %v", err)
}

// We pop one off the chan, the other is on the wire
<-msgs

// Opening a new channel (any RPC) while another delivery is on the wire
if _, err := conn.Channel(); err != nil {
t.Fatalf("got error on consume: %v", err)
}

// We pop the next off the chan
<-msgs

deadline <- true
}
}

// https://github.com/streadway/amqp/issues/46
func TestRepeatedChannelExceptionWithPublishAndMaxProcsIssue46(t *testing.T) {
conn := integrationConnection(t, "issue46")
Expand Down

0 comments on commit f0b18b5

Please sign in to comment.