-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdial.go
109 lines (89 loc) · 2.82 KB
/
dial.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package amqpx
import (
"context"
"fmt"
"time"
"github.com/rabbitmq/amqp091-go"
)
const (
defaultHost = "localhost"
defaultPort = 5672
defaultReconnectDelay = 4 * time.Second
defaultHeartbeat = 10 * time.Second
defaultLocale = "en_US"
)
//go:generate ./bin/moq -rm -out connection_moq_test.go . Connection
// A Connection is an interface implemented by amqp091 client.
type Connection interface {
IsClosed() bool
Channel() (Channel, error)
NotifyClose(chan *amqp091.Error) chan *amqp091.Error
Close() error
}
//go:generate ./bin/moq -rm -out channel_moq_test.go . Channel
// A Channel is an interface implemented by amqp091 client.
type Channel interface {
QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp091.Table) (amqp091.Queue, error)
ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp091.Table) error
QueueBind(name, key, exchange string, noWait bool, args amqp091.Table) error
Confirm(noWait bool) error
Qos(prefetchCount, prefetchSize int, global bool) error
NotifyClose(chan *amqp091.Error) chan *amqp091.Error
NotifyCancel(chan string) chan string
Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp091.Table) (<-chan amqp091.Delivery, error)
PublishWithDeferredConfirmWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp091.Publishing) (*amqp091.DeferredConfirmation, error)
NotifyReturn(c chan amqp091.Return) chan amqp091.Return
Close() error
}
//go:generate ./bin/moq -out dialer_moq_test.go . dialer
// A dialer is an interface implemented by creating connection.
type dialer interface {
Dial(context.Context) (Connection, error)
}
type amqpConn struct {
conn *amqp091.Connection
}
func (w *amqpConn) IsClosed() bool {
return w.conn.IsClosed()
}
func (w *amqpConn) Channel() (Channel, error) {
return w.conn.Channel()
}
func (w *amqpConn) NotifyClose(receiver chan *amqp091.Error) chan *amqp091.Error {
return w.conn.NotifyClose(receiver)
}
func (w *amqpConn) Close() error {
return w.conn.Close()
}
var defaultURI = amqp091.URI{
Scheme: "amqp",
Host: defaultHost,
Port: defaultPort,
Username: "guest",
Password: "guest",
Vhost: "/",
}
var defaultConfig = amqp091.Config{
Heartbeat: defaultHeartbeat,
Locale: defaultLocale,
Properties: amqp091.NewConnectionProperties(),
}
type defaultDialer struct {
URI string
Config amqp091.Config
log LogFunc
}
func (d *defaultDialer) Dial(ctx context.Context) (Connection, error) {
for {
conn, err := amqp091.DialConfig(d.URI, d.Config)
if err == nil {
return &amqpConn{conn: conn}, nil
}
d.log("[ERROR] dial conn: %v", err)
select {
case <-ctx.Done():
return nil, fmt.Errorf("%s: %w", err, ctx.Err())
case <-time.After(defaultReconnectDelay):
}
}
}