Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: upgrade Go-Akt to 2.6.0 and remove unnecessary code #85

Merged
merged 1 commit into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
env:
FORCE_COLOR: 1
steps:
- uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4
- uses: actions/checkout@v4
with:
submodules: false
fetch-depth: 0
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
env:
FORCE_COLOR: 1
steps:
- uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4
- uses: actions/checkout@v4
with:
submodules: false
fetch-depth: 0
Expand Down
8 changes: 4 additions & 4 deletions actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (entity *actor) PreStart(ctx context.Context) error {
}

// Receive processes any message dropped into the actor mailbox.
func (entity *actor) Receive(ctx actors.ReceiveContext) {
func (entity *actor) Receive(ctx *actors.ReceiveContext) {
_, span := telemetry.SpanContext(ctx.Context(), "Receive")
defer span.End()

Expand Down Expand Up @@ -157,7 +157,7 @@ func (entity *actor) recoverFromSnapshot(ctx context.Context) error {
}

// sendErrorReply sends an error as a reply message
func (entity *actor) sendErrorReply(ctx actors.ReceiveContext, err error) {
func (entity *actor) sendErrorReply(ctx *actors.ReceiveContext, err error) {
reply := &egopb.CommandReply{
Reply: &egopb.CommandReply_ErrorReply{
ErrorReply: &egopb.ErrorReply{
Expand All @@ -170,7 +170,7 @@ func (entity *actor) sendErrorReply(ctx actors.ReceiveContext, err error) {
}

// getStateAndReply returns the current state of the entity
func (entity *actor) getStateAndReply(ctx actors.ReceiveContext) {
func (entity *actor) getStateAndReply(ctx *actors.ReceiveContext) {
latestEvent, err := entity.eventsStore.GetLatestEvent(ctx.Context(), entity.ID())
if err != nil {
entity.sendErrorReply(ctx, err)
Expand All @@ -193,7 +193,7 @@ func (entity *actor) getStateAndReply(ctx actors.ReceiveContext) {
}

// processCommandAndReply processes the incoming command
func (entity *actor) processCommandAndReply(ctx actors.ReceiveContext, command Command) {
func (entity *actor) processCommandAndReply(ctx *actors.ReceiveContext, command Command) {
goCtx := ctx.Context()
events, err := entity.HandleCommand(goCtx, command, entity.currentState)
if err != nil {
Expand Down
20 changes: 5 additions & 15 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import (
"github.com/tochemey/ego/v3/egopb"
"github.com/tochemey/ego/v3/eventstore"
"github.com/tochemey/ego/v3/eventstream"
egotel "github.com/tochemey/ego/v3/internal/telemetry"
"github.com/tochemey/ego/v3/offsetstore"
"github.com/tochemey/ego/v3/projection"
)
Expand Down Expand Up @@ -85,7 +84,7 @@ func NewEngine(name string, eventsStore eventstore.EventsStore, opts ...Option)
name: name,
eventsStore: eventsStore,
enableCluster: atomic.NewBool(false),
logger: log.DefaultLogger,
logger: log.New(log.InfoLevel, os.Stderr),
telemetry: telemetry.New(),
eventStream: eventstream.New(),
locker: &sync.Mutex{},
Expand Down Expand Up @@ -153,9 +152,6 @@ func (x *Engine) Start(ctx context.Context) error {

// AddProjection add a projection to the running eGo engine and start it
func (x *Engine) AddProjection(ctx context.Context, name string, handler projection.Handler, offsetStore offsetstore.OffsetStore, opts ...projection.Option) error {
spanCtx, span := egotel.SpanContext(ctx, "AddProjection")
defer span.End()

x.locker.Lock()
started := x.started.Load()
x.locker.Unlock()
Expand All @@ -166,20 +162,20 @@ func (x *Engine) AddProjection(ctx context.Context, name string, handler project
actor := projection.New(name, handler, x.eventsStore, offsetStore, opts...)

var (
pid actors.PID
pid *actors.PID
err error
)

x.locker.Lock()
actorSystem := x.actorSystem
x.locker.Unlock()

if pid, err = actorSystem.Spawn(spanCtx, name, actor); err != nil {
if pid, err = actorSystem.Spawn(ctx, name, actor); err != nil {
x.logger.Error(fmt.Errorf("failed to register the projection=(%s): %w", name, err))
return err
}

if err := actors.Tell(spanCtx, pid, projection.Start); err != nil {
if err := actors.Tell(ctx, pid, projection.Start); err != nil {
x.logger.Error(fmt.Errorf("failed to start the projection=(%s): %w", name, err))
return err
}
Expand All @@ -195,10 +191,7 @@ func (x *Engine) Stop(ctx context.Context) error {
}

// Subscribe creates an events subscriber
func (x *Engine) Subscribe(ctx context.Context) (eventstream.Subscriber, error) {
_, span := egotel.SpanContext(ctx, "Subscribe")
defer span.End()

func (x *Engine) Subscribe() (eventstream.Subscriber, error) {
x.locker.Lock()
started := x.started.Load()
x.locker.Unlock()
Expand Down Expand Up @@ -251,9 +244,6 @@ func (x *Engine) Entity(ctx context.Context, behavior EntityBehavior) error {
// 2. nil when there is no resulting state or no event persisted
// 3. an error in case of error
func (x *Engine) SendCommand(ctx context.Context, entityID string, cmd Command, timeout time.Duration) (resultingState State, revision uint64, err error) {
ctx, span := egotel.SpanContext(ctx, "SendCommand")
defer span.End()

x.locker.Lock()
started := x.started.Load()
x.locker.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestEgo(t *testing.T) {
require.NoError(t, err)

// subscribe to events
subscriber, err := engine.Subscribe(ctx)
subscriber, err := engine.Subscribe()
require.NoError(t, err)
require.NotNil(t, subscriber)

Expand Down
61 changes: 10 additions & 51 deletions eventstore/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (

"github.com/tochemey/ego/v3/egopb"
"github.com/tochemey/ego/v3/eventstore"
"github.com/tochemey/ego/v3/internal/telemetry"
)

// EventsStore keep in memory every journal
Expand All @@ -68,11 +67,7 @@ func NewEventsStore() *EventsStore {
}

// Connect connects to the journal store
func (s *EventsStore) Connect(ctx context.Context) error {
// add a span context
_, span := telemetry.SpanContext(ctx, "eventsStore.Connect")
defer span.End()

func (s *EventsStore) Connect(context.Context) error {
// check whether this instance of the journal is connected or not
if s.connected.Load() {
return nil
Expand All @@ -94,11 +89,7 @@ func (s *EventsStore) Connect(ctx context.Context) error {
}

// Disconnect disconnect the journal store
func (s *EventsStore) Disconnect(ctx context.Context) error {
// add a span context
_, span := telemetry.SpanContext(ctx, "eventsStore.Disconnect")
defer span.End()

func (s *EventsStore) Disconnect(context.Context) error {
// check whether this instance of the journal is connected or not
if !s.connected.Load() {
return nil
Expand All @@ -125,25 +116,17 @@ func (s *EventsStore) Disconnect(ctx context.Context) error {

// Ping verifies a connection to the database is still alive, establishing a connection if necessary.
func (s *EventsStore) Ping(ctx context.Context) error {
// add a span context
spanCtx, span := telemetry.SpanContext(ctx, "eventsStore.Ping")
defer span.End()

// check whether we are connected or not
if !s.connected.Load() {
return s.Connect(spanCtx)
return s.Connect(ctx)
}

return nil
}

// PersistenceIDs returns the distinct list of all the persistence ids in the journal store
// FIXME: enhance the implementation. As it stands it will be a bit slow when there are a lot of records
func (s *EventsStore) PersistenceIDs(ctx context.Context, pageSize uint64, pageToken string) (persistenceIDs []string, nextPageToken string, err error) {
// add a span context
_, span := telemetry.SpanContext(ctx, "eventsStore.PersistenceIDs")
defer span.End()

func (s *EventsStore) PersistenceIDs(_ context.Context, pageSize uint64, pageToken string) (persistenceIDs []string, nextPageToken string, err error) {
// check whether this instance of the journal is connected or not
if !s.connected.Load() {
return nil, "", errors.New("journal store is not connected")
Expand Down Expand Up @@ -214,11 +197,7 @@ func (s *EventsStore) PersistenceIDs(ctx context.Context, pageSize uint64, pageT
}

// WriteEvents persist events in batches for a given persistenceID
func (s *EventsStore) WriteEvents(ctx context.Context, events []*egopb.Event) error {
// add a span context
_, span := telemetry.SpanContext(ctx, "eventsStore.WriteEvents")
defer span.End()

func (s *EventsStore) WriteEvents(_ context.Context, events []*egopb.Event) error {
// check whether this instance of the journal is connected or not
if !s.connected.Load() {
return errors.New("journal store is not connected")
Expand Down Expand Up @@ -266,11 +245,7 @@ func (s *EventsStore) WriteEvents(ctx context.Context, events []*egopb.Event) er

// DeleteEvents deletes events from the store upt to a given sequence number (inclusive)
// FIXME: enhance the implementation. As it stands it may be a bit slow when there are a lot of records
func (s *EventsStore) DeleteEvents(ctx context.Context, persistenceID string, toSequenceNumber uint64) error {
// add a span context
_, span := telemetry.SpanContext(ctx, "eventsStore.DeleteEvents")
defer span.End()

func (s *EventsStore) DeleteEvents(_ context.Context, persistenceID string, toSequenceNumber uint64) error {
// check whether this instance of the journal is connected or not
if !s.connected.Load() {
return errors.New("journal store is not connected")
Expand Down Expand Up @@ -319,11 +294,7 @@ func (s *EventsStore) DeleteEvents(ctx context.Context, persistenceID string, to
}

// ReplayEvents fetches events for a given persistence ID from a given sequence number(inclusive) to a given sequence number(inclusive)
func (s *EventsStore) ReplayEvents(ctx context.Context, persistenceID string, fromSequenceNumber, toSequenceNumber uint64, max uint64) ([]*egopb.Event, error) {
// add a span context
_, span := telemetry.SpanContext(ctx, "eventsStore.ReplayEvents")
defer span.End()

func (s *EventsStore) ReplayEvents(_ context.Context, persistenceID string, fromSequenceNumber, toSequenceNumber uint64, max uint64) ([]*egopb.Event, error) {
// check whether this instance of the journal is connected or not
if !s.connected.Load() {
return nil, errors.New("journal store is not connected")
Expand Down Expand Up @@ -392,11 +363,7 @@ func (s *EventsStore) ReplayEvents(ctx context.Context, persistenceID string, fr
}

// GetLatestEvent fetches the latest event
func (s *EventsStore) GetLatestEvent(ctx context.Context, persistenceID string) (*egopb.Event, error) {
// add a span context
_, span := telemetry.SpanContext(ctx, "eventsStore.GetLatestEvent")
defer span.End()

func (s *EventsStore) GetLatestEvent(_ context.Context, persistenceID string) (*egopb.Event, error) {
// check whether this instance of the journal is connected or not
if !s.connected.Load() {
return nil, errors.New("journal store is not connected")
Expand Down Expand Up @@ -447,11 +414,7 @@ func (s *EventsStore) GetLatestEvent(ctx context.Context, persistenceID string)
}

// GetShardEvents returns the next (max) events after the offset in the journal for a given shard
func (s *EventsStore) GetShardEvents(ctx context.Context, shardNumber uint64, offset int64, max uint64) ([]*egopb.Event, int64, error) {
// add a span context
_, span := telemetry.SpanContext(ctx, "eventsStore.GetShardEvents")
defer span.End()

func (s *EventsStore) GetShardEvents(_ context.Context, shardNumber uint64, offset int64, max uint64) ([]*egopb.Event, int64, error) {
// check whether this instance of the journal is connected or not
if !s.connected.Load() {
return nil, 0, errors.New("journal store is not connected")
Expand Down Expand Up @@ -533,11 +496,7 @@ func (s *EventsStore) GetShardEvents(ctx context.Context, shardNumber uint64, of
}

// ShardNumbers returns the distinct list of all the shards in the journal store
func (s *EventsStore) ShardNumbers(ctx context.Context) ([]uint64, error) {
// add a span context
_, span := telemetry.SpanContext(ctx, "eventsStore.NumShards")
defer span.End()

func (s *EventsStore) ShardNumbers(context.Context) ([]uint64, error) {
// check whether this instance of the journal is connected or not
if !s.connected.Load() {
return nil, errors.New("journal store is not connected")
Expand Down
40 changes: 0 additions & 40 deletions eventstore/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (

"github.com/tochemey/ego/v3/egopb"
"github.com/tochemey/ego/v3/eventstore"
"github.com/tochemey/ego/v3/internal/telemetry"
)

var (
Expand Down Expand Up @@ -90,9 +89,6 @@ func NewEventsStore(config *postgres.Config) *EventsStore {

// Connect connects to the underlying postgres database
func (s *EventsStore) Connect(ctx context.Context) error {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "eventsStore.Connect")
defer span.End()
// check whether this instance of the journal is connected or not
if s.connected.Load() {
return nil
Expand All @@ -111,10 +107,6 @@ func (s *EventsStore) Connect(ctx context.Context) error {

// Disconnect disconnects from the underlying postgres database
func (s *EventsStore) Disconnect(ctx context.Context) error {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "eventsStore.Disconnect")
defer span.End()

// check whether this instance of the journal is connected or not
if !s.connected.Load() {
return nil
Expand All @@ -132,10 +124,6 @@ func (s *EventsStore) Disconnect(ctx context.Context) error {

// Ping verifies a connection to the database is still alive, establishing a connection if necessary.
func (s *EventsStore) Ping(ctx context.Context) error {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "eventsStore.Ping")
defer span.End()

// check whether we are connected or not
if !s.connected.Load() {
return s.Connect(ctx)
Expand All @@ -146,10 +134,6 @@ func (s *EventsStore) Ping(ctx context.Context) error {

// PersistenceIDs returns the distinct list of all the persistence ids in the journal store
func (s *EventsStore) PersistenceIDs(ctx context.Context, pageSize uint64, pageToken string) (persistenceIDs []string, nextPageToken string, err error) {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "eventsStore.PersistenceIDs")
defer span.End()

// check whether this instance of the journal is connected or not
if !s.connected.Load() {
return nil, "", errors.New("journal store is not connected")
Expand Down Expand Up @@ -201,10 +185,6 @@ func (s *EventsStore) PersistenceIDs(ctx context.Context, pageSize uint64, pageT

// WriteEvents writes a bunch of events into the underlying postgres database
func (s *EventsStore) WriteEvents(ctx context.Context, events []*egopb.Event) error {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "eventsStore.WriteEvents")
defer span.End()

// check whether this instance of the journal is connected or not
if !s.connected.Load() {
return errors.New("journal store is not connected")
Expand Down Expand Up @@ -288,10 +268,6 @@ func (s *EventsStore) WriteEvents(ctx context.Context, events []*egopb.Event) er

// DeleteEvents deletes events from the postgres up to a given sequence number (inclusive)
func (s *EventsStore) DeleteEvents(ctx context.Context, persistenceID string, toSequenceNumber uint64) error {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "eventsStore.DeleteEvents")
defer span.End()

// check whether this instance of the journal is connected or not
if !s.connected.Load() {
return errors.New("journal store is not connected")
Expand Down Expand Up @@ -319,10 +295,6 @@ func (s *EventsStore) DeleteEvents(ctx context.Context, persistenceID string, to

// ReplayEvents fetches events for a given persistence ID from a given sequence number(inclusive) to a given sequence number(inclusive)
func (s *EventsStore) ReplayEvents(ctx context.Context, persistenceID string, fromSequenceNumber, toSequenceNumber uint64, max uint64) ([]*egopb.Event, error) {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "eventsStore.ReplayEvents")
defer span.End()

// check whether this instance of the journal is connected or not
if !s.connected.Load() {
return nil, errors.New("journal store is not connected")
Expand Down Expand Up @@ -357,10 +329,6 @@ func (s *EventsStore) ReplayEvents(ctx context.Context, persistenceID string, fr

// GetLatestEvent fetches the latest event
func (s *EventsStore) GetLatestEvent(ctx context.Context, persistenceID string) (*egopb.Event, error) {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "eventsStore.GetLatestEvent")
defer span.End()

// check whether this instance of the journal is connected or not
if !s.connected.Load() {
return nil, errors.New("journal store is not connected")
Expand Down Expand Up @@ -398,10 +366,6 @@ func (s *EventsStore) GetLatestEvent(ctx context.Context, persistenceID string)

// GetShardEvents returns the next (max) events after the offset in the journal for a given shard
func (s *EventsStore) GetShardEvents(ctx context.Context, shardNumber uint64, offset int64, max uint64) ([]*egopb.Event, int64, error) {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "eventsStore.GetShardEvents")
defer span.End()

// check whether this instance of the journal is connected or not
if !s.connected.Load() {
return nil, 0, errors.New("journal store is not connected")
Expand Down Expand Up @@ -448,10 +412,6 @@ func (s *EventsStore) GetShardEvents(ctx context.Context, shardNumber uint64, of

// ShardNumbers returns the distinct list of all the shards in the journal store
func (s *EventsStore) ShardNumbers(ctx context.Context) ([]uint64, error) {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "eventsStore.NumShards")
defer span.End()

// check whether this instance of the journal is connected or not
if !s.connected.Load() {
return nil, errors.New("journal store is not connected")
Expand Down
Loading
Loading