Skip to content

Commit

Permalink
refactor: refactor source code
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed Oct 7, 2023
1 parent 5fbe28c commit 4067838
Show file tree
Hide file tree
Showing 11 changed files with 395 additions and 521 deletions.
131 changes: 53 additions & 78 deletions entity/behavior.go → actor.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package entity
package ego

import (
"context"
Expand All @@ -12,57 +12,32 @@ import (
"github.com/tochemey/ego/internal/telemetry"
"github.com/tochemey/goakt/actors"
"go.uber.org/atomic"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"
)

type Command proto.Message
type Event proto.Message
type State proto.Message

// Behavior defines an event sourced behavior when modeling a CQRS Behavior.
type Behavior[T State] interface {
// ID defines the id that will be used in the event journal.
// This helps track the entity in the events store.
ID() string
// InitialState returns the event sourced actor initial state.
// This is set as the initial state when there are no snapshots found the entity
InitialState() T
// HandleCommand helps handle commands received by the event sourced actor. The command handlers define how to handle each incoming command,
// which validations must be applied, and finally, which events will be persisted if any. When there is no event to be persisted a nil can
// be returned as a no-op. Command handlers are the meat of the event sourced actor.
// They encode the business rules of your event sourced actor and act as a guardian of the event sourced actor consistency.
// The command eventSourcedHandler must first validate that the incoming command can be applied to the current model state.
// Any decision should be solely based on the data passed in the commands and the state of the Behavior.
// In case of successful validation, one or more events expressing the mutations are persisted.
// Once the events are persisted, they are applied to the state producing a new valid state.
HandleCommand(ctx context.Context, command Command, priorState T) (event Event, err error)
// HandleEvent handle events emitted by the command handlers. The event handlers are used to mutate the state of the event sourced actor by applying the events to it.
// Event handlers must be pure functions as they will be used when instantiating the event sourced actor and replaying the event journal.
HandleEvent(ctx context.Context, event Event, priorState T) (state T, err error)
}
"google.golang.org/protobuf/proto"
)

