Skip to content

Commit

Permalink
refactor: refactor projection handler
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed Oct 14, 2023
1 parent 22f24be commit a5dbf46
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 43 deletions.
3 changes: 3 additions & 0 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"context"
"time"

"github.com/tochemey/ego/offsetstore"

"github.com/pkg/errors"
"github.com/tochemey/ego/eventstore"
"github.com/tochemey/goakt/actors"
Expand All @@ -47,6 +49,7 @@ type Engine struct {
telemetry *telemetry.Telemetry // telemetry is the observability engine
partitionsCount uint64 // partitionsCount specifies the number of partitions
started atomic.Bool
offsetStore offsetstore.OffsetStore
}

// NewEngine creates an instance of Engine
Expand Down
72 changes: 72 additions & 0 deletions projection/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright (c) 2022-2023 Tochemey
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package projection

import (
"context"

"github.com/tochemey/goakt/log"
"go.uber.org/atomic"
"google.golang.org/protobuf/types/known/anypb"
)

// Handler is used to handle event and state consumed from the event store
type Handler interface {
// Handle handles the event that is consumed by the projection
Handle(ctx context.Context, persistenceID string, event *anypb.Any, state *anypb.Any, revision uint64) error
}

// DiscardHandler implements the projection Handler interface
// This handler really does nothing with the consumed event
// Note: this will be useful when writing unit tests
type DiscardHandler struct {
eventsCounter *atomic.Int64
logger log.Logger
}

// enforce the complete implementation of the Handler interface
var _ Handler = (*DiscardHandler)(nil)

// NewDiscardHandler creates an instance of DiscardHandler
func NewDiscardHandler(logger log.Logger) *DiscardHandler {
return &DiscardHandler{
eventsCounter: atomic.NewInt64(0),
logger: logger,
}
}

// Handle handles the events consumed
func (x *DiscardHandler) Handle(_ context.Context, persistenceID string, event *anypb.Any, state *anypb.Any, revision uint64) error {
// add some logging information
x.logger.Debugf("handling event=(%s) revision=(%d) with resulting state=(%s) of persistenceId=(%s)",
event.GetTypeUrl(), revision, state.GetTypeUrl(), persistenceID)
// increase the counter
x.eventsCounter.Inc()
// return successful process
return nil
}

// EventsCount returns the number of events processed
func (x *DiscardHandler) EventsCount() int {
return int(x.eventsCounter.Load())
}
22 changes: 9 additions & 13 deletions projection/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,9 @@ import (
"github.com/tochemey/goakt/log"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"
)

// Handler is used to handle event and state consumed from the event store
type Handler func(ctx context.Context, persistenceID string, event *anypb.Any, state *anypb.Any, revision uint64) error

// Projection defines the projection
type Projection struct {
// Name specifies the projection Name
Expand Down Expand Up @@ -227,7 +223,7 @@ func (p *Projection) processingLoop(ctx context.Context) {
})

