-
Notifications
You must be signed in to change notification settings - Fork 1
/
remit.go
111 lines (95 loc) · 2.58 KB
/
remit.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package remit
import (
"log"
"sync"
"github.com/streadway/amqp"
)
// J is a convenient aliaas for a `map[string]interface{}`, useful for dealing with
// JSON is a more native manner.
//
// remit.J{
// "foo": "bar",
// "baz": true,
// "qux": remit.J{
// "big": false,
// "small": true,
// },
// }
//
type J map[string]interface{}
// Connect connects to a RabbitMQ instance using the `Url` provided in
// `ConnectionOptions` and setting the _service name_ to `Name`.
//
// The `Url` should be valid as defined by the AMQP URI scheme. Currently,
// this is:
//
// amqp_URI = "amqp://" amqp_authority [ "/" vhost ] [ "?" query ]
// amqp_authority = [ amqp_userinfo "@" ] host [ ":" port ]
// amqp_userinfo = username [ ":" password ]
// username = *( unreserved / pct-encoded / sub-delims )
// password = *( unreserved / pct-encoded / sub-delims )
// vhost = segment
//
// More info can be found here:
//
// https://www.rabbitmq.com/uri-spec.html
//
// Example:
//
// remitSession := remit.Connect(remit.ConnectionOptions{
// Name: "my-service",
// Url: "amqp://localhost"
// })
//
func Connect(options ConnectionOptions) Session {
conn, err := amqp.Dial(options.Url)
failOnError(err, "Failed to connect to RabbitMQ")
closing := conn.NotifyClose(make(chan *amqp.Error))
go func() {
for cl := range closing {
log.Println("Closed", cl.Reason)
}
}()
setupChannel, err := conn.Channel()
failOnError(err, "Failed to open work channel")
err = setupChannel.ExchangeDeclare(
"remit", // name of the exchange
"topic", // type
true, // durable
true, // autoDelete
false, // internal
false, // noWait
nil, // arguments
)
failOnError(err, "Failed to declare \"remit\" exchange")
setupChannel.Close()
publishChannel, err := conn.Channel()
failOnError(err, "Failed to open publish channel")
requestChannel, err := conn.Channel()
failOnError(err, "Failed to open replies channel")
session := Session{
Config: Config{
Name: options.Name,
Url: options.Url,
},
connection: conn,
publishChannel: publishChannel,
requestChannel: requestChannel,
waitGroup: &sync.WaitGroup{},
mu: &sync.Mutex{},
awaitingReply: make(map[string]chan Event),
workerPool: newWorkerPool(1, 5, conn),
}
replies, err := requestChannel.Consume(
"amq.rabbitmq.reply-to", // name of the queue
"", // consumer tag
true, // noAck
true, // exclusive
false, // noLocal
false, // noWait
nil, // arguments
)
failOnError(err, "Failed to consume replies")
go session.watchForReplies(replies)
return session
}