Skip to content

Commit

Permalink
make delivery mode persistent by default
Browse files Browse the repository at this point in the history
  • Loading branch information
jxsl13 committed Feb 14, 2023
1 parent 1832af0 commit 1c7e895
Showing 1 changed file with 13 additions and 2 deletions.
15 changes: 13 additions & 2 deletions pool/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ type Publishing struct {
// Properties
ContentType string // MIME content type
ContentEncoding string // MIME content encoding
DeliveryMode uint8 // Transient (0 or 1) or Persistent (2)
DeliveryMode uint8 // Persistent (0 or 2) or Transient (1) (different from rabbitmq/amqp091-go library)
Priority uint8 // 0 to 9
CorrelationId string // correlation identifier
ReplyTo string // address to to reply to (ex: RPC)
Expand Down Expand Up @@ -310,6 +310,17 @@ func (s *Session) Publish(ctx context.Context, exchange string, routingKey strin
s.mu.Lock()
defer s.mu.Unlock()

// we want to have a persistent messages by default
// this allows to even in a disaster case where the rabbitmq node is restarted or crashes
// to still have our messages persisted to disk.
// https://www.rabbitmq.com/persistence-conf.html#how-it-works
var amqpDeliverMode uint8
if msg.DeliveryMode == 1 {
amqpDeliverMode = 1 // transient (purged upon rabbitmq restart)
} else {
amqpDeliverMode = 2 // persistent (persisted to disk upon arrival in queue)
}

return s.retryPublish(func() (uint64, error) {
tag = 0
if s.confirmable {
Expand All @@ -326,7 +337,7 @@ func (s *Session) Publish(ctx context.Context, exchange string, routingKey strin
Headers: msg.Headers,
ContentType: msg.ContentType,
ContentEncoding: msg.ContentEncoding,
DeliveryMode: msg.DeliveryMode,
DeliveryMode: amqpDeliverMode,
Priority: msg.Priority,
CorrelationId: msg.CorrelationId,
ReplyTo: msg.ReplyTo,
Expand Down

0 comments on commit 1c7e895

Please sign in to comment.