// Start a fixed number of goroutines process the shards.
for i := 0; i < 20; i++ {
for i := 0; i < 5; i++ {
for shard := range shardsChan {
g.Go(func() error {
return p.doProcess(ctx, shard)
Expand Down Expand Up @@ -309,8 +305,8 @@ func (p *Projection) doProcess(ctx context.Context, shard uint64) error {
switch p.recovery.RecoveryPolicy() {
case Fail:
// send the data to the handler. In case of error we log the error and fail the projection
if err := p.handler(ctx, persistenceID, event, state, seqNr); err != nil {
p.logger.Error(errors.Wrapf(err, "failed to process event for persistence id=%s, sequence=%d", persistenceID, seqNr))
if err := p.handler.Handle(ctx, persistenceID, event, state, seqNr); err != nil {
p.logger.Error(errors.Wrapf(err, "failed to process event for persistence id=%s, revision=%d", persistenceID, seqNr))
return err
}

Expand All @@ -324,8 +320,8 @@ func (p *Projection) doProcess(ctx context.Context, shard uint64) error {
// pass the data to the projection handler
if err := backoff.Run(func() error {
// handle the projection handler error
if err := p.handler(ctx, persistenceID, event, state, seqNr); err != nil {
p.logger.Error(errors.Wrapf(err, "failed to process event for persistence id=%s, sequence=%d", persistenceID, seqNr))
if err := p.handler.Handle(ctx, persistenceID, event, state, seqNr); err != nil {
p.logger.Error(errors.Wrapf(err, "failed to process event for persistence id=%s, revision=%d", persistenceID, seqNr))
return err
}
return nil
Expand All @@ -343,16 +339,16 @@ func (p *Projection) doProcess(ctx context.Context, shard uint64) error {
backoff := retry.NewRetrier(int(retries), 100*time.Millisecond, delay)
// pass the data to the projection handler
if err := backoff.Run(func() error {
return p.handler(ctx, persistenceID, event, state, seqNr)
return p.handler.Handle(ctx, persistenceID, event, state, seqNr)
}); err != nil {
// here we just log the error, but we skip the event and commit the offset
p.logger.Error(errors.Wrapf(err, "failed to process event for persistence id=%s, sequence=%d", persistenceID, seqNr))
p.logger.Error(errors.Wrapf(err, "failed to process event for persistence id=%s, revision=%d", persistenceID, seqNr))
}

case Skip:
// send the data to the handler. In case of error we just log the error and skip the event by committing the offset
if err := p.handler(ctx, persistenceID, event, state, seqNr); err != nil {
p.logger.Error(errors.Wrapf(err, "failed to process event for persistence id=%s, sequence=%d", persistenceID, seqNr))
if err := p.handler.Handle(ctx, persistenceID, event, state, seqNr); err != nil {
p.logger.Error(errors.Wrapf(err, "failed to process event for persistence id=%s, revision=%d", persistenceID, seqNr))
}
}
// the envelope has been successfully processed
Expand Down
58 changes: 28 additions & 30 deletions projection/projection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,9 @@ func TestProjection(t *testing.T) {
offsetStore := memoffsetstore.NewOffsetStore()
assert.NotNil(t, offsetStore)

var counter atomic.Int32
// set up the projection
// create a handler that return successfully
handler := func(ctx context.Context, persistenceID string, event *anypb.Any, state *anypb.Any, sequenceNumber uint64) error {
counter.Inc()
return nil
}
handler := NewDiscardHandler(logger)

// create an instance of the projection
projection := New(projectionName, handler, journalStore, offsetStore, NewRecovery(), logger)
Expand Down Expand Up @@ -113,7 +109,7 @@ func TestProjection(t *testing.T) {
assert.NotNil(t, actual)
assert.EqualValues(t, journals[9].GetTimestamp(), actual.GetCurrentOffset())

assert.EqualValues(t, 10, counter.Load())
assert.EqualValues(t, 10, handler.EventsCount())

// free resources
assert.NoError(t, projection.Stop(ctx))
Expand All @@ -135,9 +131,7 @@ func TestProjection(t *testing.T) {

// set up the projection
// create a handler that return successfully
handler := func(ctx context.Context, persistenceID string, event *anypb.Any, state *anypb.Any, sequenceNumber uint64) error {
return errors.New("damn")
}
handler := &testHandler1{}

projection := New(projectionName, handler, journalStore, offsetStore, NewRecovery(), logger)
// start the projection
Expand Down Expand Up @@ -193,9 +187,7 @@ func TestProjection(t *testing.T) {

// set up the projection
// create a handler that return successfully
handler := func(ctx context.Context, persistenceID string, event *anypb.Any, state *anypb.Any, sequenceNumber uint64) error {
return errors.New("damn")
}
handler := &testHandler1{}

projection := New(projectionName, handler, journalStore, offsetStore, NewRecovery(
WithRecoveryPolicy(RetryAndFail),
Expand Down Expand Up @@ -256,15 +248,8 @@ func TestProjection(t *testing.T) {
assert.NotNil(t, offsetStore)

// set up the projection
var counter atomic.Int32
// create a handler that return successfully
handler := func(ctx context.Context, persistenceID string, event *anypb.Any, state *anypb.Any, sequenceNumber uint64) error {
if (int(sequenceNumber) % 2) == 0 {
return errors.New("failed handler")
}
counter.Inc()
return nil
}
handler := &testHandler2{counter: atomic.NewInt32(0)}

projection := New(projectionName, handler, journalStore, offsetStore, NewRecovery(
WithRecoveryPolicy(Skip),
Expand Down Expand Up @@ -311,7 +296,7 @@ func TestProjection(t *testing.T) {
actual, err := offsetStore.GetCurrentOffset(ctx, projectionID)
assert.NoError(t, err)
assert.NotNil(t, actual)
assert.EqualValues(t, 5, counter.Load())
assert.EqualValues(t, 5, handler.counter.Load())

// free resource
assert.NoError(t, projection.Stop(ctx))
Expand All @@ -333,15 +318,8 @@ func TestProjection(t *testing.T) {
assert.NotNil(t, offsetStore)

// set up the projection
var counter atomic.Int32
// create a handler that return successfully
handler := func(ctx context.Context, persistenceID string, event *anypb.Any, state *anypb.Any, sequenceNumber uint64) error {
if (int(sequenceNumber) % 2) == 0 {
return errors.New("failed handler")
}
counter.Inc()
return nil
}
handler := &testHandler2{counter: atomic.NewInt32(0)}

projection := New(projectionName, handler, journalStore, offsetStore, NewRecovery(
WithRecoveryPolicy(RetryAndSkip),
Expand Down Expand Up @@ -388,9 +366,29 @@ func TestProjection(t *testing.T) {
actual, err := offsetStore.GetCurrentOffset(ctx, projectionID)
assert.NoError(t, err)
assert.NotNil(t, actual)
assert.EqualValues(t, 5, counter.Load())
assert.EqualValues(t, 5, handler.counter.Load())

// free resource
assert.NoError(t, projection.Stop(ctx))
})
}

type testHandler1 struct{}

var _ Handler = &testHandler1{}

func (x testHandler1) Handle(_ context.Context, _ string, _ *anypb.Any, _ *anypb.Any, _ uint64) error {
return errors.New("damn")
}

type testHandler2 struct {
counter *atomic.Int32
}

func (x testHandler2) Handle(_ context.Context, _ string, _ *anypb.Any, _ *anypb.Any, revision uint64) error {
if (int(revision) % 2) == 0 {
return errors.New("failed handler")
}
x.counter.Inc()
return nil
}

0 comments on commit a5dbf46

Please sign in to comment.