Skip to content

Commit

Permalink
refactor: upgrade Go-Akt to 2.6.0 and remove unnecessary code (#85)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Aug 26, 2024
1 parent 9d13afb commit 81ecfb6
Show file tree
Hide file tree
Showing 14 changed files with 141 additions and 284 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
env:
FORCE_COLOR: 1
steps:
- uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4
- uses: actions/checkout@v4
with:
submodules: false
fetch-depth: 0
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
env:
FORCE_COLOR: 1
steps:
- uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4
- uses: actions/checkout@v4
with:
submodules: false
fetch-depth: 0
Expand Down
8 changes: 4 additions & 4 deletions actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (entity *actor) PreStart(ctx context.Context) error {
}

// Receive processes any message dropped into the actor mailbox.
func (entity *actor) Receive(ctx actors.ReceiveContext) {
func (entity *actor) Receive(ctx *actors.ReceiveContext) {
_, span := telemetry.SpanContext(ctx.Context(), "Receive")
defer span.End()

Expand Down Expand Up @@ -157,7 +157,7 @@ func (entity *actor) recoverFromSnapshot(ctx context.Context) error {
}

// sendErrorReply sends an error as a reply message
func (entity *actor) sendErrorReply(ctx actors.ReceiveContext, err error) {
func (entity *actor) sendErrorReply(ctx *actors.ReceiveContext, err error) {
reply := &egopb.CommandReply{
Reply: &egopb.CommandReply_ErrorReply{
ErrorReply: &egopb.ErrorReply{
Expand All @@ -170,7 +170,7 @@ func (entity *actor) sendErrorReply(ctx actors.ReceiveContext, err error) {
}

// getStateAndReply returns the current state of the entity
func (entity *actor) getStateAndReply(ctx actors.ReceiveContext) {
func (entity *actor) getStateAndReply(ctx *actors.ReceiveContext) {
latestEvent, err := entity.eventsStore.GetLatestEvent(ctx.Context(), entity.ID())
if err != nil {
entity.sendErrorReply(ctx, err)
Expand All @@ -193,7 +193,7 @@ func (entity *actor) getStateAndReply(ctx actors.ReceiveContext) {
}

// processCommandAndReply processes the incoming command
func (entity *actor) processCommandAndReply(ctx actors.ReceiveContext, command Command) {
func (entity *actor) processCommandAndReply(ctx *actors.ReceiveContext, command Command) {
goCtx := ctx.Context()
events, err := entity.HandleCommand(goCtx, command, entity.currentState)
if err != nil {
Expand Down
20 changes: 5 additions & 15 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import (
"github.com/tochemey/ego/v3/egopb"
"github.com/tochemey/ego/v3/eventstore"
"github.com/tochemey/ego/v3/eventstream"
egotel "github.com/tochemey/ego/v3/internal/telemetry"
"github.com/tochemey/ego/v3/offsetstore"
"github.com/tochemey/ego/v3/projection"
)
Expand Down Expand Up @@ -85,7 +84,7 @@ func NewEngine(name string, eventsStore eventstore.EventsStore, opts ...Option)
name: name,
eventsStore: eventsStore,
enableCluster: atomic.NewBool(false),
logger: log.DefaultLogger,
logger: log.New(log.InfoLevel, os.Stderr),
telemetry: telemetry.New(),
eventStream: eventstream.New(),
locker: &sync.Mutex{},
Expand Down Expand Up @@ -153,9 +152,6 @@ func (x *Engine) Start(ctx context.Context) error {

// AddProjection add a projection to the running eGo engine and start it
func (x *Engine) AddProjection(ctx context.Context, name string, handler projection.Handler, offsetStore offsetstore.OffsetStore, opts ...projection.Option) error {
spanCtx, span := egotel.SpanContext(ctx, "AddProjection")
defer span.End()

x.locker.Lock()
started := x.started.Load()
x.locker.Unlock()
Expand All @@ -166,20 +162,20 @@ func (x *Engine) AddProjection(ctx context.Context, name string, handler project
actor := projection.New(name, handler, x.eventsStore, offsetStore, opts...)

var (
pid actors.PID
pid *actors.PID
err error
)

x.locker.Lock()
actorSystem := x.actorSystem
x.locker.Unlock()

if pid, err = actorSystem.Spawn(spanCtx, name, actor); err != nil {
if pid, err = actorSystem.Spawn(ctx, name, actor); err != nil {
x.logger.Error(fmt.Errorf("failed to register the projection=(%s): %w", name, err))
return err
}

if err := actors.Tell(spanCtx, pid, projection.Start); err != nil {
if err := actors.Tell(ctx, pid, projection.Start); err != nil {
x.logger.Error(fmt.Errorf("failed to start the projection=(%s): %w", name, err))
return err
}
Expand All @@ -195,10 +191,7 @@ func (x *Engine) Stop(ctx context.Context) error {
}

// Subscribe creates an events subscriber
func (x *Engine) Subscribe(ctx context.Context) (eventstream.Subscriber, error) {
_, span := egotel.SpanContext(ctx, "Subscribe")
defer span.End()

func (x *Engine) Subscribe() (eventstream.Subscriber, error) {
x.locker.Lock()
started := x.started.Load()
x.locker.Unlock()
Expand Down Expand Up @@ -251,9 +244,6 @@ func (x *Engine) Entity(ctx context.Context, behavior EntityBehavior) error {
// 2. nil when there is no resulting state or no event persisted
// 3. an error in case of error
func (x *Engine) SendCommand(ctx context.Context, entityID string, cmd Command, timeout time.Duration) (resultingState State, revision uint64, err error) {
ctx, span := egotel.SpanContext(ctx, "SendCommand")
defer span.End()

x.locker.Lock()
started := x.started.Load()
x.locker.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestEgo(t *testing.T) {
require.NoError(t, err)

// subscribe to events
subscriber, err := engine.Subscribe(ctx)
subscriber, err := engine.Subscribe()
require.NoError(t, err)
require.NotNil(t, subscriber)

Expand Down
61 changes: 10 additions & 51 deletions eventstore/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (

"github.com/tochemey/ego/v3/egopb"
"github.com/tochemey/ego/v3/eventstore"
"github.com/tochemey/ego/v3/internal/telemetry"
)

// EventsStore keep in memory every journal
Expand All @@ -68,11 +67,7 @@ func NewEventsStore() *EventsStore {
}

// Connect connects to the journal store
func (s *EventsStore) Connect(ctx context.Context) error {
// add a span context
_, span := telemetry.SpanContext(ctx, "eventsStore.Connect")
defer span.End()

func (s *EventsStore) Connect(context.Context) error {
// check whether this instance of the journal is connected or not
if s.connected.Load() {
return nil
Expand All @@ -94,11 +89,7 @@ func (s *EventsStore) Connect(ctx context.Context) error {
}

// Disconnect disconnect the journal store
func (s *EventsStore) Disconnect(ctx context.Context) error {
// add a span context
_, span := telemetry.SpanContext(ctx, "eventsStore.Disconnect")
defer span.End()

func (s *EventsStore) Disconnect(context.Context) error {
// check whether this instance of the journal is connected or not
if !s.connected.Load() {
return nil
Expand All @@ -125,25 +116,17 @@ func (s *EventsStore) Disconnect(ctx context.Context) error {

// Ping verifies a connection to the database is still alive, establishing a connection if necessary.
func (s *EventsStore) Ping(ctx context.Context) error {
// add a span context
spanCtx, span := telemetry.SpanContext(ctx, "eventsStore.Ping")
defer span.End()

// check whether we are connected or not
if !s.connected.Load() {
return s.Connect(spanCtx)
return s.Connect(ctx)
}

return nil
}

// PersistenceIDs returns the distinct list of all the persistence ids in the journal store
// FIXME: enhance the implementation. As it stands it will be a bit slow when there are a lot of records
func (s *EventsStore) PersistenceIDs(ctx context.Context, pageSize uint64, pageToken string) (persistenceIDs []string, nextPageToken string, err error) {
// add a span context
_, span := telemetry.SpanContext(ctx, "eventsStore.PersistenceIDs")
defer span.End()

func (s *EventsStore) PersistenceIDs(_ context.Context, pageSize uint64, pageToken string) (persistenceIDs []string, nextPageToken string, err error) {
// check whether this instance of the journal is connected or not
if !s.connected.Load() {
return nil, "", errors.New("journal store is not connected")
Expand Down Expand Up @@ -214,11 +197,7 @@ func (s *EventsStore) PersistenceIDs(ctx context.Context, pageSize uint64, pageT
}

// WriteEvents persist events in batches for a given persistenceID
func (s *EventsStore) WriteEvents(ctx context.Context, events []*egopb.Event) error {
// add a span context
_, span := telemetry.SpanContext(ctx, "eventsStore.WriteEvents")
defer span.End()

func (s *EventsStore) WriteEvents(_ context.Context, events []*egopb.Event) error {
// check whether this instance of the journal is connected or not
if !s.connected.Load() {
return errors.New("journal store is not connected")
Expand Down Expand Up @@ -266,11 +245,7 @@ func (s *EventsStore) WriteEvents(ctx context.Context, events []*egopb.Event) er

// DeleteEvents deletes events from the store upt to a given sequence number (inclusive)
// FIXME: enhance the implementation. As it stands it may be a bit slow when there are a lot of records
func (s *EventsStore) DeleteEvents(ctx context.Context, persistenceID string, toSequenceNumber uint64) error {
// add a span context
_, span := telemetry.SpanContext(ctx, "eventsStore.DeleteEvents")
defer span.End()

func (s *EventsStore) DeleteEvents(_ context.Context, persistenceID string, toSequenceNumber uint64) error {
// check whether this instance of the journal is connected or not
if !s.connected.Load() {
return errors.New("journal store is not connected")
Expand Down Expand Up @@ -319,11 +294,7 @@ func (s *EventsStore) DeleteEvents(ctx context.Context, persistenceID string, to
}

// ReplayEvents fetches events for a given persistence ID from a given sequence number(inclusive) to a given sequence number(inclusive)
func (s *EventsStore) ReplayEvents(ctx context.Context, persistenceID string, fromSequenceNumber, toSequenceNumber uint64, max uint64) ([]*egopb.Event, error) {
// add a span context
_, span := telemetry.SpanContext(ctx, "eventsStore.ReplayEvents")
defer span.End()

func (s *EventsStore) ReplayEvents(_ context.Context, persistenceID string, fromSequenceNumber, toSequenceNumber uint64, max uint64) ([]*egopb.Event, error) {
// check whether this instance of the journal is connected or not
if !s.connected.Load() {
return nil, errors.New("journal store is not connected")
Expand Down Expand Up @@ -392,11 +363,7 @@ func (s *EventsStore) ReplayEvents(ctx context.Context, persistenceID string, fr
}

// GetLatestEvent fetches the latest event
func (s *EventsStore) GetLatestEvent(ctx context.Context, persistenceID string) (*egopb.Event, error) {
// add a span context
_, span := telemetry.SpanContext(ctx, "eventsStore.GetLatestEvent")
defer span.End()

func (s *EventsStore) GetLatestEvent(_ context.Context, persistenceID string) (*egopb.Event, error) {
// check whether this instance of the journal is connected or not
if !s.connected.Load() {
return nil, errors.New("journal store is not connected")
Expand Down Expand Up @@ -447,11 +414,7 @@ func (s *EventsStore) GetLatestEvent(ctx context.Context, persistenceID string)
}

// GetShardEvents returns the next (max) events after the offset in the journal for a given shard
func (s *EventsStore) GetShardEvents(ctx context.Context, shardNumber uint64, offset int64, max uint64) ([]*egopb.Event, int64, error) {
// add a span context
_, span := telemetry.SpanContext(ctx, "eventsStore.GetShardEvents")
defer span.End()

func (s *EventsStore) GetShardEvents(_ context.Context, shardNumber uint64, offset int64, max uint64) ([]*egopb.Event, int64, error) {
// check whether this instance of the journal is connected or not
if !s.connected.Load() {
return nil, 0, errors.New("journal store is not connected")
Expand Down Expand Up @@ -533,11 +496,7 @@ func (s *EventsStore) GetShardEvents(ctx context.Context, shardNumber uint64, of
}

// ShardNumbers returns the distinct list of all the shards in the journal store
func (s *EventsStore) ShardNumbers(ctx context.Context) ([]uint64, error) {
// add a span context
_, span := telemetry.SpanContext(ctx, "eventsStore.NumShards")
defer span.End()

func (s *EventsStore) ShardNumbers(context.Context) ([]uint64, error) {
// check whether this instance of the journal is connected or not
if !s.connected.Load() {
return nil, errors.New("journal store is not connected")
Expand Down
40 changes: 0 additions & 40 deletions eventstore/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (

"github.com/tochemey/ego/v3/egopb"
"github.com/tochemey/ego/v3/eventstore"
"github.com/tochemey/ego/v3/internal/telemetry"
)

var (
Expand Down Expand Up @@ -90,9 +89,6 @@ func NewEventsStore(config *postgres.Config) *EventsStore {

// Connect connects to the underlying postgres database
func (s *EventsStore) Connect(ctx context.Context) error {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "eventsStore.Connect")
defer span.End()
// check whether this instance of the journal is connected or not
if s.connected.Load() {
return nil
Expand All @@ -111,10 +107,6 @@ func (s *EventsStore) Connect(ctx context.Context) error {

// Disconnect disconnects from the underlying postgres database
func (s *EventsStore) Disconnect(ctx context.Context) error {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "eventsStore.Disconnect")
defer span.End()

// check whether this instance of the journal is connected or not
if !s.connected.Load() {
return nil
Expand All @@ -132,10 +124,6 @@ func (s *EventsStore) Disconnect(ctx context.Context) error {

// Ping verifies a connection to the database is still alive, establishing a connection if necessary.
func (s *EventsStore) Ping(ctx context.Context) error {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "eventsStore.Ping")
defer span.End()

// check whether we are connected or not
if !s.connected.Load() {
return s.Connect(ctx)
Expand All @@ -146,10 +134,6 @@ func (s *EventsStore) Ping(ctx context.Context) error {

// PersistenceIDs returns the distinct list of all the persistence ids in the journal store
func (s *EventsStore) PersistenceIDs(ctx context.Context, pageSize uint64, pageToken string) (persistenceIDs []string, nextPageToken string, err error) {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "eventsStore.PersistenceIDs")
defer span.End()

// check whether this instance of the journal is connected or not
if !s.connected.Load() {
return nil, "", errors.New("journal store is not connected")
Expand Down Expand Up @@ -201,10 +185,6 @@ func (s *EventsStore) PersistenceIDs(ctx context.Context, pageSize uint64, pageT

// WriteEvents writes a bunch of events into the underlying postgres database
func (s *EventsStore) WriteEvents(ctx context.Context, events []*egopb.Event) error {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "eventsStore.WriteEvents")
defer span.End()

// check whether this instance of the journal is connected or not
if !s.connected.Load() {
return errors.New("journal store is not connected")
Expand Down Expand Up @@ -288,10 +268,6 @@ func (s *EventsStore) WriteEvents(ctx context.Context, events []*egopb.Event) er

// DeleteEvents deletes events from the postgres up to a given sequence number (inclusive)
func (s *EventsStore) DeleteEvents(ctx context.Context, persistenceID string, toSequenceNumber uint64) error {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "eventsStore.DeleteEvents")
defer span.End()

// check whether this instance of the journal is connected or not
if !s.connected.Load() {
return errors.New("journal store is not connected")
Expand Down Expand Up @@ -319,10 +295,6 @@ func (s *EventsStore) DeleteEvents(ctx context.Context, persistenceID string, to

// ReplayEvents fetches events for a given persistence ID from a given sequence number(inclusive) to a given sequence number(inclusive)
func (s *EventsStore) ReplayEvents(ctx context.Context, persistenceID string, fromSequenceNumber, toSequenceNumber uint64, max uint64) ([]*egopb.Event, error) {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "eventsStore.ReplayEvents")
defer span.End()

// check whether this instance of the journal is connected or not
if !s.connected.Load() {
return nil, errors.New("journal store is not connected")
Expand Down Expand Up @@ -357,10 +329,6 @@ func (s *EventsStore) ReplayEvents(ctx context.Context, persistenceID string, fr

// GetLatestEvent fetches the latest event
func (s *EventsStore) GetLatestEvent(ctx context.Context, persistenceID string) (*egopb.Event, error) {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "eventsStore.GetLatestEvent")
defer span.End()

// check whether this instance of the journal is connected or not
if !s.connected.Load() {
return nil, errors.New("journal store is not connected")
Expand Down Expand Up @@ -398,10 +366,6 @@ func (s *EventsStore) GetLatestEvent(ctx context.Context, persistenceID string)

// GetShardEvents returns the next (max) events after the offset in the journal for a given shard
func (s *EventsStore) GetShardEvents(ctx context.Context, shardNumber uint64, offset int64, max uint64) ([]*egopb.Event, int64, error) {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "eventsStore.GetShardEvents")
defer span.End()

// check whether this instance of the journal is connected or not
if !s.connected.Load() {
return nil, 0, errors.New("journal store is not connected")
Expand Down Expand Up @@ -448,10 +412,6 @@ func (s *EventsStore) GetShardEvents(ctx context.Context, shardNumber uint64, of

// ShardNumbers returns the distinct list of all the shards in the journal store
func (s *EventsStore) ShardNumbers(ctx context.Context) ([]uint64, error) {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "eventsStore.NumShards")
defer span.End()

// check whether this instance of the journal is connected or not
if !s.connected.Load() {
return nil, errors.New("journal store is not connected")
Expand Down
Loading

0 comments on commit 81ecfb6

Please sign in to comment.