Skip to content

Commit

Permalink
Merge branch 'main' into refac
Browse files Browse the repository at this point in the history
# Conflicts:
#	actor.go
#	actor_test.go
#	behavior.go
#	ego.go
#	engine_test.go
#	example/main.go
#	go.mod
#	go.sum
#	helper_test.go
  • Loading branch information
Tochemey committed Oct 7, 2023
2 parents 4067838 + aa00b13 commit e25008b
Show file tree
Hide file tree
Showing 18 changed files with 456 additions and 662 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ jobs:
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN}}
- name: Download latest earthly
run: "sudo /bin/sh -c 'wget https://github.com/earthly/earthly/releases/download/v0.7.1/earthly-linux-amd64 -O /usr/local/bin/earthly && chmod +x /usr/local/bin/earthly'"
- uses: earthly/actions/setup-earthly@v1
with:
version: v0.7.19
- name: Earthly version
run: earthly --version
- name: Run Linter and Tests
Expand Down
5 changes: 3 additions & 2 deletions .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ jobs:
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN}}
- name: Download latest earthly
run: "sudo /bin/sh -c 'wget https://github.com/earthly/earthly/releases/download/v0.7.1/earthly-linux-amd64 -O /usr/local/bin/earthly && chmod +x /usr/local/bin/earthly'"
- uses: earthly/actions/setup-earthly@v1
with:
version: v0.7.19
- name: Earthly version
run: earthly --version
- name: Run Linter and Tests
Expand Down
108 changes: 53 additions & 55 deletions actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,112 +14,110 @@ import (
"go.uber.org/atomic"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"

"google.golang.org/protobuf/proto"
)

