-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathbroadcast.go
137 lines (117 loc) · 3.8 KB
/
broadcast.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package graphblast
import (
"encoding/json"
"sync"
)
// A Message is a unit of data delivered to subscribers. It has two parts: an
// Envelope, which summarizes its contents, and the Contents themselves, which
// can be generated dynamically and so might produce an error.
type Message interface {
	// Recipient reports whether the subscriber registered under name should
	// receive this message.
	Recipient(name string) bool
	// Envelope returns the summary of the message.
	Envelope() string
	// Contents returns the message body, or an error if the body could not
	// be produced.
	Contents() (body []byte, err error)
}
// NewJSONMessage builds a Message addressed to all recipients. The body of
// the message is the JSON representation of contents, and envelope is its
// summary.
func NewJSONMessage(envelope string, contents interface{}) Message {
	// An empty recipient list asks NewJSONMessageTo for a message with no
	// restriction on who may receive it.
	var everyone []string
	return NewJSONMessageTo(everyone, envelope, contents)
}
// NewJSONMessageTo builds a Message whose body is the JSON representation of
// contents, addressed only to the named recipients. An empty recipients list
// addresses the message to everyone. A failure to encode contents is not
// reported here; it is stored and returned by the message's Contents method.
func NewJSONMessageTo(recipients []string, envelope string, contents interface{}) Message {
	allowed := make(map[string]bool, len(recipients))
	for i := range recipients {
		allowed[recipients[i]] = true
	}
	body, encodeErr := json.Marshal(contents)
	return staticMessage{
		recipients: allowed,
		envelope:   envelope,
		contents:   body,
		err:        encodeErr,
	}
}
// staticMessage is the most basic Message implementation: its body (or the
// error from producing it) is fixed when the message is created.
type staticMessage struct {
	recipients map[string]bool // names that may receive the message; empty means everyone
	envelope   string          // summary returned by Envelope
	contents   []byte          // precomputed body returned by Contents
	err        error           // precomputed error returned by Contents
}

// Recipient reports whether name may receive the message. A message with no
// recipients recorded is addressed to everyone.
func (sm staticMessage) Recipient(name string) bool {
	if len(sm.recipients) == 0 {
		return true
	}
	allowed := sm.recipients[name]
	return allowed
}

// Envelope returns the summary stored with the message.
func (sm staticMessage) Envelope() string {
	summary := sm.envelope
	return summary
}

// Contents returns the precomputed body together with the precomputed error.
func (sm staticMessage) Contents() ([]byte, error) {
	body, err := sm.contents, sm.err
	return body, err
}
// A Publisher is what processes that need to receive messages subscribe to,
// and later unsubscribe from.
type Publisher interface {
	// Subscribe registers a subscriber under name and returns the channel
	// on which that subscriber receives Messages.
	Subscribe(name string) <-chan Message
	// Unsubscribe removes the subscriber registered under name.
	Unsubscribe(name string)
}
// Subscribers is the sending side of message delivery: processes that need
// to send messages do so through it.
type Subscribers interface {
	// Send hands message over for delivery.
	Send(message Message)
}
// A broadcastRequest is a function that, given a Broadcaster's map of
// subscriber names to channels, manipulates that map.
type broadcastRequest func(listeners map[string]chan<- Message)
// subscribeRequest returns a broadcastRequest that registers channel under
// name in the subscriber map, replacing any channel already stored there.
func subscribeRequest(name string, channel chan<- Message) broadcastRequest {
	register := func(subscribers map[string]chan<- Message) {
		subscribers[name] = channel
	}
	return register
}
// unsubscribeRequest returns a broadcastRequest that removes the entry for
// name from the subscriber map. Removing a name that is not present does
// nothing.
func unsubscribeRequest(name string) broadcastRequest {
	remove := func(subscribers map[string]chan<- Message) {
		delete(subscribers, name)
	}
	return remove
}
// A Broadcaster is both a Publisher and a Subscribers: receivers register
// with it by name, and each message sent through it is dispatched to every
// registered subscriber that the message accepts as a recipient.
//
// Delivery only happens while DispatchForever is running.
type Broadcaster struct {
	// messages carries each sent Message from Send to DispatchForever.
	messages chan Message
	// listeners maps each subscriber name to the channel it receives on.
	listeners map[string]chan<- Message
	// The embedded mutex guards listeners.
	*sync.Mutex
}

// NewBroadcaster creates a new, synchronous Broadcaster: its message channel
// is unbuffered, so Send blocks until DispatchForever accepts the message.
func NewBroadcaster() *Broadcaster {
	return &Broadcaster{
		messages:  make(chan Message),
		listeners: make(map[string]chan<- Message),
		Mutex:     new(sync.Mutex),
	}
}

// Subscribe adds a new subscriber by name to a Broadcaster, and returns the
// (unbuffered) channel on which the subscriber should listen for Messages.
// Subscribing again under the same name replaces the previously registered
// channel.
func (b *Broadcaster) Subscribe(name string) <-chan Message {
	result := make(chan Message)
	b.Lock()
	defer b.Unlock()
	b.listeners[name] = result
	return result
}

// Unsubscribe removes the subscriber from the Broadcaster so that later
// messages are no longer dispatched to it. The subscriber's channel is NOT
// closed: closing it here could make a dispatch that is already sending on
// the channel panic. A receiver must therefore not rely on the channel being
// closed to learn that it has been unsubscribed.
func (b *Broadcaster) Unsubscribe(name string) {
	b.Lock()
	defer b.Unlock()
	delete(b.listeners, name)
}

// Send passes the message to the Broadcaster for dispatch to its subscribers.
// It blocks until the message has been accepted on the Broadcaster's message
// channel.
func (b *Broadcaster) Send(message Message) {
	b.messages <- message
}

// DispatchForever receives sent messages and delivers each one, in turn, to
// every subscriber for which message.Recipient reports true. It returns only
// if the Broadcaster's message channel is closed.
//
// The subscriber map is only ever read while holding the mutex, so Subscribe
// and Unsubscribe may safely be called while dispatch is running. The mutex
// is never held across a send, so a slow subscriber cannot block Subscribe or
// Unsubscribe. Sends to subscribers are blocking: a subscriber that stops
// receiving while a send to it is in progress stalls dispatch.
func (b *Broadcaster) DispatchForever() {
	for message := range b.messages {
		for _, name := range b.subscriberNames() {
			if !message.Recipient(name) {
				continue
			}
			// Look the channel up again right before sending so that a
			// subscriber removed earlier in this dispatch is skipped.
			listener, ok := b.lookup(name)
			if !ok {
				continue
			}
			listener <- message
		}
	}
}

// subscriberNames returns a snapshot of the currently registered names.
func (b *Broadcaster) subscriberNames() []string {
	b.Lock()
	defer b.Unlock()
	names := make([]string, 0, len(b.listeners))
	for name := range b.listeners {
		names = append(names, name)
	}
	return names
}

// lookup returns the channel currently registered under name, if any.
func (b *Broadcaster) lookup(name string) (chan<- Message, bool) {
	b.Lock()
	defer b.Unlock()
	listener, ok := b.listeners[name]
	return listener, ok
}