// Entity is an event sourced based actor
type Entity[T State] struct {
Behavior[T]
// actor is an event sourced based actor
type actor struct {
Behavior
// specifies the events store
eventsStore eventstore.EventsStore
// specifies the current state
currentState T
currentState proto.Message

eventsCounter *atomic.Uint64
lastCommandTime time.Time
mu sync.RWMutex
}

// enforce compilation error
var _ actors.Actor = &Entity[State]{}
// ensure that the struct does implement the given interface
var _ actors.Actor = &actor{}

// New creates an instance of Entity provided the eventSourcedHandler and the events store
func New[T State](behavior Behavior[T], eventsStore eventstore.EventsStore) *Entity[T] {
// newActor creates an instance of behaviorImpl provided the eventSourcedHandler and the events store
func newActor(behavior Behavior, eventsStore eventstore.EventsStore) *actor {
// create an instance of entity and return it
return &Entity[T]{
return &actor{
eventsStore: eventsStore,
Behavior: behavior,
eventsCounter: atomic.NewUint64(0),
Expand All @@ -72,79 +47,79 @@ func New[T State](behavior Behavior[T], eventsStore eventstore.EventsStore) *Ent

// PreStart pre-starts the actor
// At this stage we connect to the various stores
func (entity *Entity[T]) PreStart(ctx context.Context) error {
func (x *actor) PreStart(ctx context.Context) error {
// add a span context
//ctx, span := telemetry.SpanContext(ctx, "PreStart")
//defer span.End()
// acquire the lock
entity.mu.Lock()
x.mu.Lock()
// release lock when done
defer entity.mu.Unlock()
defer x.mu.Unlock()

// connect to the various stores
if entity.eventsStore == nil {
if x.eventsStore == nil {
return errors.New("events store is not defined")
}

// call the connect method of the journal store
if err := entity.eventsStore.Connect(ctx); err != nil {
if err := x.eventsStore.Connect(ctx); err != nil {
return fmt.Errorf("failed to connect to the events store: %v", err)
}

// check whether there is a snapshot to recover from
if err := entity.recoverFromSnapshot(ctx); err != nil {
if err := x.recoverFromSnapshot(ctx); err != nil {
return errors.Wrap(err, "failed to recover from snapshot")
}
return nil
}

// Receive processes any message dropped into the actor mailbox.
func (entity *Entity[T]) Receive(ctx actors.ReceiveContext) {
func (x *actor) Receive(ctx actors.ReceiveContext) {
// add a span context
_, span := telemetry.SpanContext(ctx.Context(), "Receive")
defer span.End()

// acquire the lock
entity.mu.Lock()
x.mu.Lock()
// release lock when done
defer entity.mu.Unlock()
defer x.mu.Unlock()

// grab the command sent
switch command := ctx.Message().(type) {
case *egopb.GetStateCommand:
entity.getStateAndReply(ctx)
x.getStateAndReply(ctx)
default:
entity.processCommandAndReply(ctx, command)
x.processCommandAndReply(ctx, command)
}
}

// PostStop prepares the actor to gracefully shutdown
func (entity *Entity[T]) PostStop(ctx context.Context) error {
func (x *actor) PostStop(ctx context.Context) error {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "PostStop")
defer span.End()

// acquire the lock
entity.mu.Lock()
x.mu.Lock()
// release lock when done
defer entity.mu.Unlock()
defer x.mu.Unlock()

// disconnect the journal
if err := entity.eventsStore.Disconnect(ctx); err != nil {
if err := x.eventsStore.Disconnect(ctx); err != nil {
return fmt.Errorf("failed to disconnect the events store: %v", err)
}
return nil
}

// recoverFromSnapshot reset the persistent actor to the latest snapshot in case there is one
// this is vital when the entity actor is restarting.
func (entity *Entity[T]) recoverFromSnapshot(ctx context.Context) error {
func (x *actor) recoverFromSnapshot(ctx context.Context) error {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "RecoverFromSnapshot")
defer span.End()

// check whether there is a snapshot to recover from
event, err := entity.eventsStore.GetLatestEvent(ctx, entity.ID())
event, err := x.eventsStore.GetLatestEvent(ctx, x.ID())
// handle the error
if err != nil {
return errors.Wrap(err, "failed to recover the latest journal")
Expand All @@ -153,22 +128,22 @@ func (entity *Entity[T]) recoverFromSnapshot(ctx context.Context) error {
// we do have the latest state just recover from it
if event != nil {
// set the current state
if err := event.GetResultingState().UnmarshalTo(entity.currentState); err != nil {
if err := event.GetResultingState().UnmarshalTo(x.currentState); err != nil {
return errors.Wrap(err, "failed unmarshal the latest state")
}

// set the event counter
entity.eventsCounter.Store(event.GetSequenceNumber())
x.eventsCounter.Store(event.GetSequenceNumber())
return nil
}

// in case there is no snapshot
entity.currentState = entity.InitialState()
x.currentState = x.InitialState()
return nil
}

// sendErrorReply sends an error as a reply message
func (entity *Entity[T]) sendErrorReply(ctx actors.ReceiveContext, err error) {
func (x *actor) sendErrorReply(ctx actors.ReceiveContext, err error) {
// create a new error reply
reply := &egopb.CommandReply{
Reply: &egopb.CommandReply_ErrorReply{
Expand All @@ -182,12 +157,12 @@ func (entity *Entity[T]) sendErrorReply(ctx actors.ReceiveContext, err error) {
}

// getStateAndReply returns the current state of the entity
func (entity *Entity[T]) getStateAndReply(ctx actors.ReceiveContext) {
func (x *actor) getStateAndReply(ctx actors.ReceiveContext) {
// let us fetch the latest journal
latestEvent, err := entity.eventsStore.GetLatestEvent(ctx.Context(), entity.ID())
latestEvent, err := x.eventsStore.GetLatestEvent(ctx.Context(), x.ID())
// handle the error
if err != nil {
entity.sendErrorReply(ctx, err)
x.sendErrorReply(ctx, err)
return
}

Expand All @@ -196,7 +171,7 @@ func (entity *Entity[T]) getStateAndReply(ctx actors.ReceiveContext) {
reply := &egopb.CommandReply{
Reply: &egopb.CommandReply_StateReply{
StateReply: &egopb.StateReply{
PersistenceId: entity.ID(),
PersistenceId: x.ID(),
State: resultingState,
SequenceNumber: latestEvent.GetSequenceNumber(),
Timestamp: latestEvent.GetTimestamp(),
Expand All @@ -209,15 +184,15 @@ func (entity *Entity[T]) getStateAndReply(ctx actors.ReceiveContext) {
}

// processCommandAndReply processes the incoming command
func (entity *Entity[T]) processCommandAndReply(ctx actors.ReceiveContext, command Command) {
func (x *actor) processCommandAndReply(ctx actors.ReceiveContext, command proto.Message) {
// set the go context
goCtx := ctx.Context()
// pass the received command to the command handler
event, err := entity.HandleCommand(goCtx, command, entity.currentState)
event, err := x.HandleCommand(goCtx, command, x.currentState)
// handle the command handler error
if err != nil {
// send an error reply
entity.sendErrorReply(ctx, err)
x.sendErrorReply(ctx, err)
return
}

Expand All @@ -235,58 +210,58 @@ func (entity *Entity[T]) processCommandAndReply(ctx actors.ReceiveContext, comma
}

// process the event by calling the event handler
resultingState, err := entity.HandleEvent(goCtx, event, entity.currentState)
resultingState, err := x.HandleEvent(goCtx, event, x.currentState)
// handle the event handler error
if err != nil {
// send an error reply
entity.sendErrorReply(ctx, err)
x.sendErrorReply(ctx, err)
return
}

// increment the event counter
entity.eventsCounter.Inc()
x.eventsCounter.Inc()

// set the current state for the next command
entity.currentState = resultingState
x.currentState = resultingState

// marshal the event and the resulting state
marshaledEvent, _ := anypb.New(event)
marshaledState, _ := anypb.New(resultingState)

sequenceNumber := entity.eventsCounter.Load()
sequenceNumber := x.eventsCounter.Load()
timestamp := timestamppb.Now()
entity.lastCommandTime = timestamp.AsTime()
shardNumber := ctx.Self().ActorSystem().GetPartition(goCtx, entity.ID())
x.lastCommandTime = timestamp.AsTime()
shardNumber := ctx.Self().ActorSystem().GetPartition(x.ID())

// create the event
envelope := &egopb.Event{
PersistenceId: entity.ID(),
PersistenceId: x.ID(),
SequenceNumber: sequenceNumber,
IsDeleted: false,
Event: marshaledEvent,
ResultingState: marshaledState,
Timestamp: entity.lastCommandTime.Unix(),
Timestamp: x.lastCommandTime.Unix(),
Shard: shardNumber,
}

// create a journal list
journals := []*egopb.Event{envelope}

// TODO persist the event in batch using a child actor
if err := entity.eventsStore.WriteEvents(goCtx, journals); err != nil {
if err := x.eventsStore.WriteEvents(goCtx, journals); err != nil {
// send an error reply
entity.sendErrorReply(ctx, err)
x.sendErrorReply(ctx, err)
return
}

// create the command reply to send
reply := &egopb.CommandReply{
Reply: &egopb.CommandReply_StateReply{
StateReply: &egopb.StateReply{
PersistenceId: entity.ID(),
PersistenceId: x.ID(),
State: marshaledState,
SequenceNumber: sequenceNumber,
Timestamp: entity.lastCommandTime.Unix(),
Timestamp: x.lastCommandTime.Unix(),
},
},
}
Expand Down
30 changes: 17 additions & 13 deletions entity/behavior_test.go → actor_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package entity
package ego

import (
"context"
Expand Down Expand Up @@ -40,13 +40,14 @@ func TestAccountAggregate(t *testing.T) {
// create a persistence id
persistenceID := uuid.NewString()
// create the persistence behavior
behavior := NewAccountEntityBehavior(persistenceID)
behavior := NewAccountActor(persistenceID)

// create the persistence actor using the behavior previously created
actor := New[*testpb.Account](behavior, eventStore)
actor := newActor(behavior, eventStore)
// spawn the actor
pid := actorSystem.Spawn(ctx, behavior.ID(), actor)
pid, err := actorSystem.Spawn(ctx, behavior.ID(), actor)
require.NotNil(t, pid)
require.NoError(t, err)

var command proto.Message

Expand Down Expand Up @@ -126,13 +127,14 @@ func TestAccountAggregate(t *testing.T) {
// create a persistence id
persistenceID := uuid.NewString()
// create the persistence behavior
behavior := NewAccountEntityBehavior(persistenceID)
behavior := NewAccountActor(persistenceID)

// create the persistence actor using the behavior previously created
persistentActor := New[*testpb.Account](behavior, eventStore)
persistentActor := newActor(behavior, eventStore)
// spawn the actor
pid := actorSystem.Spawn(ctx, behavior.ID(), persistentActor)
pid, err := actorSystem.Spawn(ctx, behavior.ID(), persistentActor)
require.NotNil(t, pid)
require.NoError(t, err)

var command proto.Message

Expand Down Expand Up @@ -201,13 +203,14 @@ func TestAccountAggregate(t *testing.T) {
// create a persistence id
persistenceID := uuid.NewString()
// create the persistence behavior
behavior := NewAccountEntityBehavior(persistenceID)
behavior := NewAccountActor(persistenceID)

// create the persistence actor using the behavior previously created
persistentActor := New[*testpb.Account](behavior, eventStore)
persistentActor := newActor(behavior, eventStore)
// spawn the actor
pid := actorSystem.Spawn(ctx, behavior.ID(), persistentActor)
pid, err := actorSystem.Spawn(ctx, behavior.ID(), persistentActor)
require.NotNil(t, pid)
require.NoError(t, err)

command := &testpb.TestSend{}
// send the command to the actor
Expand Down Expand Up @@ -268,13 +271,14 @@ func TestAccountAggregate(t *testing.T) {
// create a persistence id
persistenceID := uuid.NewString()
// create the persistence behavior
behavior := NewAccountEntityBehavior(persistenceID)
behavior := NewAccountActor(persistenceID)

// create the persistence actor using the behavior previously created
persistentActor := New[*testpb.Account](behavior, eventStore)
persistentActor := newActor(behavior, eventStore)
// spawn the actor
pid := actorSystem.Spawn(ctx, behavior.ID(), persistentActor)
pid, err := actorSystem.Spawn(ctx, behavior.ID(), persistentActor)
require.NotNil(t, pid)
require.NoError(t, err)

var command proto.Message

Expand Down
Loading

0 comments on commit 4067838

Please sign in to comment.