// actor is an event sourced based actor
type actor struct {
Behavior
type actor[T State] struct {
EntityBehavior[T]
// specifies the events store
eventsStore eventstore.EventsStore
// specifies the current state
currentState proto.Message
currentState T

eventsCounter *atomic.Uint64
lastCommandTime time.Time
mu sync.RWMutex
}

// ensure that the struct does implement the given interface
var _ actors.Actor = &actor{}
// enforce compilation error
var _ actors.Actor = &actor[State]{}

// newActor creates an instance of behaviorImpl provided the eventSourcedHandler and the events store
func newActor(behavior Behavior, eventsStore eventstore.EventsStore) *actor {
// newActor creates an instance of actor provided the eventSourcedHandler and the events store
func newActor[T State](behavior EntityBehavior[T], eventsStore eventstore.EventsStore) *actor[T] {
// create an instance of entity and return it
return &actor{
eventsStore: eventsStore,
Behavior: behavior,
eventsCounter: atomic.NewUint64(0),
mu: sync.RWMutex{},
return &actor[T]{
eventsStore: eventsStore,
EntityBehavior: behavior,
eventsCounter: atomic.NewUint64(0),
mu: sync.RWMutex{},
}
}

// PreStart pre-starts the actor
// At this stage we connect to the various stores
func (x *actor) PreStart(ctx context.Context) error {
func (entity *actor[T]) PreStart(ctx context.Context) error {
// add a span context
//ctx, span := telemetry.SpanContext(ctx, "PreStart")
//defer span.End()
// acquire the lock
x.mu.Lock()
entity.mu.Lock()
// release lock when done
defer x.mu.Unlock()
defer entity.mu.Unlock()

// connect to the various stores
if x.eventsStore == nil {
if entity.eventsStore == nil {
return errors.New("events store is not defined")
}

// call the connect method of the journal store
if err := x.eventsStore.Connect(ctx); err != nil {
if err := entity.eventsStore.Connect(ctx); err != nil {
return fmt.Errorf("failed to connect to the events store: %v", err)
}

// check whether there is a snapshot to recover from
if err := x.recoverFromSnapshot(ctx); err != nil {
if err := entity.recoverFromSnapshot(ctx); err != nil {
return errors.Wrap(err, "failed to recover from snapshot")
}
return nil
}

// Receive processes any message dropped into the actor mailbox.
func (x *actor) Receive(ctx actors.ReceiveContext) {
func (entity *actor[T]) Receive(ctx actors.ReceiveContext) {
// add a span context
_, span := telemetry.SpanContext(ctx.Context(), "Receive")
defer span.End()

// acquire the lock
x.mu.Lock()
entity.mu.Lock()
// release lock when done
defer x.mu.Unlock()
defer entity.mu.Unlock()

// grab the command sent
switch command := ctx.Message().(type) {
case *egopb.GetStateCommand:
x.getStateAndReply(ctx)
entity.getStateAndReply(ctx)
default:
x.processCommandAndReply(ctx, command)
entity.processCommandAndReply(ctx, command)
}
}

// PostStop prepares the actor to gracefully shutdown
func (x *actor) PostStop(ctx context.Context) error {
func (entity *actor[T]) PostStop(ctx context.Context) error {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "PostStop")
defer span.End()

// acquire the lock
x.mu.Lock()
entity.mu.Lock()
// release lock when done
defer x.mu.Unlock()
defer entity.mu.Unlock()

// disconnect the journal
if err := x.eventsStore.Disconnect(ctx); err != nil {
if err := entity.eventsStore.Disconnect(ctx); err != nil {
return fmt.Errorf("failed to disconnect the events store: %v", err)
}
return nil
}

// recoverFromSnapshot reset the persistent actor to the latest snapshot in case there is one
// this is vital when the entity actor is restarting.
func (x *actor) recoverFromSnapshot(ctx context.Context) error {
func (entity *actor[T]) recoverFromSnapshot(ctx context.Context) error {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "RecoverFromSnapshot")
defer span.End()

// check whether there is a snapshot to recover from
event, err := x.eventsStore.GetLatestEvent(ctx, x.ID())
event, err := entity.eventsStore.GetLatestEvent(ctx, entity.ID())
// handle the error
if err != nil {
return errors.Wrap(err, "failed to recover the latest journal")
Expand All @@ -128,22 +126,22 @@ func (x *actor) recoverFromSnapshot(ctx context.Context) error {
// we do have the latest state just recover from it
if event != nil {
// set the current state
if err := event.GetResultingState().UnmarshalTo(x.currentState); err != nil {
if err := event.GetResultingState().UnmarshalTo(entity.currentState); err != nil {
return errors.Wrap(err, "failed unmarshal the latest state")
}

// set the event counter
x.eventsCounter.Store(event.GetSequenceNumber())
entity.eventsCounter.Store(event.GetSequenceNumber())
return nil
}

// in case there is no snapshot
x.currentState = x.InitialState()
entity.currentState = entity.InitialState()
return nil
}

// sendErrorReply sends an error as a reply message
func (x *actor) sendErrorReply(ctx actors.ReceiveContext, err error) {
func (entity *actor[T]) sendErrorReply(ctx actors.ReceiveContext, err error) {
// create a new error reply
reply := &egopb.CommandReply{
Reply: &egopb.CommandReply_ErrorReply{
Expand All @@ -157,12 +155,12 @@ func (x *actor) sendErrorReply(ctx actors.ReceiveContext, err error) {
}

// getStateAndReply returns the current state of the entity
func (x *actor) getStateAndReply(ctx actors.ReceiveContext) {
func (entity *actor[T]) getStateAndReply(ctx actors.ReceiveContext) {
// let us fetch the latest journal
latestEvent, err := x.eventsStore.GetLatestEvent(ctx.Context(), x.ID())
latestEvent, err := entity.eventsStore.GetLatestEvent(ctx.Context(), entity.ID())
// handle the error
if err != nil {
x.sendErrorReply(ctx, err)
entity.sendErrorReply(ctx, err)
return
}

Expand All @@ -171,7 +169,7 @@ func (x *actor) getStateAndReply(ctx actors.ReceiveContext) {
reply := &egopb.CommandReply{
Reply: &egopb.CommandReply_StateReply{
StateReply: &egopb.StateReply{
PersistenceId: x.ID(),
PersistenceId: entity.ID(),
State: resultingState,
SequenceNumber: latestEvent.GetSequenceNumber(),
Timestamp: latestEvent.GetTimestamp(),
Expand All @@ -184,15 +182,15 @@ func (x *actor) getStateAndReply(ctx actors.ReceiveContext) {
}

// processCommandAndReply processes the incoming command
func (x *actor) processCommandAndReply(ctx actors.ReceiveContext, command proto.Message) {
func (entity *actor[T]) processCommandAndReply(ctx actors.ReceiveContext, command Command) {
// set the go context
goCtx := ctx.Context()
// pass the received command to the command handler
event, err := x.HandleCommand(goCtx, command, x.currentState)
event, err := entity.HandleCommand(goCtx, command, entity.currentState)
// handle the command handler error
if err != nil {
// send an error reply
x.sendErrorReply(ctx, err)
entity.sendErrorReply(ctx, err)
return
}

Expand All @@ -210,58 +208,58 @@ func (x *actor) processCommandAndReply(ctx actors.ReceiveContext, command proto.
}

// process the event by calling the event handler
resultingState, err := x.HandleEvent(goCtx, event, x.currentState)
resultingState, err := entity.HandleEvent(goCtx, event, entity.currentState)
// handle the event handler error
if err != nil {
// send an error reply
x.sendErrorReply(ctx, err)
entity.sendErrorReply(ctx, err)
return
}

// increment the event counter
x.eventsCounter.Inc()
entity.eventsCounter.Inc()

// set the current state for the next command
x.currentState = resultingState
entity.currentState = resultingState

// marshal the event and the resulting state
marshaledEvent, _ := anypb.New(event)
marshaledState, _ := anypb.New(resultingState)

sequenceNumber := x.eventsCounter.Load()
sequenceNumber := entity.eventsCounter.Load()
timestamp := timestamppb.Now()
x.lastCommandTime = timestamp.AsTime()
shardNumber := ctx.Self().ActorSystem().GetPartition(x.ID())
entity.lastCommandTime = timestamp.AsTime()
shardNumber := ctx.Self().ActorSystem().GetPartition(entity.ID())

// create the event
envelope := &egopb.Event{
PersistenceId: x.ID(),
PersistenceId: entity.ID(),
SequenceNumber: sequenceNumber,
IsDeleted: false,
Event: marshaledEvent,
ResultingState: marshaledState,
Timestamp: x.lastCommandTime.Unix(),
Timestamp: entity.lastCommandTime.Unix(),
Shard: shardNumber,
}

// create a journal list
journals := []*egopb.Event{envelope}

// TODO persist the event in batch using a child actor
if err := x.eventsStore.WriteEvents(goCtx, journals); err != nil {
if err := entity.eventsStore.WriteEvents(goCtx, journals); err != nil {
// send an error reply
x.sendErrorReply(ctx, err)
entity.sendErrorReply(ctx, err)
return
}

// create the command reply to send
reply := &egopb.CommandReply{
Reply: &egopb.CommandReply_StateReply{
StateReply: &egopb.StateReply{
PersistenceId: x.ID(),
PersistenceId: entity.ID(),
State: marshaledState,
SequenceNumber: sequenceNumber,
Timestamp: x.lastCommandTime.Unix(),
Timestamp: entity.lastCommandTime.Unix(),
},
},
}
Expand Down
30 changes: 13 additions & 17 deletions actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"google.golang.org/protobuf/proto"
)

func TestAccountAggregate(t *testing.T) {
func TestActor(t *testing.T) {
t.Run("with state reply", func(t *testing.T) {
defer goleak.VerifyNone(t)
ctx := context.TODO()
Expand All @@ -40,14 +40,13 @@ func TestAccountAggregate(t *testing.T) {
// create a persistence id
persistenceID := uuid.NewString()
// create the persistence behavior
behavior := NewAccountActor(persistenceID)
behavior := NewAccountEntityBehavior(persistenceID)

// create the persistence actor using the behavior previously created
actor := newActor(behavior, eventStore)
actor := newActor[*testpb.Account](behavior, eventStore)
// spawn the actor
pid, err := actorSystem.Spawn(ctx, behavior.ID(), actor)
pid, _ := actorSystem.Spawn(ctx, behavior.ID(), actor)
require.NotNil(t, pid)
require.NoError(t, err)

var command proto.Message

Expand Down Expand Up @@ -127,14 +126,13 @@ func TestAccountAggregate(t *testing.T) {
// create a persistence id
persistenceID := uuid.NewString()
// create the persistence behavior
behavior := NewAccountActor(persistenceID)
behavior := NewAccountEntityBehavior(persistenceID)

// create the persistence actor using the behavior previously created
persistentActor := newActor(behavior, eventStore)
persistentActor := newActor[*testpb.Account](behavior, eventStore)
// spawn the actor
pid, err := actorSystem.Spawn(ctx, behavior.ID(), persistentActor)
pid, _ := actorSystem.Spawn(ctx, behavior.ID(), persistentActor)
require.NotNil(t, pid)
require.NoError(t, err)

var command proto.Message

Expand Down Expand Up @@ -203,14 +201,13 @@ func TestAccountAggregate(t *testing.T) {
// create a persistence id
persistenceID := uuid.NewString()
// create the persistence behavior
behavior := NewAccountActor(persistenceID)
behavior := NewAccountEntityBehavior(persistenceID)

// create the persistence actor using the behavior previously created
persistentActor := newActor(behavior, eventStore)
persistentActor := newActor[*testpb.Account](behavior, eventStore)
// spawn the actor
pid, err := actorSystem.Spawn(ctx, behavior.ID(), persistentActor)
pid, _ := actorSystem.Spawn(ctx, behavior.ID(), persistentActor)
require.NotNil(t, pid)
require.NoError(t, err)

command := &testpb.TestSend{}
// send the command to the actor
Expand Down Expand Up @@ -271,14 +268,13 @@ func TestAccountAggregate(t *testing.T) {
// create a persistence id
persistenceID := uuid.NewString()
// create the persistence behavior
behavior := NewAccountActor(persistenceID)
behavior := NewAccountEntityBehavior(persistenceID)

// create the persistence actor using the behavior previously created
persistentActor := newActor(behavior, eventStore)
persistentActor := newActor[*testpb.Account](behavior, eventStore)
// spawn the actor
pid, err := actorSystem.Spawn(ctx, behavior.ID(), persistentActor)
pid, _ := actorSystem.Spawn(ctx, behavior.ID(), persistentActor)
require.NotNil(t, pid)
require.NoError(t, err)

var command proto.Message

Expand Down
Loading

0 comments on commit e25008b

Please sign in to comment.