-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsubscription.go
89 lines (79 loc) · 1.48 KB
/
subscription.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package grpc
import (
"io"
"sync"
)
// Subscription - general interface for subscription
type Subscription[T any, P any] interface {
Filter(typ T) bool
Send(msg P)
Listen() <-chan P
io.Closer
}
// Subscriptions -
type Subscriptions[T any, P any] struct {
m map[uint64]Subscription[T, P]
mx *sync.RWMutex
}
// NewSubscriptions -
func NewSubscriptions[T any, P any]() *Subscriptions[T, P] {
return &Subscriptions[T, P]{
m: make(map[uint64]Subscription[T, P]),
mx: new(sync.RWMutex),
}
}
// NotifyAll -
func (s *Subscriptions[T, P]) NotifyAll(typ T, converter func(uint64, T) P) {
s.mx.RLock()
{
for id, sub := range s.m {
if sub != nil && sub.Filter(typ) {
sub.Send(converter(id, typ))
}
}
}
s.mx.RUnlock()
}
// Add -
func (s *Subscriptions[T, P]) Add(id uint64, subscription Subscription[T, P]) {
s.mx.Lock()
{
s.m[id] = subscription
}
s.mx.Unlock()
}
// Remove -
func (s *Subscriptions[T, P]) Remove(id uint64) error {
s.mx.Lock()
{
if subs, ok := s.m[id]; ok {
if err := subs.Close(); err != nil {
return err
}
delete(s.m, id)
}
}
s.mx.Unlock()
return nil
}
// Get -
func (s *Subscriptions[T, P]) Get(id uint64) (Subscription[T, P], bool) {
defer s.mx.RUnlock()
s.mx.RLock()
subs, ok := s.m[id]
return subs, ok
}
// Close -
func (s *Subscriptions[T, P]) Close() error {
s.mx.Lock()
defer s.mx.Unlock()
for _, subs := range s.m {
if subs == nil {
continue
}
if err := subs.Close(); err != nil {
return err
}
}
return nil
}