An amqp(RabbitMQ) client that supports reconnection and msg resending.
Chinese Doc:中文文档
- 💥 Built on top of rabbitmq/amqp091-go package with zero dependencies
- 💪 Supports automatic reconnection after network disconnection
- 💎 Supports automatic message retry in case of send failure
- 🎈 Easy to use
When sending and receiving AMQP messages, it relies on the creation of channels, and the creation of channels depends on the connection. If the network connection is disconnected, the connection will be lost. However, the channel does not know who created it. Therefore, there are usually two ways to handle reconnection:
- One way is for the channel to obtain a new connection after the connection is lost. This is the simplest implementation approach. However, if there are a large number of channels, each channel will create a connection and connect to it. When the network condition is poor, there may be a large number of channels repeatedly attempting reconnection, which will greatly increase the server resource consumption and even worsen network congestion. In fact, we only need one connection to determine whether the network can be connected and is connected.
- Another way is for the connection to reconnect itself after a disconnection. If the reconnection is not successful, the channel does nothing. If the reconnection is successful, the registered operations are automatically executed again. If sending messages during reconnection, the messages will be sent after the connection is successfully reestablished. This approach is more complex to implement, but it avoids the problems of server resource consumption and network congestion.
ezmq adopts the latter approach.
package main
import (
"ezmq"
"log"
"time"
)
func main() {
// Create a connection and connect to the server
conn, err := ezmq.Dial("amqp://guest:guest@localhost:5672/", ezmq.DefaultTimesRetry())
if err != nil {
log.Fatal(err)
}
defer conn.Close()
// Create a producer
producer := conn.Producer()
// Send a message
err = producer.Send("amq.direct", "key.direct", []byte("producer.Send() | "+time.Now().Format("2006-01-02 15:04:05")),
ezmq.DefaultSendOpts())
if err != nil {
log.Fatal(err)
}
}
package main
import (
"ezmq"
amqp "github.com/rabbitmq/amqp091-go"
"log"
)
func main() {
// Create a connection and connect to the server
conn, err := ezmq.Dial("amqp://guest:guest@localhost:5672/", ezmq.DefaultTimesRetry())
if err != nil {
log.Fatal(err)
}
defer conn.Close()
// Create a consumer
consumer := conn.Consumer()
// Receive messages
consumer.Receive(
"queue.direct", // Queue name
ezmq.NewReceiveOptsBuilder().SetAutoAck(false).Build(), // Receive options
&ezmq.AbsReceiveListener{
ConsumerMethod: func(d *amqp.Delivery) (brk bool) { // Consumer method
log.Println("queue.direct ", d.DeliveryTag, " ", string(d.Body))
err := d.Ack(false)
if err != nil {
log.Println(err)
}
return
},
})
}
A suggestion:
Unless there are specific requirements, we only need to create one connection globally and use the producer and consumer to handle all message sending and receiving.
- Asynchronous message sending and confirmation
- Handling of duplicate consumption
- Message retransmission in case of not finding the queue during network partition (ReturnListener)
- Consumer acknowledgement retry
LGPL