From 484935653f60621a1ed0a48082d7f5cba3130802 Mon Sep 17 00:00:00 2001 From: tochemey Date: Sat, 14 Oct 2023 16:29:06 +0000 Subject: [PATCH] feat: hook projection as an option to the engine --- engine.go | 42 ++++++++++++++++-- option.go | 16 +++++++ option_test.go | 18 ++++++++ projection.go | 43 +++++++++++++++++++ projection/{projection.go => runner.go} | 32 ++++++++------ .../{projection_test.go => runner_test.go} | 10 ++--- readme.md | 12 +++--- 7 files changed, 146 insertions(+), 27 deletions(-) create mode 100644 projection.go rename projection/{projection.go => runner.go} (93%) rename projection/{projection_test.go => runner_test.go} (95%) diff --git a/engine.go b/engine.go index fdb3794..acf1021 100644 --- a/engine.go +++ b/engine.go @@ -26,10 +26,9 @@ import ( "context" "time" - "github.com/tochemey/ego/offsetstore" - "github.com/pkg/errors" "github.com/tochemey/ego/eventstore" + "github.com/tochemey/ego/projection" "github.com/tochemey/goakt/actors" "github.com/tochemey/goakt/discovery" "github.com/tochemey/goakt/log" @@ -49,7 +48,10 @@ type Engine struct { telemetry *telemetry.Telemetry // telemetry is the observability engine partitionsCount uint64 // partitionsCount specifies the number of partitions started atomic.Bool - offsetStore offsetstore.OffsetStore + + // define the list of projections + projections []*Projection + projectionRunners []*projection.Runner } // NewEngine creates an instance of Engine @@ -61,11 +63,19 @@ func NewEngine(name string, eventsStore eventstore.EventsStore, opts ...Option) enableCluster: atomic.NewBool(false), logger: log.DefaultLogger, telemetry: telemetry.New(), + projections: make([]*Projection, 0), } // apply the various options for _, opt := range opts { opt.Apply(e) } + + // set the projection runners + for i, p := range e.projections { + // create the projection instance and add it to the list of runners + e.projectionRunners[i] = projection.NewRunner(p.Name, p.Handler, e.eventsStore, p.OffsetStore, p.Recovery, e.logger) + } + e.started.Store(false) return e } @@ -103,6 +113,20 @@ func (x *Engine) Start(ctx context.Context) error { } // set the started to true x.started.Store(true) + + // start the projections if is set + if len(x.projectionRunners) > 0 { + // simply iterate the list of projections and start them + // fail start when a single projection fail to start + for _, runner := range x.projectionRunners { + // start the runner + if err := runner.Start(ctx); err != nil { + x.logger.Error(errors.Wrapf(err, "failed to start projection=(%s)", runner.Name())) + return err + } + } + } + return nil } @@ -110,5 +134,17 @@ func (x *Engine) Start(ctx context.Context) error { func (x *Engine) Stop(ctx context.Context) error { // set the started to false x.started.Store(false) + // stop the projections + if len(x.projectionRunners) > 0 { + // simply iterate the list of projections and start them + // fail stop when a single projection fail to stop + for _, runner := range x.projectionRunners { + // stop the runner + if err := runner.Stop(ctx); err != nil { + x.logger.Error(errors.Wrapf(err, "failed to stop projection=(%s)", runner.Name())) + return err + } + } + } return x.actorSystem.Stop(ctx) } diff --git a/option.go b/option.go index 32ece14..16c3d8f 100644 --- a/option.go +++ b/option.go @@ -68,3 +68,19 @@ func WithTelemetry(telemetry *telemetry.Telemetry) Option { e.telemetry = telemetry }) } + +// WithProjection defines a projection +func WithProjection(projection *Projection) Option { + return OptionFunc(func(e *Engine) { + // discard the projection when the name is already added + for _, p := range e.projections { + // already exist discard this setup + if p.Name == projection.Name { + return + } + } + + // add the created projection to the list of projections + e.projections = append(e.projections, projection) + }) +} diff --git a/option_test.go b/option_test.go index ba20b7a..84f1b3d 100644 --- a/option_test.go +++ b/option_test.go @@ -26,6 +26,8 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/tochemey/ego/offsetstore/memory" + "github.com/tochemey/ego/projection" "github.com/tochemey/goakt/discovery" "github.com/tochemey/goakt/discovery/kubernetes" "github.com/tochemey/goakt/log" @@ -40,6 +42,15 @@ func TestOptions(t *testing.T) { discoveryProvider := kubernetes.NewDiscovery() config := discovery.NewConfig() tel := telemetry.New() + offsetStore := memory.NewOffsetStore() + recovery := projection.NewRecovery() + handler := projection.NewDiscardHandler(logger) + projection := &Projection{ + Name: "kafka", + Handler: handler, + OffsetStore: offsetStore, + Recovery: recovery, + } testCases := []struct { name string @@ -66,6 +77,13 @@ func TestOptions(t *testing.T) { option: WithTelemetry(tel), expected: Engine{telemetry: tel}, }, + { + name: "WithProjection", + option: WithProjection(projection), + expected: Engine{ + projections: []*Projection{projection}, + }, + }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { diff --git a/projection.go b/projection.go new file mode 100644 index 0000000..795e71a --- /dev/null +++ b/projection.go @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2022-2023 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package ego + +import ( + "github.com/tochemey/ego/offsetstore" + "github.com/tochemey/ego/projection" +) + +// Projection is a wrapper for the main projection +// Help pass in projection as an option to the engine +type Projection struct { + // Name defines the name of the projection + // The name of the projection should be unique and meaningful + // Example: kafka means consume persisted events and push them to kafka + Name string + // Handler defines the projection handler + Handler projection.Handler + // OffsetStore defines the offset store + OffsetStore offsetstore.OffsetStore + // Recovery policy + Recovery *projection.Recovery +} diff --git a/projection/projection.go b/projection/runner.go similarity index 93% rename from projection/projection.go rename to projection/runner.go index d6be640..6b5774d 100644 --- a/projection/projection.go +++ b/projection/runner.go @@ -41,9 +41,9 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" ) -// Projection defines the projection -type Projection struct { - // Name specifies the projection Name +// Runner defines the projection runner +type Runner struct { + // Name specifies the runner Name name string // Logger specifies the logger logger log.Logger @@ -61,14 +61,15 @@ type Projection struct { isStarted *atomic.Bool } -// New create an instance of Projection given the name of the projection, the handler and the offsets store -func New(name string, +// NewRunner create an instance of Runner given the name of the projection, the handler and the offsets store +// The name of the projection should be unique +func NewRunner(name string, handler Handler, eventsStore eventstore.EventsStore, offsetStore offsetstore.OffsetStore, recovery *Recovery, - logger log.Logger) *Projection { - return &Projection{ + logger log.Logger) *Runner { + return &Runner{ handler: handler, offsetsStore: offsetStore, name: name, @@ -80,8 +81,8 @@ func New(name string, } } -// Start starts the projection -func (p *Projection) Start(ctx context.Context) error { +// Start starts the projection runner +func (p *Runner) Start(ctx context.Context) error { // add a span context ctx, span := telemetry.SpanContext(ctx, "PreStart") defer span.End() @@ -149,8 +150,8 @@ func (p *Projection) Start(ctx context.Context) error { return nil } -// Stop stops the projection -func (p *Projection) Stop(ctx context.Context) error { +// Stop stops the projection runner +func (p *Runner) Stop(ctx context.Context) error { // add a span context ctx, span := telemetry.SpanContext(ctx, "PostStop") defer span.End() @@ -178,8 +179,13 @@ func (p *Projection) Stop(ctx context.Context) error { return nil } +// Name returns the projection runner Name +func (p *Runner) Name() string { + return p.name +} + // processingLoop is a loop that continuously runs to process events persisted onto the journal store until the projection is stopped -func (p *Projection) processingLoop(ctx context.Context) { +func (p *Runner) processingLoop(ctx context.Context) { for { select { case <-p.stopSignal: @@ -255,7 +261,7 @@ func (p *Projection) processingLoop(ctx context.Context) { } // doProcess processes all events of a given persistent entity and hand them over to the handler -func (p *Projection) doProcess(ctx context.Context, shard uint64) error { +func (p *Runner) doProcess(ctx context.Context, shard uint64) error { // add a span context ctx, span := telemetry.SpanContext(ctx, "HandleShard") defer span.End() diff --git a/projection/projection_test.go b/projection/runner_test.go similarity index 95% rename from projection/projection_test.go rename to projection/runner_test.go index 7b2142f..93a4028 100644 --- a/projection/projection_test.go +++ b/projection/runner_test.go @@ -64,7 +64,7 @@ func TestProjection(t *testing.T) { handler := NewDiscardHandler(logger) // create an instance of the projection - projection := New(projectionName, handler, journalStore, offsetStore, NewRecovery(), logger) + projection := NewRunner(projectionName, handler, journalStore, offsetStore, NewRecovery(), logger) // start the projection err := projection.Start(ctx) require.NoError(t, err) @@ -133,7 +133,7 @@ func TestProjection(t *testing.T) { // create a handler that return successfully handler := &testHandler1{} - projection := New(projectionName, handler, journalStore, offsetStore, NewRecovery(), logger) + projection := NewRunner(projectionName, handler, journalStore, offsetStore, NewRecovery(), logger) // start the projection err := projection.Start(ctx) require.NoError(t, err) @@ -189,7 +189,7 @@ func TestProjection(t *testing.T) { // create a handler that return successfully handler := &testHandler1{} - projection := New(projectionName, handler, journalStore, offsetStore, NewRecovery( + projection := NewRunner(projectionName, handler, journalStore, offsetStore, NewRecovery( WithRecoveryPolicy(RetryAndFail), WithRetries(2), WithRetryDelay(100*time.Millisecond)), logger) @@ -251,7 +251,7 @@ func TestProjection(t *testing.T) { // create a handler that return successfully handler := &testHandler2{counter: atomic.NewInt32(0)} - projection := New(projectionName, handler, journalStore, offsetStore, NewRecovery( + projection := NewRunner(projectionName, handler, journalStore, offsetStore, NewRecovery( WithRecoveryPolicy(Skip), WithRetries(2), WithRetryDelay(100*time.Millisecond)), logger) @@ -321,7 +321,7 @@ func TestProjection(t *testing.T) { // create a handler that return successfully handler := &testHandler2{counter: atomic.NewInt32(0)} - projection := New(projectionName, handler, journalStore, offsetStore, NewRecovery( + projection := NewRunner(projectionName, handler, journalStore, offsetStore, NewRecovery( WithRecoveryPolicy(RetryAndSkip), WithRetries(2), WithRetryDelay(100*time.Millisecond)), logger) diff --git a/readme.md b/readme.md index e1c0ad9..d5d581a 100644 --- a/readme.md +++ b/readme.md @@ -21,15 +21,15 @@ Under the hood, ego leverages [goakt](https://github.com/Tochemey/goakt) to scal Event handlers must be pure functions as they will be used when instantiating the Aggregate and replaying the event store. - Extensible events store - Built-in events store: - - Postgres: Schema can be found [here](./resources/eventstore_postgres.sql) - - Memory (for testing purpose only) + - [Postgres](./eventstore/postgres/postgres.go): Schema can be found [here](./resources/eventstore_postgres.sql) + - [Memory](./eventstore/memory/memory.go) (for testing purpose only) - [Cluster Mode](https://github.com/Tochemey/goakt#clustering) -- Read Model: - - Projection: The engine that help build a read model +- Projection: + - Runner: Helps consume and handle events persisted by entity. It depends on the [Offset Store](./offsetstore/iface.go) to track consumers' offset. - Extensible Offset store: Helps store offsets of events consumed and processed by projections - Built-in offset stores: - - Postgres: Schema can be found [here](./resources/offsetstore_postgres.sql) - - Memory (for testing purpose only) + - [Postgres](./offsetstore/postgres/postgres.go): Schema can be found [here](./resources/offsetstore_postgres.sql) + - [Memory](./offsetstore/memory/memory.go) (for testing purpose only) - Examples (check the [examples](./example)) ### Installation