add subscriber pause & resume test #63

Open · wants to merge 3 commits into main
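
All three tests added here exercise the same batch-handler lifecycle: pause a running handler, retune it, and resume it. Distilled to its core, the sequence under test looks roughly like this (a sketch reusing the handler API exactly as it appears in the diff below; handler is the value returned by amqp.RegisterBatchHandler):

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	// stop delivering messages to the handler function
	if err := handler.Pause(ctx); err != nil {
		t.Fatal(err)
	}

	// reconfigure the handler while no batches are being processed
	handler.SetMaxBatchSize(1)

	// start delivering messages again
	if err := handler.Resume(ctx); err != nil {
		t.Fatal(err)
	}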
2 changes: 1 addition & 1 deletion Makefile
@@ -8,7 +8,7 @@ down:
	docker compose down

test:
-	go test -timeout 600ss -v -race -count=1 ./... > parallel.test.log
+	go test -timeout 600s -v -race -count=1 ./... > parallel.test.log

test-sequentially:
	go test -timeout 900s -v -race -parallel 1 -count=1 ./... > sequential.test.log
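
The timeout fix matters because go test parses the -timeout flag as a Go duration, and "600ss" is not a valid duration, so the test target would fail before running a single test. A quick standalone check (not part of this PR):

	package main

	import (
		"fmt"
		"time"
	)

	func main() {
		// "600ss" has no valid unit suffix, so parsing fails,
		// e.g. with: time: unknown unit "ss" in duration "600ss"
		if _, err := time.ParseDuration("600ss"); err != nil {
			fmt.Println(err)
		}

		// "600s" parses to ten minutes, which is what the Makefile intends
		d, _ := time.ParseDuration("600s")
		fmt.Println(d) // 10m0s
	}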
372 changes: 372 additions & 0 deletions amqpx_test.go
@@ -483,6 +483,378 @@ func TestPauseResumeHandlerNoProcessing(t *testing.T) {
	}
}

func TestHandlerPauseAndResumeSubscriber(t *testing.T) {
	t.Parallel()
	var (
		amqp              = amqpx.New()
		log               = logging.NewTestLogger(t)
		cctx, cancel      = context.WithCancel(context.TODO())
		funcName          = testutils.CallerFuncName()
		nextExchangeQueue = testutils.NewExchangeQueueGenerator(funcName)
		eq1               = nextExchangeQueue()
	)
	defer cancel()
	defer func() {
		log.Info("closing amqp")
		assert.NoError(t, amqp.Close())
	}()

	options := []amqpx.Option{
		amqpx.WithLogger(logging.NewNoOpLogger()),
		amqpx.WithPublisherConnections(1),
		amqpx.WithPublisherSessions(1),
	}

	amqpPub := amqpx.New()
	amqpPub.RegisterTopologyCreator(createTopology(log, eq1))
	amqp.RegisterTopologyDeleter(deleteTopology(log, eq1))
	defer func() {
		assert.NoError(t, amqpPub.Close())
	}()

	err := amqpPub.Start(
		cctx,
		testutils.HealthyConnectURL,
		append(options, amqpx.WithName(funcName+"-pub"))...,
	)
	require.NoError(t, err)

	var (
		publish                = 10
		cnt                    = 0
		processingFinished     = make(chan struct{})
		initialBatchSize       = 2
		subscriberFlushTimeout = 500 * time.Millisecond
		finalBatchSize         = 1
	)

	// process half of the messages, pause, shrink the batch size, resume,
	// process the rest, then cancel the context
	handler := amqp.RegisterBatchHandler(eq1.Queue, func(hctx context.Context, msgs []pool.Delivery) (err error) {
		select {
		case <-hctx.Done():
			return fmt.Errorf("handler context canceled before processing: %w", hctx.Err())
		default:
			// not canceled, continue processing
		}

		for _, msg := range msgs {
			assert.Equal(t, eq1.NextSubMsg(), string(msg.Body))
			cnt++
		}

		if cnt == publish {
			close(processingFinished)
		}

		return nil
	},
		pool.WithMaxBatchSize(initialBatchSize),
		pool.WithBatchFlushTimeout(subscriberFlushTimeout),
	)

	err = amqp.Start(cctx, testutils.HealthyConnectURL, amqpx.WithName(funcName+"-sub"), amqpx.WithLogger(log))
	if err != nil {
		assert.NoError(t, err)
		return
	}

	// publish the first half of the messages
	for i := 0; i < publish/2; i++ {
		err := amqpPub.Publish(cctx, eq1.Exchange, eq1.RoutingKey, pool.Publishing{
			ContentType: "text/plain",
			Body:        []byte(eq1.NextPubMsg()),
		})
		if err != nil {
			assert.NoError(t, err)
			return
		}
	}

	// give the subscriber time to flush the first half
	time.Sleep(2 * subscriberFlushTimeout)

	// pause, reduce the batch size, and resume
	reconnectTimeout := 2 * time.Minute

	pauseCtx, cancel := context.WithTimeout(cctx, reconnectTimeout)
	err = handler.Pause(pauseCtx)
	cancel()
	assert.NoError(t, err)

	handler.SetMaxBatchSize(finalBatchSize)

	resumeCtx, cancel := context.WithTimeout(cctx, reconnectTimeout)
	err = handler.Resume(resumeCtx)
	cancel()
	assert.NoError(t, err)

	// publish the rest of the messages
	for i := 0; i < publish/2; i++ {
		err := amqpPub.Publish(cctx, eq1.Exchange, eq1.RoutingKey, pool.Publishing{
			ContentType: "text/plain",
			Body:        []byte(eq1.NextPubMsg()),
		})
		if err != nil {
			assert.NoError(t, err)
			return
		}
	}

	// wait for the subscriber to consume all messages before finishing the test
	publishFinishTimeout := time.Duration(publish/2) * 500 * time.Millisecond // up to 500ms per message for the second half
	select {
	case <-time.After(publishFinishTimeout):
		t.Errorf("timeout after %s", publishFinishTimeout)
		return
	case <-processingFinished:
		log.Info("processing finished successfully")
	}
}
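
The two tests that follow keep all deliveries unacknowledged until a gate channel opens; as the in-code comments note, returning a non-nil error from the handler nacks the batch so it is redelivered. Reduced to its essentials, the gate looks like this (a sketch assuming that nack-on-error contract):

	process := make(chan struct{})

	handlerFn := func(ctx context.Context, msgs []pool.Delivery) error {
		select {
		case <-process:
			return nil // gate open: the batch is processed and acked
		default:
			// gate closed: the non-nil error nacks the batch for redelivery
			return fmt.Errorf("not ready to process yet")
		}
	}

	// once the handler has been paused, retuned and resumed:
	close(process) // every future delivery passes the gate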

func TestHandlerPauseAndResumeInFlightNackSubscriber(t *testing.T) {
	t.Parallel()
	var (
		amqp              = amqpx.New()
		log               = logging.NewTestLogger(t)
		cctx, cancel      = context.WithCancel(context.TODO())
		funcName          = testutils.CallerFuncName()
		nextExchangeQueue = testutils.NewExchangeQueueGenerator(funcName)
		eq1               = nextExchangeQueue()
	)
	defer cancel()
	defer func() {
		log.Info("closing amqp")
		assert.NoError(t, amqp.Close())
	}()

	options := []amqpx.Option{
		amqpx.WithLogger(logging.NewNoOpLogger()),
		amqpx.WithPublisherConnections(1),
		amqpx.WithPublisherSessions(1),
	}

	amqpPub := amqpx.New()
	amqpPub.RegisterTopologyCreator(createTopology(log, eq1))
	amqp.RegisterTopologyDeleter(deleteTopology(log, eq1))
	defer func() {
		assert.NoError(t, amqpPub.Close())
	}()

	err := amqpPub.Start(
		cctx,
		testutils.HealthyConnectURL,
		append(options, amqpx.WithName(funcName+"-pub"))...,
	)
	require.NoError(t, err)

	var (
		publish                = 10
		initialBatchSize       = publish * 2 // larger than the number of published messages so that all of them stay in flight
		cnt                    = 0
		processingFinished     = make(chan struct{})
		subscriberFlushTimeout = 10 * time.Second // a high flush timeout, also to keep the messages in flight
		finalBatchSize         = 1
		process                = make(chan struct{})
	)

	handler := amqp.RegisterBatchHandler(eq1.Queue, func(hctx context.Context, msgs []pool.Delivery) (err error) {
		select {
		case <-hctx.Done():
			return fmt.Errorf("handler context canceled before processing: %w", hctx.Err())
		case <-process:
			// the gate is open: allow processing
		default:
			// the gate is still closed: returning an error nacks the messages
			return fmt.Errorf("we don't want the messages to be processed, yet")
		}

		// TODO: at this point the ordering seems somewhat broken.
		// The bigger the batch, the more data we lose.
		for _, msg := range msgs {
			assert.Equal(t, eq1.NextSubMsg(), string(msg.Body))
			cnt++
		}

		if cnt == publish {
			close(processingFinished)
		}

		return nil
	},
		pool.WithMaxBatchSize(initialBatchSize),
		pool.WithBatchFlushTimeout(subscriberFlushTimeout),
	)

	err = amqp.Start(cctx, testutils.HealthyConnectURL, amqpx.WithName(funcName+"-sub"), amqpx.WithLogger(log))
	if err != nil {
		assert.NoError(t, err)
		return
	}

	// publish all of the messages
	for i := 0; i < publish; i++ {
		err := amqpPub.Publish(cctx, eq1.Exchange, eq1.RoutingKey, pool.Publishing{
			ContentType: "text/plain",
			Body:        []byte(eq1.NextPubMsg()),
		})
		if err != nil {
			assert.NoError(t, err)
			return
		}
	}

	// give the subscriber time to pick the messages up; they stay in flight
	time.Sleep(2 * time.Second)

	// pause, reduce the batch size, and resume
	reconnectTimeout := 2 * time.Minute

	pauseCtx, cancel := context.WithTimeout(cctx, reconnectTimeout)
	err = handler.Pause(pauseCtx)
	cancel()
	assert.NoError(t, err)

	handler.SetMaxBatchSize(finalBatchSize)

	resumeCtx, cancel := context.WithTimeout(cctx, reconnectTimeout)
	err = handler.Resume(resumeCtx)
	cancel()
	assert.NoError(t, err)

	// allow processing without nacks
	close(process)

	// wait for the subscriber to consume all messages before finishing the test
	publishFinishTimeout := time.Duration(publish) * time.Second // up to one second per message
	select {
	case <-time.After(publishFinishTimeout):
		t.Errorf("timeout after %s", publishFinishTimeout)
		return
	case <-processingFinished:
		log.Info("processing finished successfully")
	}
}
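
// Note: the following test repeats the nack scenario above, but with a small
// batch window (initialBatchSize = 2 and subscriberFlushTimeout = 500ms instead
// of publish*2 and 10s), so redeliveries cycle through many small batches
// instead of a single oversized one.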

func TestHandlerPauseAndResumeInFlightSmallWindowNackSubscriber(t *testing.T) {
	t.Parallel()
	var (
		amqp              = amqpx.New()
		log               = logging.NewTestLogger(t)
		cctx, cancel      = context.WithCancel(context.TODO())
		funcName          = testutils.CallerFuncName()
		nextExchangeQueue = testutils.NewExchangeQueueGenerator(funcName)
		eq1               = nextExchangeQueue()
	)
	defer cancel()
	defer func() {
		log.Info("closing amqp")
		assert.NoError(t, amqp.Close())
	}()

	options := []amqpx.Option{
		amqpx.WithLogger(logging.NewNoOpLogger()),
		amqpx.WithPublisherConnections(1),
		amqpx.WithPublisherSessions(1),
	}

	amqpPub := amqpx.New()
	amqpPub.RegisterTopologyCreator(createTopology(log, eq1))
	amqp.RegisterTopologyDeleter(deleteTopology(log, eq1))
	defer func() {
		assert.NoError(t, amqpPub.Close())
	}()

	err := amqpPub.Start(
		cctx,
		testutils.HealthyConnectURL,
		append(options, amqpx.WithName(funcName+"-pub"))...,
	)
	require.NoError(t, err)

	var (
		publish                = 10
		initialBatchSize       = 2
		cnt                    = 0
		processingFinished     = make(chan struct{})
		subscriberFlushTimeout = 500 * time.Millisecond
		finalBatchSize         = 1
		process                = make(chan struct{})
	)

	handler := amqp.RegisterBatchHandler(eq1.Queue, func(hctx context.Context, msgs []pool.Delivery) (err error) {
		select {
		case <-hctx.Done():
			return fmt.Errorf("handler context canceled before processing: %w", hctx.Err())
		case <-process:
			// the gate is open: allow processing
		default:
			// the gate is still closed: returning an error nacks the messages
			return fmt.Errorf("we don't want the messages to be processed, yet")
		}

		// TODO: at this point the ordering seems somewhat broken.
		// The bigger the batch, the more data we lose.
		for _, msg := range msgs {
			assert.Equal(t, eq1.NextSubMsg(), string(msg.Body))
			cnt++
		}

		if cnt == publish {
			close(processingFinished)
		}

		return nil
	},
		pool.WithMaxBatchSize(initialBatchSize),
		pool.WithBatchFlushTimeout(subscriberFlushTimeout),
	)

	err = amqp.Start(cctx, testutils.HealthyConnectURL, amqpx.WithName(funcName+"-sub"), amqpx.WithLogger(log))
	if err != nil {
		assert.NoError(t, err)
		return
	}

	// publish all of the messages
	for i := 0; i < publish; i++ {
		err := amqpPub.Publish(cctx, eq1.Exchange, eq1.RoutingKey, pool.Publishing{
			ContentType: "text/plain",
			Body:        []byte(eq1.NextPubMsg()),
		})
		if err != nil {
			assert.NoError(t, err)
			return
		}
	}

	// give the subscriber time to pick the messages up; they stay in flight
	time.Sleep(2 * time.Second)

	// pause, reduce the batch size, and resume
	reconnectTimeout := 2 * time.Minute

	pauseCtx, cancel := context.WithTimeout(cctx, reconnectTimeout)
	err = handler.Pause(pauseCtx)
	cancel()
	assert.NoError(t, err)

	handler.SetMaxBatchSize(finalBatchSize)

	resumeCtx, cancel := context.WithTimeout(cctx, reconnectTimeout)
	err = handler.Resume(resumeCtx)
	cancel()
	assert.NoError(t, err)

	// allow processing without nacks
	close(process)

	// wait for the subscriber to consume all messages before finishing the test
	publishFinishTimeout := time.Duration(publish) * time.Second // up to one second per message
	select {
	case <-time.After(publishFinishTimeout):
		t.Errorf("timeout after %s", publishFinishTimeout)
		return
	case <-processingFinished:
		log.Info("processing finished successfully")
	}
}

func TestHandlerPauseAndResume(t *testing.T) {
	t.Parallel()
	var wg sync.WaitGroup