Skip to content

Commit

Permalink
Parse array fields with byte size as field length
Browse files Browse the repository at this point in the history
Integrates a server generated table field set from using
the x-dead-letter-exchange argument to a queue declaration.

Fixes streadway#56
  • Loading branch information
Sean Treadway committed Apr 7, 2013
1 parent 4cfaca4 commit 8d16fb0
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 10 deletions.
81 changes: 81 additions & 0 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,87 @@ func TestIntegrationConfirm(t *testing.T) {
}
}

// Sets up the topology where rejected messages will be forwarded
// to a fanout exchange, with a single queue bound.
//
// Relates to https://github.com/streadway/amqp/issues/56
//
func TestDeclareArgsRejectToDeadLetterQueue(t *testing.T) {
if conn := integrationConnection(t, "declareArgs"); conn != nil {
defer conn.Close()

ex, q := "declareArgs", "declareArgs-deliveries"
dlex, dlq := ex+"-dead-letter", q+"-dead-letter"

ch, _ := conn.Channel()

if err := ch.ExchangeDeclare(ex, "fanout", false, true, false, false, nil); err != nil {
t.Fatalf("cannot declare %v: got: %v", ex, err)
}

if err := ch.ExchangeDeclare(dlex, "fanout", false, true, false, false, nil); err != nil {
t.Fatalf("cannot declare %v: got: %v", dlex, err)
}

if _, err := ch.QueueDeclare(dlq, false, true, false, false, nil); err != nil {
t.Fatalf("cannot declare %v: got: %v", dlq, err)
}

if err := ch.QueueBind(dlq, "#", dlex, false, nil); err != nil {
t.Fatalf("cannot bind %v to %v: got: %v", dlq, dlex, err)
}

if _, err := ch.QueueDeclare(q, false, true, false, false, Table{
"x-dead-letter-exchange": dlex,
}); err != nil {
t.Fatalf("cannot declare %v with dlq %v: got: %v", q, dlex, err)
}

if err := ch.QueueBind(q, "#", ex, false, nil); err != nil {
t.Fatalf("cannot bind %v: got: %v", ex, err)
}

fails, err := ch.Consume(q, "", false, false, false, false, nil)
if err != nil {
t.Fatalf("cannot consume %v: got: %v", q, err)
}

// Reject everything consumed
go func() {
for d := range fails {
d.Reject(false)
}
}()

// Publish the 'poison'
if err := ch.Publish(ex, q, true, false, Publishing{Body: []byte("ignored")}); err != nil {
t.Fatalf("publishing failed")
}

// spin-get until message arrives on the dead-letter queue with a
// synchronous parse to exercise the array field (x-death) set by the
// server relating to issue-56
for i := 0; i < 10; i++ {
d, got, err := ch.Get(dlq, false)
if !got && err == nil {
continue
} else if err != nil {
t.Fatalf("expected success in parsing reject, got: %v", err)
} else {
// pass if we've parsed an array
if v, ok := d.Headers["x-death"]; ok {
if _, ok := v.([]interface{}); ok {
return
}
}
t.Fatalf("array field x-death expected in the headers, got: %v (%T)", d.Headers, d.Headers["x-death"])
}
}

t.Fatalf("expectd dead-letter after 10 get attempts")
}
}

// https://github.com/streadway/amqp/issues/48
func TestDeadlockConsumerIssue48(t *testing.T) {
if conn := integrationConnection(t, "issue48"); conn != nil {
Expand Down
18 changes: 13 additions & 5 deletions read.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,13 +268,21 @@ func readField(r io.Reader) (v interface{}, err error) {
if err = binary.Read(r, binary.BigEndian, &size); err != nil {
return
}
array := make([]interface{}, size)
for i, _ := range array {
if array[i], err = readField(r); err != nil {
return

lim := &io.LimitedReader{R: r, N: int64(size)}
arr := make([]interface{}, 0)
var field interface{}

for {
if field, err = readField(lim); err != nil {
if err == io.EOF {
return arr, nil
} else {
return nil, err
}
}
arr = append(arr, field)
}
return array, nil

case 'T': // timestamp
return readTimestamp(r)
Expand Down
17 changes: 12 additions & 5 deletions write.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,16 +354,23 @@ func writeField(w io.Writer, value interface{}) (err error) {
enc = append(buf[:5], []byte(v)...)

case []interface{}: // field-array
sec := new(bytes.Buffer)
for _, val := range v {
if err = writeField(sec, val); err != nil {
return
}
}

buf[0] = 'A'
binary.BigEndian.PutUint32(buf[1:5], uint32(len(v)))
binary.BigEndian.PutUint32(buf[1:5], uint32(sec.Len()))
if _, err = w.Write(buf[:5]); err != nil {
return
}
for _, val := range v {
if err = writeField(w, val); err != nil {
return
}

if _, err = w.Write(sec.Bytes()); err != nil {
return
}

return

case time.Time: // timestamp
Expand Down

0 comments on commit 8d16fb0

Please sign in to comment.