Skip to content

Commit

Permalink
Merge pull request #62 from jxsl13/fix/publish-retry-tag-mismatch
Browse files Browse the repository at this point in the history
introduce backoff policy for publisher
  • Loading branch information
jxsl13 authored Aug 31, 2024
2 parents 28526e2 + 1cfe9b8 commit c0af546
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 68 deletions.
96 changes: 48 additions & 48 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,24 @@ name: rabbitmq & toxiproxy integration tests

on:
push:
branches: [ "main" ]
branches: ["main"]
paths:
- '**.go'
- '**.yaml'
- '**.yml'
- '**.json'
- 'go.mod'
- 'go.sum'
- "**.go"
- "**.yaml"
- "**.yml"
- "**.json"
- "go.mod"
- "go.sum"
pull_request:
# The branches below must be a subset of the branches above
branches: [ "main" ]
branches: ["main"]
paths:
- '**.go'
- '**.yaml'
- '**.yml'
- '**.json'
- 'go.mod'
- 'go.sum'
- "**.go"
- "**.yaml"
- "**.yml"
- "**.json"
- "go.mod"
- "go.sum"

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
Expand All @@ -29,7 +29,7 @@ jobs:
test:
strategy:
matrix:
go-version: ['stable', 'oldstable']
go-version: ["stable", "oldstable"]
platform: [ubuntu-latest]
runs-on: ${{ matrix.platform }}
permissions:
Expand All @@ -39,44 +39,44 @@ jobs:
actions: read
contents: read
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Checkout code
uses: actions/checkout@v4

- name: Install Go
uses: actions/setup-go@v5
with:
go-version: ${{ matrix.go-version }}
- name: Install Go
uses: actions/setup-go@v5
with:
go-version: ${{ matrix.go-version }}

- name: Vet
run: go vet ./...
- name: Vet
run: go vet ./...

- name: Install govulncheck
run: go install golang.org/x/vuln/cmd/govulncheck@latest
- name: Install govulncheck
run: go install golang.org/x/vuln/cmd/govulncheck@latest

- name: Run govulncheck
run: govulncheck ./...
- name: Run govulncheck
run: govulncheck ./...

- name: Run Gosec Security Scanner
uses: securego/gosec@master
with:
# we let the report trigger content trigger a failure using the GitHub Security features.
args: '-no-fail -fmt sarif -out results.sarif ./...'
- name: Upload SARIF file
uses: github/codeql-action/upload-sarif@v3
with:
# Path to SARIF file relative to the root of the repository
sarif_file: results.sarif
- name: Run Gosec Security Scanner
uses: securego/gosec@master
with:
# we let the report trigger content trigger a failure using the GitHub Security features.
args: "-no-fail -fmt sarif -out results.sarif ./..."
- name: Upload SARIF file
uses: github/codeql-action/upload-sarif@v3
with:
# Path to SARIF file relative to the root of the repository
sarif_file: results.sarif

- name: Setup integration test environment
run: docker-compose up -d
- name: Setup integration test environment
run: docker compose up -d

- name: Code Coverage
run: go test -timeout 900s -race -count=1 -parallel 2 -covermode=atomic -coverprofile=coverage.txt ./...
- name: Code Coverage
run: go test -timeout 900s -race -count=1 -parallel 2 -covermode=atomic -coverprofile=coverage.txt ./...

- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4
with:
token: ${{ secrets.CODECOV_TOKEN }}
files: ./coverage.txt
fail_ci_if_error: false
verbose: false
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4
with:
token: ${{ secrets.CODECOV_TOKEN }}
files: ./coverage.txt
fail_ci_if_error: false
verbose: false
7 changes: 4 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@


environment:
docker-compose up -d
docker compose up -d

down:
docker-compose down
docker compose down

test:
go test -timeout 600ss -v -race -count=1 ./... > parallel.test.log
Expand All @@ -21,5 +21,6 @@ count-disconnect-tests:


pool.TestBatchSubscriberMaxBytes:
go test -timeout 0m30s github.com/jxsl13/amqpx/pool -run ^TestBatchSubscriberMaxBytes$ -v -count=1 -race 2>&1 > debug.test.log
go test -timeout 0m30s github.com/jxsl13/amqpx/pool -run ^TestBatchSubscriberMaxBytes$ -v -count=1 -race 2>&1 > debug
.test.log
cat test.log | grep 'INFO: session' | sort | uniq -c
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ docker compose up -d
The test environment looks like this:

