Skip to content

Commit

Permalink
refactor: refactor source code
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed Oct 7, 2023
1 parent 5fbe28c commit aa00b13
Show file tree
Hide file tree
Showing 18 changed files with 322 additions and 705 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ jobs:
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN}}
- name: Download latest earthly
run: "sudo /bin/sh -c 'wget https://github.com/earthly/earthly/releases/download/v0.7.1/earthly-linux-amd64 -O /usr/local/bin/earthly && chmod +x /usr/local/bin/earthly'"
- uses: earthly/actions/setup-earthly@v1
with:
version: v0.7.19
- name: Earthly version
run: earthly --version
- name: Run Linter and Tests
Expand Down
5 changes: 3 additions & 2 deletions .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ jobs:
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN}}
- name: Download latest earthly
run: "sudo /bin/sh -c 'wget https://github.com/earthly/earthly/releases/download/v0.7.1/earthly-linux-amd64 -O /usr/local/bin/earthly && chmod +x /usr/local/bin/earthly'"
- uses: earthly/actions/setup-earthly@v1
with:
version: v0.7.19
- name: Earthly version
run: earthly --version
- name: Run Linter and Tests
Expand Down
67 changes: 20 additions & 47 deletions entity/behavior.go → actor.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package entity
package ego

import (
"context"
Expand All @@ -12,40 +12,13 @@ import (
"github.com/tochemey/ego/internal/telemetry"
"github.com/tochemey/goakt/actors"
"go.uber.org/atomic"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"
)

type Command proto.Message
type Event proto.Message
type State proto.Message

// Behavior defines an event sourced behavior when modeling a CQRS Behavior.
type Behavior[T State] interface {
// ID defines the id that will be used in the event journal.
// This helps track the entity in the events store.
ID() string
// InitialState returns the event sourced actor initial state.
// This is set as the initial state when there are no snapshots found the entity
InitialState() T
// HandleCommand helps handle commands received by the event sourced actor. The command handlers define how to handle each incoming command,
// which validations must be applied, and finally, which events will be persisted if any. When there is no event to be persisted a nil can
// be returned as a no-op. Command handlers are the meat of the event sourced actor.
// They encode the business rules of your event sourced actor and act as a guardian of the event sourced actor consistency.
// The command eventSourcedHandler must first validate that the incoming command can be applied to the current model state.
// Any decision should be solely based on the data passed in the commands and the state of the Behavior.
// In case of successful validation, one or more events expressing the mutations are persisted.
// Once the events are persisted, they are applied to the state producing a new valid state.
HandleCommand(ctx context.Context, command Command, priorState T) (event Event, err error)
// HandleEvent handle events emitted by the command handlers. The event handlers are used to mutate the state of the event sourced actor by applying the events to it.
// Event handlers must be pure functions as they will be used when instantiating the event sourced actor and replaying the event journal.
HandleEvent(ctx context.Context, event Event, priorState T) (state T, err error)
}

// Entity is an event sourced based actor
type Entity[T State] struct {
Behavior[T]
// actor is an event sourced based actor
type actor[T State] struct {
EntityBehavior[T]
// specifies the events store
eventsStore eventstore.EventsStore
// specifies the current state
Expand All @@ -57,22 +30,22 @@ type Entity[T State] struct {
}

// enforce compilation error
var _ actors.Actor = &Entity[State]{}
var _ actors.Actor = &actor[State]{}

// New creates an instance of Entity provided the eventSourcedHandler and the events store
func New[T State](behavior Behavior[T], eventsStore eventstore.EventsStore) *Entity[T] {
// newActor creates an instance of actor provided the eventSourcedHandler and the events store
func newActor[T State](behavior EntityBehavior[T], eventsStore eventstore.EventsStore) *actor[T] {
// create an instance of entity and return it
return &Entity[T]{
eventsStore: eventsStore,
Behavior: behavior,
eventsCounter: atomic.NewUint64(0),
mu: sync.RWMutex{},
return &actor[T]{
eventsStore: eventsStore,
EntityBehavior: behavior,
eventsCounter: atomic.NewUint64(0),
mu: sync.RWMutex{},
}
}

// PreStart pre-starts the actor
// At this stage we connect to the various stores
func (entity *Entity[T]) PreStart(ctx context.Context) error {
func (entity *actor[T]) PreStart(ctx context.Context) error {
// add a span context
//ctx, span := telemetry.SpanContext(ctx, "PreStart")
//defer span.End()
Expand All @@ -99,7 +72,7 @@ func (entity *Entity[T]) PreStart(ctx context.Context) error {
}

// Receive processes any message dropped into the actor mailbox.
func (entity *Entity[T]) Receive(ctx actors.ReceiveContext) {
func (entity *actor[T]) Receive(ctx actors.ReceiveContext) {
// add a span context
_, span := telemetry.SpanContext(ctx.Context(), "Receive")
defer span.End()
Expand All @@ -119,7 +92,7 @@ func (entity *Entity[T]) Receive(ctx actors.ReceiveContext) {
}

// PostStop prepares the actor to gracefully shutdown
func (entity *Entity[T]) PostStop(ctx context.Context) error {
func (entity *actor[T]) PostStop(ctx context.Context) error {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "PostStop")
defer span.End()
Expand All @@ -138,7 +111,7 @@ func (entity *Entity[T]) PostStop(ctx context.Context) error {

// recoverFromSnapshot reset the persistent actor to the latest snapshot in case there is one
// this is vital when the entity actor is restarting.
func (entity *Entity[T]) recoverFromSnapshot(ctx context.Context) error {
func (entity *actor[T]) recoverFromSnapshot(ctx context.Context) error {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "RecoverFromSnapshot")
defer span.End()
Expand Down Expand Up @@ -168,7 +141,7 @@ func (entity *Entity[T]) recoverFromSnapshot(ctx context.Context) error {
}

// sendErrorReply sends an error as a reply message
func (entity *Entity[T]) sendErrorReply(ctx actors.ReceiveContext, err error) {
func (entity *actor[T]) sendErrorReply(ctx actors.ReceiveContext, err error) {
// create a new error reply
reply := &egopb.CommandReply{
Reply: &egopb.CommandReply_ErrorReply{
Expand All @@ -182,7 +155,7 @@ func (entity *Entity[T]) sendErrorReply(ctx actors.ReceiveContext, err error) {
}

// getStateAndReply returns the current state of the entity
func (entity *Entity[T]) getStateAndReply(ctx actors.ReceiveContext) {
func (entity *actor[T]) getStateAndReply(ctx actors.ReceiveContext) {
// let us fetch the latest journal
latestEvent, err := entity.eventsStore.GetLatestEvent(ctx.Context(), entity.ID())
// handle the error
Expand All @@ -209,7 +182,7 @@ func (entity *Entity[T]) getStateAndReply(ctx actors.ReceiveContext) {
}

// processCommandAndReply processes the incoming command
func (entity *Entity[T]) processCommandAndReply(ctx actors.ReceiveContext, command Command) {
func (entity *actor[T]) processCommandAndReply(ctx actors.ReceiveContext, command Command) {
// set the go context
goCtx := ctx.Context()
// pass the received command to the command handler
Expand Down Expand Up @@ -256,7 +229,7 @@ func (entity *Entity[T]) processCommandAndReply(ctx actors.ReceiveContext, comma
sequenceNumber := entity.eventsCounter.Load()
timestamp := timestamppb.Now()
entity.lastCommandTime = timestamp.AsTime()
shardNumber := ctx.Self().ActorSystem().GetPartition(goCtx, entity.ID())
shardNumber := ctx.Self().ActorSystem().GetPartition(entity.ID())

// create the event
envelope := &egopb.Event{
Expand Down
20 changes: 10 additions & 10 deletions entity/behavior_test.go → actor_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package entity
package ego

import (
"context"
Expand All @@ -19,7 +19,7 @@ import (
"google.golang.org/protobuf/proto"
)

func TestAccountAggregate(t *testing.T) {
func TestActor(t *testing.T) {
t.Run("with state reply", func(t *testing.T) {
defer goleak.VerifyNone(t)
ctx := context.TODO()
Expand All @@ -43,9 +43,9 @@ func TestAccountAggregate(t *testing.T) {
behavior := NewAccountEntityBehavior(persistenceID)

// create the persistence actor using the behavior previously created
actor := New[*testpb.Account](behavior, eventStore)
actor := newActor[*testpb.Account](behavior, eventStore)
// spawn the actor
pid := actorSystem.Spawn(ctx, behavior.ID(), actor)
pid, _ := actorSystem.Spawn(ctx, behavior.ID(), actor)
require.NotNil(t, pid)

var command proto.Message
Expand Down Expand Up @@ -129,9 +129,9 @@ func TestAccountAggregate(t *testing.T) {
behavior := NewAccountEntityBehavior(persistenceID)

// create the persistence actor using the behavior previously created
persistentActor := New[*testpb.Account](behavior, eventStore)
persistentActor := newActor[*testpb.Account](behavior, eventStore)
// spawn the actor
pid := actorSystem.Spawn(ctx, behavior.ID(), persistentActor)
pid, _ := actorSystem.Spawn(ctx, behavior.ID(), persistentActor)
require.NotNil(t, pid)

var command proto.Message
Expand Down Expand Up @@ -204,9 +204,9 @@ func TestAccountAggregate(t *testing.T) {
behavior := NewAccountEntityBehavior(persistenceID)

// create the persistence actor using the behavior previously created
persistentActor := New[*testpb.Account](behavior, eventStore)
persistentActor := newActor[*testpb.Account](behavior, eventStore)
// spawn the actor
pid := actorSystem.Spawn(ctx, behavior.ID(), persistentActor)
pid, _ := actorSystem.Spawn(ctx, behavior.ID(), persistentActor)
require.NotNil(t, pid)

command := &testpb.TestSend{}
Expand Down Expand Up @@ -271,9 +271,9 @@ func TestAccountAggregate(t *testing.T) {
behavior := NewAccountEntityBehavior(persistenceID)

// create the persistence actor using the behavior previously created
persistentActor := New[*testpb.Account](behavior, eventStore)
persistentActor := newActor[*testpb.Account](behavior, eventStore)
// spawn the actor
pid := actorSystem.Spawn(ctx, behavior.ID(), persistentActor)
pid, _ := actorSystem.Spawn(ctx, behavior.ID(), persistentActor)
require.NotNil(t, pid)

var command proto.Message
Expand Down
33 changes: 33 additions & 0 deletions behavior.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package ego

import (
"context"

"google.golang.org/protobuf/proto"
)

type Command proto.Message
type Event proto.Message
type State proto.Message

// EntityBehavior defines an event sourced behavior when modeling a CQRS EntityBehavior.
type EntityBehavior[T State] interface {
// ID defines the id that will be used in the event journal.
// This helps track the entity in the events store.
ID() string
// InitialState returns the event sourced actor initial state.
// This is set as the initial state when there are no snapshots found the entity
InitialState() T
// HandleCommand helps handle commands received by the event sourced actor. The command handlers define how to handle each incoming command,
// which validations must be applied, and finally, which events will be persisted if any. When there is no event to be persisted a nil can
// be returned as a no-op. Command handlers are the meat of the event sourced actor.
// They encode the business rules of your event sourced actor and act as a guardian of the event sourced actor consistency.
// The command eventSourcedHandler must first validate that the incoming command can be applied to the current model state.
// Any decision should be solely based on the data passed in the commands and the state of the Behavior.
// In case of successful validation, one or more events expressing the mutations are persisted.
// Once the events are persisted, they are applied to the state producing a new valid state.
HandleCommand(ctx context.Context, command Command, priorState T) (event Event, err error)
// HandleEvent handle events emitted by the command handlers. The event handlers are used to mutate the state of the event sourced actor by applying the events to it.
// Event handlers must be pure functions as they will be used when instantiating the event sourced actor and replaying the event journal.
HandleEvent(ctx context.Context, event Event, priorState T) (state T, err error)
}
Loading

0 comments on commit aa00b13

Please sign in to comment.