Skip to content

Commit

Permalink
refactor: refactor code base and remove unnecessary dependencies
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed Sep 29, 2024
1 parent 56fbf88 commit fbdedf8
Show file tree
Hide file tree
Showing 29 changed files with 1,387 additions and 279 deletions.
44 changes: 44 additions & 0 deletions .github/workflows/stale.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
name: Stale

on:
workflow_dispatch:
schedule:
- cron: '0 12 * * *'

jobs:
Stale:
runs-on: ubuntu-latest
name: Run stale
steps:
- name: Checkout
id: checkout
uses: actions/checkout@v4
- name: Stale
id: stale
uses: sonia-corporation/[email protected]
with:
pull-request-stale-label: stale-label
pull-request-days-before-stale: 14
pull-request-days-before-close: 10
pull-request-ignore-all-assignees: true
pull-request-delete-branch-after-close: true
pull-request-stale-comment: |
This pull is inactive since 14 days!
If there is no activity, it will be closed in two weeks.
You should take one of the following actions:
- Manually close this PR if it is no longer relevant
- Push new commits or comment if you have more information to share
issue-stale-label: inactive
issue-ignore-all-assignees: true
issue-days-before-stale: 14
issue-days-before-close: 10
issue-stale-comment: |
This issue is inactive since 14 days!
If there is no activity, it will be closed in two weeks.
You should take one of the following actions:
- Manually close this issue if it is no longer relevant
- Comment if you have more information to share
issue-add-labels-after-close: |
closed-due-to-inactivity
47 changes: 12 additions & 35 deletions actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"

"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"
Expand All @@ -42,7 +40,6 @@ import (
"github.com/tochemey/ego/v3/egopb"
"github.com/tochemey/ego/v3/eventstore"
"github.com/tochemey/ego/v3/eventstream"
"github.com/tochemey/ego/v3/internal/telemetry"
)

var (
Expand All @@ -57,10 +54,10 @@ type actor struct {
// specifies the current state
currentState State

eventsCounter *atomic.Uint64
eventsCounter uint64
lastCommandTime time.Time
mu sync.RWMutex
eventsStream eventstream.Stream

eventsStream eventstream.Stream
}

// enforce compilation error
Expand All @@ -72,25 +69,18 @@ func newActor(behavior EntityBehavior, eventsStore eventstore.EventsStore, event
return &actor{
eventsStore: eventsStore,
EntityBehavior: behavior,
eventsCounter: atomic.NewUint64(0),
mu: sync.RWMutex{},
eventsStream: eventsStream,
}
}

// PreStart pre-starts the actor
// At this stage we connect to the various stores
func (entity *actor) PreStart(ctx context.Context) error {
spanCtx, span := telemetry.SpanContext(ctx, "PreStart")
defer span.End()
entity.mu.Lock()
defer entity.mu.Unlock()

if entity.eventsStore == nil {
return errors.New("events store is not defined")
}

if err := entity.eventsStore.Ping(spanCtx); err != nil {
if err := entity.eventsStore.Ping(ctx); err != nil {
return fmt.Errorf("failed to connect to the events store: %v", err)
}

Expand All @@ -99,12 +89,6 @@ func (entity *actor) PreStart(ctx context.Context) error {

// Receive processes any message dropped into the actor mailbox.
func (entity *actor) Receive(ctx *actors.ReceiveContext) {
_, span := telemetry.SpanContext(ctx.Context(), "Receive")
defer span.End()

entity.mu.Lock()
defer entity.mu.Unlock()

// grab the command sent
switch command := ctx.Message().(type) {
case *goaktpb.PostStart:
Expand All @@ -119,23 +103,16 @@ func (entity *actor) Receive(ctx *actors.ReceiveContext) {
}

// PostStop prepares the actor to gracefully shutdown
// nolint
func (entity *actor) PostStop(ctx context.Context) error {
_, span := telemetry.SpanContext(ctx, "PostStop")
defer span.End()

entity.mu.Lock()
defer entity.mu.Unlock()

entity.eventsCounter = 0
return nil
}

// recoverFromSnapshot reset the persistent actor to the latest snapshot in case there is one
// this is vital when the entity actor is restarting.
func (entity *actor) recoverFromSnapshot(ctx context.Context) error {
spanCtx, span := telemetry.SpanContext(ctx, "RecoverFromSnapshot")
defer span.End()

event, err := entity.eventsStore.GetLatestEvent(spanCtx, entity.ID())
event, err := entity.eventsStore.GetLatestEvent(ctx, entity.ID())
if err != nil {
return fmt.Errorf("failed to recover the latest journal: %w", err)
}
Expand All @@ -148,7 +125,7 @@ func (entity *actor) recoverFromSnapshot(ctx context.Context) error {
}
entity.currentState = currentState

entity.eventsCounter.Store(event.GetSequenceNumber())
entity.eventsCounter = event.GetSequenceNumber()
return nil
}

Expand Down Expand Up @@ -209,7 +186,7 @@ func (entity *actor) processCommandAndReply(ctx *actors.ReceiveContext, command
StateReply: &egopb.StateReply{
PersistenceId: entity.ID(),
State: resultingState,
SequenceNumber: entity.eventsCounter.Load(),
SequenceNumber: entity.eventsCounter,
Timestamp: entity.lastCommandTime.Unix(),
},
},
Expand All @@ -230,7 +207,7 @@ func (entity *actor) processCommandAndReply(ctx *actors.ReceiveContext, command
return
}

entity.eventsCounter.Inc()
entity.eventsCounter++
entity.currentState = resultingState
entity.lastCommandTime = timestamppb.Now().AsTime()

Expand All @@ -239,7 +216,7 @@ func (entity *actor) processCommandAndReply(ctx *actors.ReceiveContext, command

envelope := &egopb.Event{
PersistenceId: entity.ID(),
SequenceNumber: entity.eventsCounter.Load(),
SequenceNumber: entity.eventsCounter,
IsDeleted: false,
Event: event,
ResultingState: state,
Expand Down Expand Up @@ -272,7 +249,7 @@ func (entity *actor) processCommandAndReply(ctx *actors.ReceiveContext, command
StateReply: &egopb.StateReply{
PersistenceId: entity.ID(),
State: state,
SequenceNumber: entity.eventsCounter.Load(),
SequenceNumber: entity.eventsCounter,
Timestamp: entity.lastCommandTime.Unix(),
},
},
Expand Down
Loading

0 comments on commit fbdedf8

Please sign in to comment.