Skip to content

Commit

Permalink
merge wire package into amqp package
Browse files Browse the repository at this point in the history
  • Loading branch information
Sean Treadway committed Apr 30, 2012
1 parent 93a24ea commit ed2a138
Show file tree
Hide file tree
Showing 19 changed files with 203 additions and 236 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1 @@
wire/spec/spec
spec/spec
17 changes: 7 additions & 10 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,25 @@ all: build fmt test

.PHONY: gen

wire/spec/spec: wire/spec/gen.go
echo "Entering directory 'wire/spec'"
cd wire/spec && go build .
spec/spec: spec/gen.go
echo "Entering directory 'spec'"
cd spec && go build .
echo "Entering directory '.'"
cd ../..
cd ..

wire/spec091.go: wire/spec/spec
wire/spec/spec < wire/spec/amqp0-9-1.extended.xml > wire/spec091.go
spec091.go: spec/spec
spec/spec < spec/amqp0-9-1.extended.xml > spec091.go

gen: wire/spec091.go
gen: spec091.go

test: all
go test

fmt: gen
go fmt ./...
go fmt .

build: gen
go build ./...
go build .

install: build
go install ./...
go install .
2 changes: 1 addition & 1 deletion wire/buffer.go → buffer.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package wire
package amqp

import (
"bytes"
Expand Down
83 changes: 41 additions & 42 deletions channel.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package amqp

import (
"amqp/wire"
"fmt"
"sync"
)
Expand Down Expand Up @@ -49,7 +48,7 @@ func (me *Channel) handleAsync() {
return
}
switch method := msg.Method.(type) {
case wire.BasicDeliver:
case BasicDeliver:
consumer, ok := me.consumers[method.ConsumerTag]
if !ok {
// TODO handle missing consumer
Expand All @@ -60,7 +59,7 @@ func (me *Channel) handleAsync() {
Exchange: method.Exchange,
Redelivered: method.Redelivered,
RoutingKey: method.RoutingKey,
Properties: Properties(msg.Properties),
Properties: msg.Properties,
Body: msg.Body,
}
}
Expand All @@ -78,49 +77,49 @@ func (me *Channel) handleAsync() {
// close-channel = C:CLOSE S:CLOSE-OK
// / S:CLOSE C:CLOSE-OK
func (me *Channel) open() error {
me.framing.SendMethod(wire.ChannelOpen{})
me.framing.SendMethod(ChannelOpen{})

switch me.framing.Recv().Method.(type) {
case wire.ChannelOpenOk:
case ChannelOpenOk:
return nil
}

// TODO handle channel open errors (like already opened on this ID)
return ErrBadProtocol
}

func newQueueState(msg *wire.QueueDeclareOk) *QueueState {
func newQueueState(msg *QueueDeclareOk) *QueueState {
return &QueueState{
Name: msg.Queue,
Consumers: int(msg.ConsumerCount),
Messages: int(msg.MessageCount),
}
}

func (me *Channel) unhandled(msg wire.Method) error {
func (me *Channel) unhandled(msg Method) error {
// TODO CLOSE/CLOSE-OK/ERROR
fmt.Println("UNHANDLED", msg)
panic("UNHANDLED")
return nil
}

func (me *Channel) ExchangeDeclare(typ string, name string, durable bool, autoDelete bool, internal bool, args Table) error {
msg := wire.ExchangeDeclare{
msg := ExchangeDeclare{
Exchange: name,
Type: typ,
Passive: false,
Durable: durable,
AutoDelete: autoDelete,
Internal: internal,
NoWait: me.noWait,
Arguments: wire.Table(args),
Arguments: Table(args),
}

me.framing.SendMethod(msg)

if !msg.NoWait {
switch res := me.framing.Recv().Method.(type) {
case wire.ExchangeDeclareOk:
case ExchangeDeclareOk:
return nil
default:
return me.unhandled(res)
Expand All @@ -132,7 +131,7 @@ func (me *Channel) ExchangeDeclare(typ string, name string, durable bool, autoDe
}

func (me *Channel) ExchangeDelete(name string, ifUnused bool) error {
msg := wire.ExchangeDelete{
msg := ExchangeDelete{
Exchange: name,
IfUnused: ifUnused,
NoWait: me.noWait,
Expand All @@ -142,7 +141,7 @@ func (me *Channel) ExchangeDelete(name string, ifUnused bool) error {

if !msg.NoWait {
switch res := me.framing.Recv().Method.(type) {
case wire.ExchangeDeleteOk:
case ExchangeDeleteOk:
return nil
default:
return me.unhandled(res)
Expand All @@ -153,19 +152,19 @@ func (me *Channel) ExchangeDelete(name string, ifUnused bool) error {
}

func (me *Channel) ExchangeBind(destination string, source string, routingKey string, arguments Table) error {
msg := wire.ExchangeBind{
msg := ExchangeBind{
Destination: destination,
Source: source,
RoutingKey: routingKey,
NoWait: me.noWait,
Arguments: wire.Table(arguments),
Arguments: Table(arguments),
}

me.framing.SendMethod(msg)

if !msg.NoWait {
switch res := me.framing.Recv().Method.(type) {
case wire.ExchangeBindOk:
case ExchangeBindOk:
return nil
default:
return me.unhandled(res)
Expand All @@ -176,19 +175,19 @@ func (me *Channel) ExchangeBind(destination string, source string, routingKey st
}

func (me *Channel) ExchangeUnbind(destination string, source string, routingKey string, arguments Table) error {
msg := wire.ExchangeUnbind{
msg := ExchangeUnbind{
Destination: destination,
Source: source,
RoutingKey: routingKey,
NoWait: me.noWait,
Arguments: wire.Table(arguments),
Arguments: Table(arguments),
}

me.framing.SendMethod(msg)

if !msg.NoWait {
switch res := me.framing.Recv().Method.(type) {
case wire.ExchangeUnbindOk:
case ExchangeUnbindOk:
return nil
default:
return me.unhandled(res)
Expand All @@ -199,21 +198,21 @@ func (me *Channel) ExchangeUnbind(destination string, source string, routingKey
}

func (me *Channel) QueueDeclare(name string, durable bool, autoDelete bool, exclusive bool, arguments Table) (*QueueState, error) {
msg := wire.QueueDeclare{
msg := QueueDeclare{
Queue: name,
Passive: false,
Durable: durable,
Exclusive: exclusive,
AutoDelete: autoDelete,
NoWait: me.noWait,
Arguments: wire.Table(arguments),
Arguments: Table(arguments),
}

me.framing.SendMethod(msg)

if !msg.NoWait {
switch res := me.framing.Recv().Method.(type) {
case wire.QueueDeclareOk:
case QueueDeclareOk:
return newQueueState(&res), nil
default:
return nil, me.unhandled(res)
Expand All @@ -224,19 +223,19 @@ func (me *Channel) QueueDeclare(name string, durable bool, autoDelete bool, excl
}

func (me *Channel) QueueBind(exchange string, queue string, routingKey string, arguments Table) error {
msg := wire.QueueBind{
msg := QueueBind{
Queue: queue,
Exchange: exchange,
RoutingKey: routingKey,
NoWait: me.noWait,
Arguments: wire.Table(arguments),
Arguments: Table(arguments),
}

me.framing.SendMethod(msg)

if !msg.NoWait {
switch res := me.framing.Recv().Method.(type) {
case wire.QueueBindOk:
case QueueBindOk:
return nil
default:
return me.unhandled(res)
Expand All @@ -247,17 +246,17 @@ func (me *Channel) QueueBind(exchange string, queue string, routingKey string, a
}

func (me *Channel) QueueUnbind(exchange string, queue string, routingKey string, arguments Table) error {
msg := wire.QueueUnbind{
msg := QueueUnbind{
Queue: queue,
Exchange: exchange,
RoutingKey: routingKey,
Arguments: wire.Table(arguments),
Arguments: Table(arguments),
}

me.framing.SendMethod(msg)

switch res := me.framing.Recv().Method.(type) {
case wire.QueueUnbindOk:
case QueueUnbindOk:
return nil
default:
return me.unhandled(res)
Expand All @@ -267,7 +266,7 @@ func (me *Channel) QueueUnbind(exchange string, queue string, routingKey string,
}

func (me *Channel) QueuePurge(name string) error {
msg := wire.QueuePurge{
msg := QueuePurge{
Queue: name,
NoWait: me.noWait,
}
Expand All @@ -276,7 +275,7 @@ func (me *Channel) QueuePurge(name string) error {

if !msg.NoWait {
switch res := me.framing.Recv().Method.(type) {
case wire.QueuePurgeOk:
case QueuePurgeOk:
return nil
default:
return me.unhandled(res)
Expand All @@ -287,7 +286,7 @@ func (me *Channel) QueuePurge(name string) error {
}

func (me *Channel) QueueDelete(name string, ifUnused bool, ifEmpty bool) error {
msg := wire.QueueDelete{
msg := QueueDelete{
Queue: name,
IfUnused: ifUnused,
IfEmpty: ifEmpty,
Expand All @@ -298,7 +297,7 @@ func (me *Channel) QueueDelete(name string, ifUnused bool, ifEmpty bool) error {

if !msg.NoWait {
switch res := me.framing.Recv().Method.(type) {
case wire.QueueDeleteOk:
case QueueDeleteOk:
return nil
default:
return me.unhandled(res)
Expand All @@ -310,7 +309,7 @@ func (me *Channel) QueueDelete(name string, ifUnused bool, ifEmpty bool) error {

// Only applies to this Channel
func (me *Channel) BasicQos(prefetchMessageCount int, prefetchWindowByteSize int) error {
msg := wire.BasicQos{
msg := BasicQos{
PrefetchCount: uint16(prefetchMessageCount),
PrefetchSize: uint32(prefetchWindowByteSize),
Global: false, // connection global change from a channel message, durr...
Expand All @@ -319,7 +318,7 @@ func (me *Channel) BasicQos(prefetchMessageCount int, prefetchWindowByteSize int
me.framing.SendMethod(msg)

switch res := me.framing.Recv().Method.(type) {
case wire.BasicQosOk:
case BasicQosOk:
return nil
default:
return me.unhandled(res)
Expand All @@ -328,15 +327,15 @@ func (me *Channel) BasicQos(prefetchMessageCount int, prefetchWindowByteSize int
panic("unreachable")
}

func (me *Channel) BasicPublish(exchange string, routingKey string, mandatory bool, immediate bool, body []byte, properties Properties) {
func (me *Channel) BasicPublish(exchange string, routingKey string, mandatory bool, immediate bool, body []byte, properties ContentProperties) {
me.framing.Send(Message{
Method: wire.BasicPublish{
Method: BasicPublish{
Exchange: exchange,
RoutingKey: routingKey,
Mandatory: mandatory,
Immediate: immediate,
},
Properties: wire.ContentProperties(properties),
Properties: ContentProperties(properties),
Body: body,
})
}
Expand Down Expand Up @@ -369,20 +368,20 @@ func (me *Channel) BasicConsume(queue string, consumerTag string, noLocal bool,
me.consumersMutex.Lock()
defer me.consumersMutex.Unlock()

msg := wire.BasicConsume{
msg := BasicConsume{
Queue: queue,
ConsumerTag: consumerTag,
NoLocal: false,
NoAck: false,
Exclusive: false,
NoWait: me.noWait,
Arguments: wire.Table(arguments),
Arguments: Table(arguments),
}

me.framing.SendMethod(msg)

switch res := me.framing.Recv().Method.(type) {
case wire.BasicConsumeOk:
case BasicConsumeOk:
consumer := make(chan *Delivery)
me.consumers[res.ConsumerTag] = consumer
return consumer, nil
Expand All @@ -401,7 +400,7 @@ func (me *Channel) BasicCancel(consumerTag string) error {

consumer, ok := me.consumers[consumerTag]
if ok {
msg := wire.BasicCancel{
msg := BasicCancel{
ConsumerTag: consumerTag,
NoWait: me.noWait,
}
Expand All @@ -413,7 +412,7 @@ func (me *Channel) BasicCancel(consumerTag string) error {
close(consumer)
} else {
switch res := me.framing.Recv().Method.(type) {
case wire.BasicCancelOk:
case BasicCancelOk:
if res.ConsumerTag == consumerTag {
delete(me.consumers, consumerTag)
close(consumer)
Expand All @@ -431,7 +430,7 @@ func (me *Channel) BasicCancel(consumerTag string) error {
}

func (me *Channel) BasicAck(deliveryTag uint64, multiple bool) {
me.framing.SendMethod(wire.BasicAck{
me.framing.SendMethod(BasicAck{
DeliveryTag: deliveryTag,
Multiple: multiple,
})
Expand Down
Loading

0 comments on commit ed2a138

Please sign in to comment.