Web interfaces:
- username: `admin` and password: `password`
- [rabbitmq management interface: http://127.0.0.1:15672 -> rabbitmq:15672](http://127.0.0.1:15672)
- [out of memory rabbitmq management interface: http://127.0.0.1:25672 -> rabbitmq-broken:15672](http://127.0.0.1:25672)

Expand Down
76 changes: 61 additions & 15 deletions pool/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/jxsl13/amqpx/logging"
)

type Publisher struct {
pool *Pool
autoClosePool bool
backoff BackoffFunc

ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -39,7 +41,8 @@ func NewPublisher(p *Pool, options ...PublisherOption) *Publisher {
Ctx: p.Context(),

AutoClosePool: false,
Logger: p.sp.log, // derive logger from session pool
BackoffPolicy: newDefaultBackoffPolicy(1*time.Millisecond, 5*time.Second), // currently only affects publishing with confirms
Logger: p.sp.log, // derive logger from session pool
}

for _, o := range options {
Expand All @@ -52,6 +55,7 @@ func NewPublisher(p *Pool, options ...PublisherOption) *Publisher {
pub := &Publisher{
pool: p,
autoClosePool: option.AutoClosePool,
backoff: option.BackoffPolicy,
ctx: ctx,
cancel: cancel,

Expand All @@ -65,25 +69,22 @@ func NewPublisher(p *Pool, options ...PublisherOption) *Publisher {
// Publish a message to a specific exchange with a given routingKey.
// You may set exchange to "" and routingKey to your queue name in order to publish directly to a queue.
func (p *Publisher) Publish(ctx context.Context, exchange string, routingKey string, msg Publishing) error {

for {
err := p.publish(ctx, exchange, routingKey, msg)
return p.retry(ctx, func() (cont bool, err error) {
err = p.publish(ctx, exchange, routingKey, msg)
switch {
case err == nil:
return nil
return false, nil
case errors.Is(err, ErrNack):
return err
case errors.Is(err, ErrDeliveryTagMismatch):
return err
return false, err
case !recoverable(err):
// not recoverable
return false, err
default:
if recoverable(err) {
p.warn(exchange, routingKey, err, "publish failed due to recoverable error, retrying")
// retry
} else {
return err
}
// ErrDeliveryTagMismatch + all other unknown errors
p.warn(exchange, routingKey, err, "publish failed due to recoverable error, retrying")
return true, nil
}
}
})
}

func (p *Publisher) publish(ctx context.Context, exchange string, routingKey string, msg Publishing) (err error) {
Expand Down Expand Up @@ -115,6 +116,51 @@ func (p *Publisher) publish(ctx context.Context, exchange string, routingKey str
return s.AwaitConfirm(ctx, tag)
}

func (p *Publisher) retry(ctx context.Context, f func() (cont bool, err error)) error {
// fast path
cont, err := f()
if err != nil {
return err
}
if !cont {
return nil
}

// continue second try with backoff timer overhead
var (
retry = 1
timer = time.NewTimer(p.backoff(retry))
drained = false
)
defer closeTimer(timer, &drained)

for {

select {
case <-timer.C:
// at this point we know that the timer channel has been drained
drained = true

// try again
cont, err = f()
if err != nil {
return err
}
if !cont {
return nil
}

retry++
resetTimer(timer, p.backoff(retry), &drained)

case <-ctx.Done():
return errors.Join(err, ctx.Err())
case <-p.ctx.Done():
return errors.Join(err, p.ctx.Err())
}
}
}

// Get is only supposed to be used for testing, do not use get for polling any broker queues.
func (p *Publisher) Get(ctx context.Context, queue string, autoAck bool) (msg Delivery, ok bool, err error) {
s, err := p.pool.GetSession(ctx)
Expand Down
17 changes: 15 additions & 2 deletions pool/publisher_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type publisherOption struct {
Ctx context.Context

AutoClosePool bool
BackoffPolicy BackoffFunc

Logger logging.Logger
}
Expand All @@ -18,13 +19,17 @@ type PublisherOption func(*publisherOption)

func PublisherWithContext(ctx context.Context) PublisherOption {
return func(po *publisherOption) {
po.Ctx = ctx
if ctx != nil {
po.Ctx = ctx
}
}
}

func PublisherWithLogger(logger logging.Logger) PublisherOption {
return func(po *publisherOption) {
po.Logger = logger
if logger != nil {
po.Logger = logger
}
}
}

Expand All @@ -33,3 +38,11 @@ func PublisherWithAutoClosePool(autoClose bool) PublisherOption {
po.AutoClosePool = autoClose
}
}

func PublisherWithBackoffPolicy(backoffFunc BackoffFunc) PublisherOption {
return func(po *publisherOption) {
if backoffFunc != nil {
po.BackoffPolicy = backoffFunc
}
}
}

0 comments on commit c0af546

Please sign in to comment.