-
Notifications
You must be signed in to change notification settings - Fork 24
/
eventsourcing.go
179 lines (143 loc) · 5.56 KB
/
eventsourcing.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package cqrs
import (
"errors"
"reflect"
"time"
)
// EventSourcingRepository is a repository for event-sourced aggregates.
//
// It combines an EventStreamRepository (where events and snapshots live) with
// a TypeRegistry (which supplies the handlers used to replay events).
type EventSourcingRepository interface {
	// GetEventStreamRepository returns the event stream repository in use.
	GetEventStreamRepository() EventStreamRepository

	// GetTypeRegistry returns the type registry in use.
	GetTypeRegistry() TypeRegistry

	// Save stores the events exposed by source, tagging them with
	// correlationID, and returns the resulting versioned events.
	Save(source EventSourced, correlationID string) ([]VersionedEvent, error)

	// Get replays the events stored under id onto source.
	Get(id string, source EventSourced) error

	// GetSnapshot returns the snapshot stored under id.
	GetSnapshot(id string) (EventSourced, error)
}
// EventStreamRepository is a persistence layer for the events (and snapshots)
// associated with aggregates by ID.
type EventStreamRepository interface {
	VersionedEventPublicationLogger

	// Save stores events under the aggregate identified by id.
	Save(id string, events []VersionedEvent) error

	// Get loads events stored under id. The default repository in this file
	// passes the aggregate's current version plus one as fromVersion.
	Get(id string, fromVersion int) ([]VersionedEvent, error)

	// SaveSnapshot stores a snapshot of source.
	SaveSnapshot(source EventSourced) error

	// GetSnapshot loads the snapshot stored under id.
	GetSnapshot(id string) (EventSourced, error)
}
// defaultEventSourcingRepository is the EventSourcingRepository implementation
// returned by NewRepository and NewRepositoryWithPublisher.
type defaultEventSourcingRepository struct {
// Registry supplies the per-event-type handlers that Get uses to replay events.
Registry TypeRegistry
// EventRepository is where events and snapshots are stored and loaded.
EventRepository EventStreamRepository
// Publisher is optional; Save hands events to it only when it is non-nil.
Publisher VersionedEventPublisher
}
// NewRepository constructs an EventSourcingRepository that has no event
// publisher: it delegates to NewRepositoryWithPublisher with a nil publisher.
func NewRepository(eventStreamRepository EventStreamRepository, registry TypeRegistry) EventSourcingRepository {
	return NewRepositoryWithPublisher(eventStreamRepository, nil, registry)
}
// NewRepositoryWithPublisher constructs an EventSourcingRepository that stores
// events in eventStreamRepository, resolves handlers through registry and, when
// publisher is non-nil, dispatches events to it once they have been persisted.
func NewRepositoryWithPublisher(eventStreamRepository EventStreamRepository, publisher VersionedEventPublisher, registry TypeRegistry) EventSourcingRepository {
	return defaultEventSourcingRepository{
		Registry:        registry,
		EventRepository: eventStreamRepository,
		Publisher:       publisher,
	}
}
// GetEventStreamRepository returns the EventStreamRepository this repository
// was constructed with.
func (r defaultEventSourcingRepository) GetEventStreamRepository() EventStreamRepository {
	return r.EventRepository
}
// GetTypeRegistry returns the TypeRegistry this repository was constructed
// with.
func (r defaultEventSourcingRepository) GetTypeRegistry() TypeRegistry {
	return r.Registry
}
// Save persists the events exposed by source and returns them as versioned
// events.
//
// Each event is wrapped in a VersionedEvent carrying a fresh "ve:"-prefixed ID,
// the correlation ID, the source ID, its type name and a UTC timestamp.
// Versions are assigned consecutively, starting at source.Version()+1. When
// correlationID is empty a "cid:"-prefixed one is generated.
//
// The steps run in this order:
//
//  1. The events are written to the event stream repository. On failure the
//     error is returned and source keeps its previous version, so it never
//     claims a version that was not stored.
//  2. source is advanced to the version of the last stored event.
//  3. A snapshot is saved if source asks for one or if any assigned version is
//     a multiple of the snapshot interval. Snapshot failures are only logged.
//  4. If a publisher is configured the events are published; a publish error
//     is returned with a nil slice even though the events are already stored.
//
// On success the persisted events are returned whether or not a publisher is
// configured. The slice is nil when source had no events.
func (r defaultEventSourcingRepository) Save(source EventSourced, correlationID string) ([]VersionedEvent, error) {
	// snapshotInterval forces a snapshot whenever an assigned version is a
	// multiple of it.
	const snapshotInterval = 5

	id := source.ID()
	if len(correlationID) == 0 {
		correlationID = "cid:" + NewUUIDString()
	}

	saveSnapshot := source.WantsToSaveSnapshot()
	nextVersion := source.Version() + 1

	var events []VersionedEvent
	for i, event := range source.Events() {
		version := nextVersion + i
		events = append(events, VersionedEvent{
			ID:            "ve:" + NewUUIDString(),
			CorrelationID: correlationID,
			SourceID:      id,
			Version:       version,
			EventType:     reflect.TypeOf(event).String(),
			Created:       time.Now().UTC(),
			Event:         event,
		})
		if version%snapshotInterval == 0 {
			PackageLogger().Debugf("Latest version %v", version)
			saveSnapshot = true
		}
	}

	if len(events) > 0 {
		start := time.Now()
		if err := r.EventRepository.Save(id, events); err != nil {
			return nil, err
		}
		PackageLogger().Debugf("defaultEventSourcingRepository.Save() - Save Events Took [%dms]", time.Since(start)/time.Millisecond)

		// Advance the aggregate only once the events are durably stored.
		source.SetVersion(events[len(events)-1].Version)
	}

	if saveSnapshot {
		// Reached only when the events above (if any) were persisted; a
		// persistence failure has already returned.
		start := time.Now()
		PackageLogger().Debugf("Saving version %v", source.Version())
		if err := r.EventRepository.SaveSnapshot(source); err != nil {
			// Saving the snapshot is not critical. Continue with process...
			PackageLogger().Debugf("Unable to save snapshot: %v", err)
		}
		PackageLogger().Debugf("defaultEventSourcingRepository.Save() - Save Snapshot Took [%dms]", time.Since(start)/time.Millisecond)
	}

	if r.Publisher == nil {
		// Nothing to publish to, but the caller still gets what was stored.
		return events, nil
	}

	start := time.Now()
	if err := r.Publisher.PublishEvents(events); err != nil {
		return nil, err
	}
	PackageLogger().Debugf("defaultEventSourcingRepository.Save() - Publish Events Took [%dms]", time.Since(start)/time.Millisecond)

	return events, nil
}
// GetSnapshot loads the snapshot stored under id from the event stream
// repository.
//
// When the repository reports an error the failure is logged at debug level
// and returned to the caller together with a nil EventSourced.
func (r defaultEventSourcingRepository) GetSnapshot(id string) (EventSourced, error) {
	snapshot, err := r.EventRepository.GetSnapshot(id)
	if err == nil {
		return snapshot, nil
	}

	PackageLogger().Debugf("eventsoucing: GetSnapshot(): Unable to load snapshot: [%s] %v", id, err)
	return nil, err
}
// Get rehydrates source by replaying the events stored under id.
//
// Events are requested from the event stream repository starting at
// source.Version()+1. Each event is passed to the handler that the type
// registry associates with the event's dynamic type. Once every event has been
// applied, source is set to the version of the last event.
//
// It returns the repository's error unchanged when the events cannot be loaded,
// and an error naming the event type when no handler is registered for it. In
// the latter case handlers for earlier events have already run and the version
// of source is left as it was. When there is nothing to replay (a nil or empty
// result) it returns nil without touching source.
func (r defaultEventSourcingRepository) Get(id string, source EventSourced) error {
	PackageLogger().Debugf("defaultEventSourcingRepository.Get() - Get events from version %v", source.Version())

	start := time.Now()
	events, err := r.EventRepository.Get(id, source.Version()+1)
	if err != nil {
		return err
	}
	PackageLogger().Debugf("defaultEventSourcingRepository.Get() - Got %v events took [%dms]", len(events), time.Since(start)/time.Millisecond)

	// Check the length, not nil-ness: a repository may return an empty,
	// non-nil slice, and indexing its last element below would panic.
	if len(events) == 0 {
		PackageLogger().Debugf("No events to process")
		return nil
	}

	start = time.Now()
	handlers := r.Registry.GetHandlers(source)
	for _, event := range events {
		handler, ok := handlers[reflect.TypeOf(event.Event)]
		if !ok {
			errorMessage := "Cannot find handler for event type " + event.EventType
			// The message is data, not a format string.
			PackageLogger().Debugf("%s", errorMessage)
			return errors.New(errorMessage)
		}
		handler(source, event.Event)
	}

	source.SetVersion(events[len(events)-1].Version)
	PackageLogger().Debugf("defaultEventSourcingRepository.Get() - Get Handlers Took [%dms]", time.Since(start)/time.Millisecond)

	return nil
}