Skip to content

Commit

Permalink
refactor!: complete refactoring of the code to account for cluster (#80)
Browse files Browse the repository at this point in the history
* refactor!: complete refactoring of the code to account for cluster

* refactor!: add more tests
  • Loading branch information
Tochemey authored Jul 4, 2024
1 parent e7ede34 commit ffa04da
Show file tree
Hide file tree
Showing 34 changed files with 650 additions and 619 deletions.
2 changes: 1 addition & 1 deletion Earthfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ protogen:
--path protos/ego

# save artifact to
SAVE ARTIFACT gen/ego/v2 AS LOCAL egopb
SAVE ARTIFACT gen/ego/v3 AS LOCAL egopb

testprotogen:
# copy the proto files to generate
Expand Down
34 changes: 17 additions & 17 deletions actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,23 @@ import (
"github.com/tochemey/goakt/v2/actors"
"github.com/tochemey/goakt/v2/goaktpb"

"github.com/tochemey/ego/v2/egopb"
"github.com/tochemey/ego/v2/eventstore"
"github.com/tochemey/ego/v2/eventstream"
"github.com/tochemey/ego/v2/internal/telemetry"
"github.com/tochemey/ego/v3/egopb"
"github.com/tochemey/ego/v3/eventstore"
"github.com/tochemey/ego/v3/eventstream"
"github.com/tochemey/ego/v3/internal/telemetry"
)

var (
eventsTopic = "topic.events.%d"
)

// actor is an event sourced based actor
type actor[T State] struct {
EntityBehavior[T]
type actor struct {
EntityBehavior
// specifies the events store
eventsStore eventstore.EventsStore
// specifies the current state
currentState T
currentState State

eventsCounter *atomic.Uint64
lastCommandTime time.Time
Expand All @@ -64,12 +64,12 @@ type actor[T State] struct {
}

// enforce compilation error
var _ actors.Actor = &actor[State]{}
var _ actors.Actor = (*actor)(nil)

// newActor creates an instance of actor provided the eventSourcedHandler and the events store
func newActor[T State](behavior EntityBehavior[T], eventsStore eventstore.EventsStore, eventsStream eventstream.Stream) *actor[T] {
func newActor(behavior EntityBehavior, eventsStore eventstore.EventsStore, eventsStream eventstream.Stream) *actor {
// create an instance of entity and return it
return &actor[T]{
return &actor{
eventsStore: eventsStore,
EntityBehavior: behavior,
eventsCounter: atomic.NewUint64(0),
Expand All @@ -80,7 +80,7 @@ func newActor[T State](behavior EntityBehavior[T], eventsStore eventstore.Events

// PreStart pre-starts the actor
// At this stage we connect to the various stores
func (entity *actor[T]) PreStart(ctx context.Context) error {
func (entity *actor) PreStart(ctx context.Context) error {
spanCtx, span := telemetry.SpanContext(ctx, "PreStart")
defer span.End()
entity.mu.Lock()
Expand All @@ -98,7 +98,7 @@ func (entity *actor[T]) PreStart(ctx context.Context) error {
}

// Receive processes any message dropped into the actor mailbox.
func (entity *actor[T]) Receive(ctx actors.ReceiveContext) {
func (entity *actor) Receive(ctx actors.ReceiveContext) {
_, span := telemetry.SpanContext(ctx.Context(), "Receive")
defer span.End()

Expand All @@ -119,7 +119,7 @@ func (entity *actor[T]) Receive(ctx actors.ReceiveContext) {
}

// PostStop prepares the actor to gracefully shutdown
func (entity *actor[T]) PostStop(ctx context.Context) error {
func (entity *actor) PostStop(ctx context.Context) error {
_, span := telemetry.SpanContext(ctx, "PostStop")
defer span.End()

Expand All @@ -131,7 +131,7 @@ func (entity *actor[T]) PostStop(ctx context.Context) error {

// recoverFromSnapshot reset the persistent actor to the latest snapshot in case there is one
// this is vital when the entity actor is restarting.
func (entity *actor[T]) recoverFromSnapshot(ctx context.Context) error {
func (entity *actor) recoverFromSnapshot(ctx context.Context) error {
spanCtx, span := telemetry.SpanContext(ctx, "RecoverFromSnapshot")
defer span.End()

Expand All @@ -157,7 +157,7 @@ func (entity *actor[T]) recoverFromSnapshot(ctx context.Context) error {
}

// sendErrorReply sends an error as a reply message
func (entity *actor[T]) sendErrorReply(ctx actors.ReceiveContext, err error) {
func (entity *actor) sendErrorReply(ctx actors.ReceiveContext, err error) {
reply := &egopb.CommandReply{
Reply: &egopb.CommandReply_ErrorReply{
ErrorReply: &egopb.ErrorReply{
Expand All @@ -170,7 +170,7 @@ func (entity *actor[T]) sendErrorReply(ctx actors.ReceiveContext, err error) {
}

// getStateAndReply returns the current state of the entity
func (entity *actor[T]) getStateAndReply(ctx actors.ReceiveContext) {
func (entity *actor) getStateAndReply(ctx actors.ReceiveContext) {
latestEvent, err := entity.eventsStore.GetLatestEvent(ctx.Context(), entity.ID())
if err != nil {
entity.sendErrorReply(ctx, err)
Expand All @@ -193,7 +193,7 @@ func (entity *actor[T]) getStateAndReply(ctx actors.ReceiveContext) {
}

// processCommandAndReply processes the incoming command
func (entity *actor[T]) processCommandAndReply(ctx actors.ReceiveContext, command Command) {
func (entity *actor) processCommandAndReply(ctx actors.ReceiveContext, command Command) {
goCtx := ctx.Context()
events, err := entity.HandleCommand(goCtx, command, entity.currentState)
if err != nil {
Expand Down
22 changes: 11 additions & 11 deletions actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ import (
"github.com/tochemey/goakt/v2/log"
"github.com/tochemey/gopack/postgres"

"github.com/tochemey/ego/v2/egopb"
"github.com/tochemey/ego/v2/eventstore/memory"
pgeventstore "github.com/tochemey/ego/v2/eventstore/postgres"
"github.com/tochemey/ego/v2/eventstream"
testpb "github.com/tochemey/ego/v2/test/data/pb/v1"
"github.com/tochemey/ego/v3/egopb"
"github.com/tochemey/ego/v3/eventstore/memory"
pgeventstore "github.com/tochemey/ego/v3/eventstore/postgres"
"github.com/tochemey/ego/v3/eventstream"
testpb "github.com/tochemey/ego/v3/test/data/pb/v3"
)

func TestActor(t *testing.T) {
Expand Down Expand Up @@ -76,7 +76,7 @@ func TestActor(t *testing.T) {
eventStream := eventstream.New()

// create the persistence actor using the behavior previously created
actor := newActor[*testpb.Account](behavior, eventStore, eventStream)
actor := newActor(behavior, eventStore, eventStream)
// spawn the actor
pid, _ := actorSystem.Spawn(ctx, behavior.ID(), actor)
require.NotNil(t, pid)
Expand Down Expand Up @@ -174,7 +174,7 @@ func TestActor(t *testing.T) {
eventStream := eventstream.New()

// create the persistence actor using the behavior previously created
persistentActor := newActor[*testpb.Account](behavior, eventStore, eventStream)
persistentActor := newActor(behavior, eventStore, eventStream)
// spawn the actor
pid, _ := actorSystem.Spawn(ctx, behavior.ID(), persistentActor)
require.NotNil(t, pid)
Expand Down Expand Up @@ -259,7 +259,7 @@ func TestActor(t *testing.T) {
eventStream := eventstream.New()

// create the persistence actor using the behavior previously created
persistentActor := newActor[*testpb.Account](behavior, eventStore, eventStream)
persistentActor := newActor(behavior, eventStore, eventStream)
// spawn the actor
pid, _ := actorSystem.Spawn(ctx, behavior.ID(), persistentActor)
require.NotNil(t, pid)
Expand Down Expand Up @@ -340,7 +340,7 @@ func TestActor(t *testing.T) {
eventStream := eventstream.New()

// create the persistence actor using the behavior previously created
persistentActor := newActor[*testpb.Account](behavior, eventStore, eventStream)
persistentActor := newActor(behavior, eventStore, eventStream)
// spawn the actor
pid, err := actorSystem.Spawn(ctx, behavior.ID(), persistentActor)
require.NoError(t, err)
Expand Down Expand Up @@ -464,7 +464,7 @@ func TestActor(t *testing.T) {
eventStream := eventstream.New()

// create the persistence actor using the behavior previously created
actor := newActor[*testpb.Account](behavior, eventStore, eventStream)
actor := newActor(behavior, eventStore, eventStream)
// spawn the actor
pid, _ := actorSystem.Spawn(ctx, behavior.ID(), actor)
require.NotNil(t, pid)
Expand Down Expand Up @@ -582,7 +582,7 @@ func TestActor(t *testing.T) {
eventStream := eventstream.New()

// create the persistence actor using the behavior previously created
actor := newActor[*testpb.Account](behavior, eventStore, eventStream)
actor := newActor(behavior, eventStore, eventStream)
// spawn the actor
pid, _ := actorSystem.Spawn(ctx, behavior.ID(), actor)
require.NotNil(t, pid)
Expand Down
8 changes: 4 additions & 4 deletions behavior.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ type Event proto.Message
type State proto.Message

// EntityBehavior defines an event sourced behavior when modeling a CQRS EntityBehavior.
type EntityBehavior[T State] interface {
type EntityBehavior interface {
// ID defines the id that will be used in the event journal.
// This helps track the entity in the events store.
ID() string
// InitialState returns the event sourced actor initial state.
// This is set as the initial state when there are no snapshots found the entity
InitialState() T
InitialState() State
// HandleCommand helps handle commands received by the event sourced actor. The command handlers define how to handle each incoming command,
// which validations must be applied, and finally, which events will be persisted if any. When there is no event to be persisted a nil can
// be returned as a no-op. Command handlers are the meat of the event sourced actor.
Expand All @@ -53,8 +53,8 @@ type EntityBehavior[T State] interface {
// Every event emitted are processed one after the other in the same order they were emitted to guarantee consistency.
// It is at the discretion of the application developer to know in which order a given command should return the list of events
// This is really powerful when a command needs to return two events. For instance, an OpenAccount command can result in two events: one is AccountOpened and the second is AccountCredited
HandleCommand(ctx context.Context, command Command, priorState T) (events []Event, err error)
HandleCommand(ctx context.Context, command Command, priorState State) (events []Event, err error)
// HandleEvent handle events emitted by the command handlers. The event handlers are used to mutate the state of the event sourced actor by applying the events to it.
// Event handlers must be pure functions as they will be used when instantiating the event sourced actor and replaying the event journal.
HandleEvent(ctx context.Context, event Event, priorState T) (state T, err error)
HandleEvent(ctx context.Context, event Event, priorState State) (state State, err error)
}
Loading

0 comments on commit ffa04da

Please sign in to comment.