From 1c7e895ed1bd7ee17fd460e78788bdf3272ddd0f Mon Sep 17 00:00:00 2001 From: jxsl13 Date: Tue, 14 Feb 2023 15:34:16 +0100 Subject: [PATCH] make delivery mode persistent by default --- pool/session.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/pool/session.go b/pool/session.go index 828028d..3a07e66 100644 --- a/pool/session.go +++ b/pool/session.go @@ -275,7 +275,7 @@ type Publishing struct { // Properties ContentType string // MIME content type ContentEncoding string // MIME content encoding - DeliveryMode uint8 // Transient (0 or 1) or Persistent (2) + DeliveryMode uint8 // Persistent (0 or 2) or Transient (1) (different from rabbitmq/amqp091-go library) Priority uint8 // 0 to 9 CorrelationId string // correlation identifier ReplyTo string // address to to reply to (ex: RPC) @@ -310,6 +310,17 @@ func (s *Session) Publish(ctx context.Context, exchange string, routingKey strin s.mu.Lock() defer s.mu.Unlock() + // we want to have a persistent messages by default + // this allows to even in a disaster case where the rabbitmq node is restarted or crashes + // to still have our messages persisted to disk. + // https://www.rabbitmq.com/persistence-conf.html#how-it-works + var amqpDeliverMode uint8 + if msg.DeliveryMode == 1 { + amqpDeliverMode = 1 // transient (purged upon rabbitmq restart) + } else { + amqpDeliverMode = 2 // persistent (persisted to disk upon arrival in queue) + } + return s.retryPublish(func() (uint64, error) { tag = 0 if s.confirmable { @@ -326,7 +337,7 @@ func (s *Session) Publish(ctx context.Context, exchange string, routingKey strin Headers: msg.Headers, ContentType: msg.ContentType, ContentEncoding: msg.ContentEncoding, - DeliveryMode: msg.DeliveryMode, + DeliveryMode: amqpDeliverMode, Priority: msg.Priority, CorrelationId: msg.CorrelationId, ReplyTo: msg.ReplyTo,