forked from mmp/vice
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy patheventstream.go
252 lines (204 loc) · 6.41 KB
/
eventstream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
// eventstream.go
// Copyright(c) 2022 Matt Pharr, licensed under the GNU Public License, Version 3.
// SPDX: GPL-3.0-only
package main
import (
"fmt"
"runtime"
"time"
)
type EventSubscriberId int
var (
nextSubscriberId EventSubscriberId
lastCompact time.Time
)
// Reserve 0 as an invalid id so that zero-initialization of objects that
// store EventSubscriberIds works well.
const InvalidEventSubscriberId = 0
// EventStream provides a basic pub/sub event interface that allows any
// part of the system to post an event to the stream and other parts to
// subscribe and receive messages from the stream. It is the backbone for
// communicating events, world updates, and user actions across the various
// parts of the system.
type EventStream struct {
stream []interface{}
subscribers map[EventSubscriberId]*EventSubscriber
}
type EventSubscriber struct {
// offset is offset in the EventStream stream array up to which the
// subscriber has consumed events so far.
offset int
source string
}
func NewEventStream() *EventStream {
return &EventStream{subscribers: make(map[EventSubscriberId]*EventSubscriber)}
}
// Subscribe registers a new subscriber to the stream and returns an
// EventSubscriberId for the subscriber that can then be passed to other
// EventStream methods.
func (e *EventStream) Subscribe() EventSubscriberId {
nextSubscriberId++ // start handing them out at 1
id := nextSubscriberId
// Record the subscriber's callsite, so that we can more easily debug
// subscribers that aren't consuming events.
_, fn, line, _ := runtime.Caller(1)
source := fmt.Sprintf("%s:%d", fn, line)
e.subscribers[id] = &EventSubscriber{
offset: len(e.stream),
source: source}
return id
}
// Unsubscribe removes a subscriber from the subscriber list; the provided
// id can no longer be passed to the Get method to get events.
func (e *EventStream) Unsubscribe(id EventSubscriberId) {
if _, ok := e.subscribers[id]; !ok {
lg.ErrorfUp1("Attempted to unsubscribe invalid id: %d", id)
}
delete(e.subscribers, id)
}
// Post adds an event to the event stream. The type used to encode the
// event is arbitrary; it's up to the EventStream users to establish
// conventions.
func (e *EventStream) Post(event interface{}) {
if false && *devmode {
if s, ok := event.(interface{ String() string }); ok {
lg.PrintfUp1("Post %s; %d subscribers stream length %d, cap %d",
s.String(), len(e.subscribers), len(e.stream), cap(e.stream))
} else {
lg.PrintfUp1("Post %s; %d subscribers stream length %d, cap %d",
s, len(e.subscribers), len(e.stream), cap(e.stream))
}
}
// Ignore the event if no one's paying attention.
if len(e.subscribers) > 0 {
if len(e.stream)+1 == cap(e.stream) && *devmode && lg != nil {
// Dump the state of things if the array's about to grow; in
// general we expect it to pretty quickly reach steady state
// with just a handful of entries.
lg.Printf("%s", e.Dump())
}
e.stream = append(e.stream, event)
}
}
// Get returns all of the events from the stream since the last time Get
// was called with the given id. Note that events before an id was created
// with Subscribe are never reported for that id.
func (e *EventStream) Get(id EventSubscriberId) []interface{} {
sub, ok := e.subscribers[id]
if !ok {
lg.ErrorfUp1("Attempted to get with invalid id: %d", id)
return nil
}
s := e.stream[sub.offset:]
sub.offset = len(e.stream)
if time.Since(lastCompact) > 1*time.Second {
e.compact()
lastCompact = time.Now()
}
return s
}
// compact reclaims storage for events that all subscribers have seen; it
// is called periodically so that EventStream memory usage doesn't grow
// without bound.
func (e *EventStream) compact() {
minOffset := len(e.stream)
for _, sub := range e.subscribers {
if sub.offset < minOffset {
minOffset = sub.offset
}
}
if len(e.stream) > 1000 && lg != nil {
lg.Errorf("EventStream length %d", len(e.stream))
}
if minOffset > cap(e.stream)/2 {
n := len(e.stream) - minOffset
copy(e.stream, e.stream[minOffset:])
e.stream = e.stream[:n]
for _, sub := range e.subscribers {
sub.offset -= minOffset
}
}
}
// Dump prints out information about the internals of the event stream that
// may be useful for debugging.
func (e *EventStream) Dump() string {
s := fmt.Sprintf("stream: len %d cap %d", len(e.stream), cap(e.stream))
if len(e.stream) > 0 {
s += fmt.Sprintf("\n last elt %v", e.stream[len(e.stream)-1])
}
for i, sub := range e.subscribers {
s += fmt.Sprintf(" sub %d: %+v", i, sub)
}
return s
}
///////////////////////////////////////////////////////////////////////////
type AddedAircraftEvent struct {
ac *Aircraft
}
func (e *AddedAircraftEvent) String() string {
return "AddedAircraftEvent: " + e.ac.Callsign
}
type ModifiedAircraftEvent struct {
ac *Aircraft
}
func (e *ModifiedAircraftEvent) String() string {
return "ModifiedAircraftEvent: " + e.ac.Callsign
}
type RemovedAircraftEvent struct {
ac *Aircraft
}
func (e *RemovedAircraftEvent) String() string {
return "RemovedAircraftEvent: " + e.ac.Callsign
}
type InitiatedTrackEvent struct {
ac *Aircraft
}
func (e *InitiatedTrackEvent) String() string {
return "InitiatedTrackEvent: " + e.ac.Callsign
}
type DroppedTrackEvent struct {
ac *Aircraft
}
func (e *DroppedTrackEvent) String() string {
return "DroppedTrackEvent: " + e.ac.Callsign
}
type PushedFlightStripEvent struct {
callsign string
}
func (e *PushedFlightStripEvent) String() string {
return "PushedFlightStripEvent: " + e.callsign
}
type PointOutEvent struct {
controller string
ac *Aircraft
}
func (e *PointOutEvent) String() string {
return "PointOutEvent: " + e.controller + " " + e.ac.Callsign
}
type AcceptedHandoffEvent struct {
controller string
ac *Aircraft
}
func (e *AcceptedHandoffEvent) String() string {
return "AcceptedHandoffEvent: " + e.controller + " " + e.ac.Callsign
}
type CanceledHandoffEvent struct {
controller string
ac *Aircraft
}
func (e *CanceledHandoffEvent) String() string {
return "CanceledHandoffEvent: " + e.controller + " " + e.ac.Callsign
}
type RejectedHandoffEvent struct {
controller string
ac *Aircraft
}
func (e *RejectedHandoffEvent) String() string {
return "RejectedHandoffEvent: " + e.controller + " " + e.ac.Callsign
}
type RadioTransmissionEvent struct {
callsign, message string
}
func (e *RadioTransmissionEvent) String() string {
return "RadioTransmissionEvent: callsign: " + e.callsign + ", message: " + e.message
}