Skip to content

Commit

Permalink
feat: hook projection as an option to the engine
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed Oct 14, 2023
1 parent a5dbf46 commit 4849356
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 27 deletions.
42 changes: 39 additions & 3 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@ import (
"context"
"time"

"github.com/tochemey/ego/offsetstore"

"github.com/pkg/errors"
"github.com/tochemey/ego/eventstore"
"github.com/tochemey/ego/projection"
"github.com/tochemey/goakt/actors"
"github.com/tochemey/goakt/discovery"
"github.com/tochemey/goakt/log"
Expand All @@ -49,7 +48,10 @@ type Engine struct {
telemetry *telemetry.Telemetry // telemetry is the observability engine
partitionsCount uint64 // partitionsCount specifies the number of partitions
started atomic.Bool
offsetStore offsetstore.OffsetStore

// define the list of projections
projections []*Projection
projectionRunners []*projection.Runner
}

// NewEngine creates an instance of Engine
Expand All @@ -61,11 +63,19 @@ func NewEngine(name string, eventsStore eventstore.EventsStore, opts ...Option)
enableCluster: atomic.NewBool(false),
logger: log.DefaultLogger,
telemetry: telemetry.New(),
projections: make([]*Projection, 0),
}
// apply the various options
for _, opt := range opts {
opt.Apply(e)
}

// set the projection runners
for i, p := range e.projections {
// create the projection instance and add it to the list of runners
e.projectionRunners[i] = projection.NewRunner(p.Name, p.Handler, e.eventsStore, p.OffsetStore, p.Recovery, e.logger)
}

e.started.Store(false)
return e
}
Expand Down Expand Up @@ -103,12 +113,38 @@ func (x *Engine) Start(ctx context.Context) error {
}
// set the started to true
x.started.Store(true)

// start the projections if is set
if len(x.projectionRunners) > 0 {
// simply iterate the list of projections and start them
// fail start when a single projection fail to start
for _, runner := range x.projectionRunners {
// start the runner
if err := runner.Start(ctx); err != nil {
x.logger.Error(errors.Wrapf(err, "failed to start projection=(%s)", runner.Name()))
return err
}
}
}

return nil
}

// Stop stops the ego engine
func (x *Engine) Stop(ctx context.Context) error {
// set the started to false
x.started.Store(false)
// stop the projections
if len(x.projectionRunners) > 0 {
// simply iterate the list of projections and start them
// fail stop when a single projection fail to stop
for _, runner := range x.projectionRunners {
// stop the runner
if err := runner.Stop(ctx); err != nil {
x.logger.Error(errors.Wrapf(err, "failed to stop projection=(%s)", runner.Name()))
return err
}
}
}
return x.actorSystem.Stop(ctx)
}
16 changes: 16 additions & 0 deletions option.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,19 @@ func WithTelemetry(telemetry *telemetry.Telemetry) Option {
e.telemetry = telemetry
})
}

