Skip to content

Commit

Permalink
feat: add projection options (#39)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Oct 15, 2023
1 parent 880741c commit cdfdd21
Show file tree
Hide file tree
Showing 18 changed files with 813 additions and 212 deletions.
47 changes: 23 additions & 24 deletions egopb/ego.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 11 additions & 1 deletion engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,17 @@ func NewEngine(name string, eventsStore eventstore.EventsStore, opts ...Option)
// set the projection runners
for i, p := range e.projections {
// create the projection instance and add it to the list of runners
e.projectionRunners[i] = projection.NewRunner(p.Name, p.Handler, e.eventsStore, p.OffsetStore, p.Recovery, e.logger)
e.projectionRunners[i] = projection.NewRunner(
p.Name,
p.Handler,
e.eventsStore,
p.OffsetStore,
projection.WithMaxBufferSize(p.MaxBufferSize),
projection.WithRefreshInterval(p.RefreshInterval),
projection.WithRecoveryStrategy(p.Recovery),
projection.WithStartOffset(p.StartOffset),
projection.WithResetOffset(p.ResetOffset),
projection.WithLogger(e.logger))
}

e.started.Store(false)
Expand Down
2 changes: 1 addition & 1 deletion eventstore/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ func (s *EventsStore) ShardNumbers(ctx context.Context) ([]uint64, error) {
return nil, errors.Wrap(err, "failed to fetch the list of shard number")
}

// loop over the records and delete them
// loop over the records
var journals []*journal
for row := it.Next(); row != nil; row = it.Next() {
if journal, ok := row.(*journal); ok {
Expand Down
6 changes: 4 additions & 2 deletions offsetstore/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@ type OffsetStore interface {
Connect(ctx context.Context) error
// Disconnect disconnects the offset store
Disconnect(ctx context.Context) error
// Ping verifies a connection to the database is still alive, establishing a connection if necessary.
Ping(ctx context.Context) error
// WriteOffset writes the current offset of the event consumed for a given projection id
// Note: persistence id and the projection name make a record in the journal store unique. Failure to ensure that
// can lead to some un-wanted behaviors and data inconsistency
WriteOffset(ctx context.Context, offset *egopb.Offset) error
// GetCurrentOffset returns the current offset of a given projection id
GetCurrentOffset(ctx context.Context, projectionID *egopb.ProjectionId) (currentOffset *egopb.Offset, err error)
// Ping verifies a connection to the database is still alive, establishing a connection if necessary.
Ping(ctx context.Context) error
// ResetOffset resets the offset of given projection to a given value across all shards
ResetOffset(ctx context.Context, projectionName string, value int64) error
}
104 changes: 82 additions & 22 deletions offsetstore/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package memory
import (
"context"
"fmt"
"time"

"github.com/google/uuid"
"github.com/hashicorp/go-memdb"
Expand Down Expand Up @@ -58,13 +59,13 @@ func NewOffsetStore() *OffsetStore {
}

// Connect connects to the offset store
func (s *OffsetStore) Connect(ctx context.Context) error {
func (x *OffsetStore) Connect(ctx context.Context) error {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "OffsetStore.Connect")
defer span.End()

// check whether this instance of the journal is connected or not
if s.connected.Load() {
if x.connected.Load() {
return nil
}

Expand All @@ -75,29 +76,29 @@ func (s *OffsetStore) Connect(ctx context.Context) error {
return err
}
// set the journal store underlying database
s.db = db
x.db = db

// set the connection status
s.connected.Store(true)
x.connected.Store(true)

return nil
}

// Disconnect disconnects the offset store
func (s *OffsetStore) Disconnect(ctx context.Context) error {
func (x *OffsetStore) Disconnect(ctx context.Context) error {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "OffsetStore.Disconnect")
defer span.End()

// check whether this instance of the journal is connected or not
if !s.connected.Load() {
if !x.connected.Load() {
return nil
}

// clear all records
if !s.KeepRecordsAfterDisconnect {
if !x.KeepRecordsAfterDisconnect {
// spawn a db transaction for read-only
txn := s.db.Txn(true)
txn := x.db.Txn(true)

// free memory resource
if _, err := txn.DeleteAll(offsetTableName, offsetPK); err != nil {
Expand All @@ -107,45 +108,46 @@ func (s *OffsetStore) Disconnect(ctx context.Context) error {
txn.Commit()
}
// set the connection status
s.connected.Store(false)
x.connected.Store(false)

return nil
}

// Ping verifies a connection to the database is still alive, establishing a connection if necessary.
func (s *OffsetStore) Ping(ctx context.Context) error {
func (x *OffsetStore) Ping(ctx context.Context) error {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "OffsetStore.Ping")
defer span.End()

// check whether we are connected or not
if !s.connected.Load() {
return s.Connect(ctx)
if !x.connected.Load() {
return x.Connect(ctx)
}

return nil
}

// WriteOffset writes an offset to the offset store
func (s *OffsetStore) WriteOffset(ctx context.Context, offset *egopb.Offset) error {
func (x *OffsetStore) WriteOffset(ctx context.Context, offset *egopb.Offset) error {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "OffsetStore.WriteOffset")
defer span.End()

// check whether this instance of the journal is connected or not
if !s.connected.Load() {
if !x.connected.Load() {
return errors.New("offset store is not connected")
}

// spawn a db transaction
txn := s.db.Txn(true)
txn := x.db.Txn(true)

// create an offset row
record := &offsetRow{
Ordering: uuid.NewString(),
ProjectionName: offset.GetProjectionName(),
ShardNumber: offset.GetShardNumber(),
CurrentOffset: offset.GetCurrentOffset(),
LastUpdated: offset.GetTimestamp(),
Value: offset.GetValue(),
Timestamp: offset.GetTimestamp(),
}

// persist the record
Expand All @@ -162,18 +164,18 @@ func (s *OffsetStore) WriteOffset(ctx context.Context, offset *egopb.Offset) err
}

// GetCurrentOffset return the offset of a projection
func (s *OffsetStore) GetCurrentOffset(ctx context.Context, projectionID *egopb.ProjectionId) (current *egopb.Offset, err error) {
func (x *OffsetStore) GetCurrentOffset(ctx context.Context, projectionID *egopb.ProjectionId) (current *egopb.Offset, err error) {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "OffsetStore.GetCurrentOffset")
defer span.End()

// check whether this instance of the journal is connected or not
if !s.connected.Load() {
if !x.connected.Load() {
return nil, errors.New("offset store is not connected")
}

// spawn a db transaction for read-only
txn := s.db.Txn(false)
txn := x.db.Txn(false)
defer txn.Abort()
// let us fetch the last record
raw, err := txn.Last(offsetTableName, rowIndex, projectionID.GetProjectionName(), projectionID.GetShardNumber())
Expand All @@ -196,12 +198,70 @@ func (s *OffsetStore) GetCurrentOffset(ctx context.Context, projectionID *egopb.
current = &egopb.Offset{
ShardNumber: offsetRow.ShardNumber,
ProjectionName: offsetRow.ProjectionName,
CurrentOffset: offsetRow.CurrentOffset,
Timestamp: offsetRow.LastUpdated,
Value: offsetRow.Value,
Timestamp: offsetRow.Timestamp,
}
return
}

return nil, fmt.Errorf("failed to get the current offset for shard=%d given projection=%s",
projectionID.GetShardNumber(), projectionID.GetProjectionName())
}

// ResetOffset resets the offset of given projection to a given value across all shards
func (x *OffsetStore) ResetOffset(ctx context.Context, projectionName string, value int64) error {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "offsetStore.ResetOffset")
defer span.End()

// check whether this instance of the offset store is connected or not
if !x.connected.Load() {
return errors.New("offset store is not connected")
}

// spawn a db transaction for read-only
txn := x.db.Txn(false)
// fetch all the records for the given projection
it, err := txn.Get(offsetTableName, projectionNameIndex, projectionName)
// handle the error
if err != nil {
// abort the transaction
txn.Abort()
return errors.Wrap(err, "failed to fetch the list of shard number")
}

// loop over the records
var offsetRows []*offsetRow
for row := it.Next(); row != nil; row = it.Next() {
if journal, ok := row.(*offsetRow); ok {
offsetRows = append(offsetRows, journal)
}
}
// let us abort the transaction after fetching the matching records
txn.Abort()

// update the records
ts := time.Now().UnixMilli()
for _, row := range offsetRows {
row.Value = value
row.Timestamp = ts
}

// spawn a db write transaction
txn = x.db.Txn(true)
// iterate the list of offset rows and update the values
for _, row := range offsetRows {
// persist the record
if err := txn.Insert(offsetTableName, row); err != nil {
// abort the transaction
txn.Abort()
// return the error
return errors.Wrap(err, "failed to persist offset record on to the offset store")
}
}

// commit the transaction
txn.Commit()

return nil
}
Loading

0 comments on commit cdfdd21

Please sign in to comment.