Skip to content

Commit 8ffa7de

Browse files
committed
feat: add ExecCollectValuesWatch
Signed-off-by: Christian Stewart <[email protected]>
1 parent 7348120 commit 8ffa7de

File tree

1 file changed

+162
-0
lines changed

1 file changed

+162
-0
lines changed

bus/multi.go

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,3 +164,165 @@ func ExecCollectValues[T directive.Value](
164164
}
165165
}
166166
}
167+
168+
// ExecCollectValuesWatch collects values and calls a callback whenever the slice changes.
169+
// The callback is called with a snapshot of the current values whenever values are added or removed or the error changes.
170+
// The callback runs in a separate goroutine and will be called again if values change while it's running.
171+
// If waitIdle is true, waits until the directive is idle before emitting values after additions.
172+
// Removals always emit immediately (unless no values have been emitted yet).
173+
// errorCb is called with any error returned by the callback (optional).
174+
// If the callback returns any error, it will not be called again.
175+
// Returns the directive instance and reference for cleanup.
176+
// If err != nil, ref == nil.
177+
func ExecCollectValuesWatch[T directive.Value](
178+
ctx context.Context,
179+
bus Bus,
180+
dir directive.Directive,
181+
waitIdle bool,
182+
callback func(resErr error, vals []T) error,
183+
errorCb func(err error),
184+
) (directive.Instance, directive.Reference, error) {
185+
// bcast guards these variables
186+
var bcast broadcast.Broadcast
187+
188+
var vals []T
189+
var valIDs []uint32
190+
var resErr error
191+
var idle bool
192+
var emittedOnce bool
193+
var pendingEmit bool // Track if we need to emit
194+
195+
di, ref, err := bus.AddDirective(
196+
dir,
197+
NewCallbackHandler(
198+
func(v directive.AttachedValue) { // Add handler
199+
val, valOk := v.GetValue().(T)
200+
if !valOk {
201+
return
202+
}
203+
bcast.HoldLock(func(broadcast func(), getWaitCh func() <-chan struct{}) {
204+
vals = append(vals, val)
205+
valIDs = append(valIDs, v.GetValueID())
206+
207+
// Condition 1: Value added and (!waitIdle || idle)
208+
if !waitIdle || idle {
209+
pendingEmit = true
210+
broadcast()
211+
}
212+
})
213+
},
214+
func(v directive.AttachedValue) { // Remove handler
215+
_, valOk := v.GetValue().(T)
216+
if !valOk {
217+
return
218+
}
219+
bcast.HoldLock(func(broadcast func(), getWaitCh func() <-chan struct{}) {
220+
id := v.GetValueID()
221+
for i, valID := range valIDs {
222+
if valID == id {
223+
valIDs = slices.Clone(valIDs)
224+
valIDs[i] = valIDs[len(valIDs)-1]
225+
valIDs = valIDs[:len(valIDs)-1]
226+
vals = slices.Clone(vals)
227+
vals[i] = vals[len(vals)-1]
228+
var empty T
229+
vals[len(vals)-1] = empty
230+
vals = vals[:len(vals)-1]
231+
232+
// Condition 3: Value removed and already emitted
233+
if emittedOnce {
234+
pendingEmit = true
235+
broadcast()
236+
}
237+
break
238+
}
239+
}
240+
})
241+
},
242+
func() { // Dispose handler
243+
bcast.HoldLock(func(broadcast func(), getWaitCh func() <-chan struct{}) {
244+
if resErr == nil {
245+
resErr = directive.ErrDirectiveDisposed
246+
pendingEmit = true
247+
broadcast()
248+
}
249+
})
250+
},
251+
),
252+
)
253+
if err != nil {
254+
if ref != nil {
255+
ref.Release()
256+
}
257+
return nil, nil, err
258+
}
259+
260+
defer di.AddIdleCallback(func(isIdle bool, errs []error) {
261+
bcast.HoldLock(func(broadcast func(), getWaitCh func() <-chan struct{}) {
262+
if resErr != nil {
263+
return
264+
}
265+
266+
wasIdle := idle
267+
idle = isIdle
268+
269+
// Check for errors
270+
for _, err := range errs {
271+
if err != nil {
272+
resErr = err
273+
pendingEmit = true
274+
broadcast()
275+
return
276+
}
277+
}
278+
279+
// Condition 2: Became idle and haven't emitted yet
280+
if !wasIdle && idle && !emittedOnce && len(vals) > 0 {
281+
pendingEmit = true
282+
broadcast()
283+
}
284+
})
285+
})()
286+
287+
// Start goroutine to handle callbacks
288+
go func() {
289+
for {
290+
var currVals []T
291+
var currErr error
292+
var shouldEmit bool
293+
var waitCh <-chan struct{}
294+
295+
bcast.HoldLock(func(broadcast func(), getWaitCh func() <-chan struct{}) {
296+
currErr = resErr
297+
shouldEmit = pendingEmit
298+
pendingEmit = false
299+
waitCh = getWaitCh()
300+
301+
if shouldEmit && currErr == nil {
302+
currVals = slices.Clone(vals)
303+
emittedOnce = true
304+
}
305+
})
306+
307+
// Call callback if we should emit
308+
if shouldEmit {
309+
if callback != nil {
310+
// Call with values
311+
if err := callback(currErr, currVals); err != nil && errorCb != nil {
312+
errorCb(err)
313+
return
314+
}
315+
}
316+
}
317+
318+
select {
319+
case <-ctx.Done():
320+
return
321+
case <-waitCh:
322+
// Continue to check for changes
323+
}
324+
}
325+
}()
326+
327+
return di, ref, nil
328+
}

0 commit comments

Comments
 (0)