// WithProjection defines a projection
func WithProjection(projection *Projection) Option {
return OptionFunc(func(e *Engine) {
// discard the projection when the name is already added
for _, p := range e.projections {
// already exist discard this setup
if p.Name == projection.Name {
return
}
}

// add the created projection to the list of projections
e.projections = append(e.projections, projection)
})
}
18 changes: 18 additions & 0 deletions option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/tochemey/ego/offsetstore/memory"
"github.com/tochemey/ego/projection"
"github.com/tochemey/goakt/discovery"
"github.com/tochemey/goakt/discovery/kubernetes"
"github.com/tochemey/goakt/log"
Expand All @@ -40,6 +42,15 @@ func TestOptions(t *testing.T) {
discoveryProvider := kubernetes.NewDiscovery()
config := discovery.NewConfig()
tel := telemetry.New()
offsetStore := memory.NewOffsetStore()
recovery := projection.NewRecovery()
handler := projection.NewDiscardHandler(logger)
projection := &Projection{
Name: "kafka",
Handler: handler,
OffsetStore: offsetStore,
Recovery: recovery,
}

testCases := []struct {
name string
Expand All @@ -66,6 +77,13 @@ func TestOptions(t *testing.T) {
option: WithTelemetry(tel),
expected: Engine{telemetry: tel},
},
{
name: "WithProjection",
option: WithProjection(projection),
expected: Engine{
projections: []*Projection{projection},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
Expand Down
43 changes: 43 additions & 0 deletions projection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2022-2023 Tochemey
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package ego

import (
"github.com/tochemey/ego/offsetstore"
"github.com/tochemey/ego/projection"
)

// Projection is a wrapper for the main projection
// Help pass in projection as an option to the engine
type Projection struct {
// Name defines the name of the projection
// The name of the projection should be unique and meaningful
// Example: kafka means consume persisted events and push them to kafka
Name string
// Handler defines the projection handler
Handler projection.Handler
// OffsetStore defines the offset store
OffsetStore offsetstore.OffsetStore
// Recovery policy
Recovery *projection.Recovery
}
32 changes: 19 additions & 13 deletions projection/projection.go → projection/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
)

// Projection defines the projection
type Projection struct {
// Name specifies the projection Name
// Runner defines the projection runner
type Runner struct {
// Name specifies the runner Name
name string
// Logger specifies the logger
logger log.Logger
Expand All @@ -61,14 +61,15 @@ type Projection struct {
isStarted *atomic.Bool
}

// New create an instance of Projection given the name of the projection, the handler and the offsets store
func New(name string,
// NewRunner create an instance of Runner given the name of the projection, the handler and the offsets store
// The name of the projection should be unique
func NewRunner(name string,
handler Handler,
eventsStore eventstore.EventsStore,
offsetStore offsetstore.OffsetStore,
recovery *Recovery,
logger log.Logger) *Projection {
return &Projection{
logger log.Logger) *Runner {
return &Runner{
handler: handler,
offsetsStore: offsetStore,
name: name,
Expand All @@ -80,8 +81,8 @@ func New(name string,
}
}

// Start starts the projection
func (p *Projection) Start(ctx context.Context) error {
// Start starts the projection runner
func (p *Runner) Start(ctx context.Context) error {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "PreStart")
defer span.End()
Expand Down Expand Up @@ -149,8 +150,8 @@ func (p *Projection) Start(ctx context.Context) error {
return nil
}

// Stop stops the projection
func (p *Projection) Stop(ctx context.Context) error {
// Stop stops the projection runner
func (p *Runner) Stop(ctx context.Context) error {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "PostStop")
defer span.End()
Expand Down Expand Up @@ -178,8 +179,13 @@ func (p *Projection) Stop(ctx context.Context) error {
return nil
}

// Name returns the projection runner Name
func (p *Runner) Name() string {
return p.name
}

// processingLoop is a loop that continuously runs to process events persisted onto the journal store until the projection is stopped
func (p *Projection) processingLoop(ctx context.Context) {
func (p *Runner) processingLoop(ctx context.Context) {
for {
select {
case <-p.stopSignal:
Expand Down Expand Up @@ -255,7 +261,7 @@ func (p *Projection) processingLoop(ctx context.Context) {
}

// doProcess processes all events of a given persistent entity and hand them over to the handler
func (p *Projection) doProcess(ctx context.Context, shard uint64) error {
func (p *Runner) doProcess(ctx context.Context, shard uint64) error {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "HandleShard")
defer span.End()
Expand Down
10 changes: 5 additions & 5 deletions projection/projection_test.go → projection/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestProjection(t *testing.T) {
handler := NewDiscardHandler(logger)

// create an instance of the projection
projection := New(projectionName, handler, journalStore, offsetStore, NewRecovery(), logger)
projection := NewRunner(projectionName, handler, journalStore, offsetStore, NewRecovery(), logger)
// start the projection
err := projection.Start(ctx)
require.NoError(t, err)
Expand Down Expand Up @@ -133,7 +133,7 @@ func TestProjection(t *testing.T) {
// create a handler that return successfully
handler := &testHandler1{}

projection := New(projectionName, handler, journalStore, offsetStore, NewRecovery(), logger)
projection := NewRunner(projectionName, handler, journalStore, offsetStore, NewRecovery(), logger)
// start the projection
err := projection.Start(ctx)
require.NoError(t, err)
Expand Down Expand Up @@ -189,7 +189,7 @@ func TestProjection(t *testing.T) {
// create a handler that return successfully
handler := &testHandler1{}

projection := New(projectionName, handler, journalStore, offsetStore, NewRecovery(
projection := NewRunner(projectionName, handler, journalStore, offsetStore, NewRecovery(
WithRecoveryPolicy(RetryAndFail),
WithRetries(2),
WithRetryDelay(100*time.Millisecond)), logger)
Expand Down Expand Up @@ -251,7 +251,7 @@ func TestProjection(t *testing.T) {
// create a handler that return successfully
handler := &testHandler2{counter: atomic.NewInt32(0)}

projection := New(projectionName, handler, journalStore, offsetStore, NewRecovery(
projection := NewRunner(projectionName, handler, journalStore, offsetStore, NewRecovery(
WithRecoveryPolicy(Skip),
WithRetries(2),
WithRetryDelay(100*time.Millisecond)), logger)
Expand Down Expand Up @@ -321,7 +321,7 @@ func TestProjection(t *testing.T) {
// create a handler that return successfully
handler := &testHandler2{counter: atomic.NewInt32(0)}

projection := New(projectionName, handler, journalStore, offsetStore, NewRecovery(
projection := NewRunner(projectionName, handler, journalStore, offsetStore, NewRecovery(
WithRecoveryPolicy(RetryAndSkip),
WithRetries(2),
WithRetryDelay(100*time.Millisecond)), logger)
Expand Down
12 changes: 6 additions & 6 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ Under the hood, ego leverages [goakt](https://github.com/Tochemey/goakt) to scal
Event handlers must be pure functions as they will be used when instantiating the Aggregate and replaying the event store.
- Extensible events store
- Built-in events store:
- Postgres: Schema can be found [here](./resources/eventstore_postgres.sql)
- Memory (for testing purpose only)
- [Postgres](./eventstore/postgres/postgres.go): Schema can be found [here](./resources/eventstore_postgres.sql)
- [Memory](./eventstore/memory/memory.go) (for testing purpose only)
- [Cluster Mode](https://github.com/Tochemey/goakt#clustering)
- Read Model:
- Projection: The engine that help build a read model
- Projection:
- Runner: Helps consume and handle events persisted by entity. It depends on the [Offset Store](./offsetstore/iface.go) to track consumers' offset.
- Extensible Offset store: Helps store offsets of events consumed and processed by projections
- Built-in offset stores:
- Postgres: Schema can be found [here](./resources/offsetstore_postgres.sql)
- Memory (for testing purpose only)
- [Postgres](./offsetstore/postgres/postgres.go): Schema can be found [here](./resources/offsetstore_postgres.sql)
- [Memory](./offsetstore/memory/memory.go) (for testing purpose only)
- Examples (check the [examples](./example))

### Installation
Expand Down

0 comments on commit 4849356

Please sign in to comment.