forked from libp2p/zeroconf
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathclient.go
312 lines (272 loc) · 8.31 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
package zeroconf
import (
"errors"
"fmt"
"net/netip"
"slices"
"time"
"github.com/miekg/dns"
)
const (
// RFC6762 Section 8.3: The Multicast DNS responder MUST send at least two unsolicited
// responses
announceCount = 4
// These intervals are for exponential backoff, used for periodic actions like sending queries
minInterval = 2 * time.Second
maxInterval = time.Hour
// Enough to send a UDP packet without causing a timeout error
writeTimeout = 10 * time.Millisecond
// Max time window to coalesce cache-updates
cacheDelay = time.Millisecond * 50
)
// A zeroconf client which publishes and/or browses for services.
//
// The methods return a self-pointer for optional method chaining.
type Client struct {
done chan struct{}
conn *conn
opts *options
reload chan struct{}
}
// Returns a new client. Next, provide your options and then call `Open`.
func New() *Client {
return &Client{
opts: defaultOpts(),
reload: make(chan struct{}, 1),
done: make(chan struct{}),
}
}
// Opens the socket and starts the zeroconf service. All options must be set beforehand,
// including at least `Browse`, `Publish`, or both.
func (c *Client) Open() (_ *Client, err error) {
if err = c.opts.validate(); err != nil {
return nil, err
}
c.conn, err = newConn(c.opts)
if err != nil {
return nil, err
}
c.opts.logger.Debug("open socket", "ifaces", c.conn.ifaces)
msgCh := make(chan msgMeta, 32)
go c.conn.RunReader(msgCh)
go c.serve(msgCh)
return c, nil
}
// The main loop serving a client
func (c *Client) serve(msgCh <-chan msgMeta) {
defer close(c.done)
var (
bo = newBackoff(minInterval, maxInterval)
timer = time.NewTimer(0)
)
defer timer.Stop()
loop:
for {
var (
isPeriodic bool
// Use wall time exclusively in order to restore accurate state when waking from sleep,
// (time jumps forward) such as cache expiry.
now time.Time
)
select {
case <-c.reload:
if !timer.Stop() {
<-timer.C
}
now = time.Now().Round(0)
bo.reset()
_, err := c.conn.loadIfaces()
if err != nil {
c.opts.logger.Warn("reload failed (ifaces unchanged)", "err", err)
}
c.opts.logger.Debug("reload", "ifaces", c.conn.ifaces)
case msg, ok := <-msgCh:
now = time.Now().Round(0)
if !ok {
break loop
}
_ = c.handleQuery(msg)
if c.handleResponse(now, msg) && timer.Stop() {
// If the cache was touched, we want the update soon
timer.Reset(cacheDelay)
}
continue
case now = <-timer.C:
now = now.Round(0)
}
// Invariant: the timer is stopped.
isPeriodic = bo.advance(now)
// Publish initial announcements
if c.opts.publish != nil && isPeriodic && bo.n <= announceCount {
err := c.broadcastRecords(false)
c.opts.logger.Debug("announce", "err", err)
}
// Handle all browser-related maintenance
nextBrowserDeadline := c.advanceBrowser(now, isPeriodic)
nextTimeout := earliest(bo.next, nextBrowserDeadline).Sub(now)
// Damage control: ensure timeout isn't firing all the time in case of a bug
timer.Reset(max(200*time.Millisecond, nextTimeout))
}
}
// Reloads network interfaces and resets backoff timers, in order to reach
// newly available peers. This has no effect if the client is closed.
func (c *Client) Reload() {
select {
case c.reload <- struct{}{}:
default:
}
}
// Gracefully stops all background tasks, unannounces any services and closes the socket.
func (c *Client) Close() error {
c.conn.SetReadDeadline(time.Now())
<-c.done
if c.opts.publish != nil {
err := c.broadcastRecords(true)
c.opts.logger.Debug("unannounce", "err", err)
}
return c.conn.Close()
}
// Generate DNS records with the IPs (A/AAAA) for the provided interface (unless addrs were
// provided by the user).
func (c *Client) recordsForIface(iface *connInterface, unannounce bool) []dns.RR {
// Copy the service to create a new one with the right ips
svc := *c.opts.publish
if len(svc.Addrs) == 0 {
svc.Addrs = append(svc.Addrs, iface.v4...)
svc.Addrs = append(svc.Addrs, iface.v6...)
}
return recordsFromService(&svc, unannounce)
}
func (c *Client) handleQuery(msg msgMeta) error {
if c.opts.publish == nil {
return nil
}
// RFC6762 Section 8.2: Probing messages are ignored, for now.
if len(msg.Ns) > 0 || len(msg.Question) == 0 {
return nil
}
// If we can't determine an interface source, we simply reply as if it were sent on all interfaces.
var errs []error
for _, iface := range c.conn.ifaces {
if msg.IfIndex == 0 || msg.IfIndex == iface.Index {
if err := c.handleQueryForIface(msg.Msg, iface, msg.Src); err != nil {
errs = append(errs, fmt.Errorf("%v %w", iface.Name, err))
}
}
}
return errors.Join(errs...)
}
// handleQuery is used to handle an incoming query
func (c *Client) handleQueryForIface(query *dns.Msg, iface *connInterface, src netip.AddrPort) (err error) {
// TODO: Match quickly against the query without producing full records for each iface.
records := c.recordsForIface(iface, false)
// RFC6762 Section 5.2: Multiple questions in the same message are responded to individually.
for _, q := range query.Question {
// Check that
resp := dns.Msg{}
resp.SetReply(query)
resp.Compress = true
resp.RecursionDesired = false
resp.Authoritative = true
resp.Question = nil // RFC6762 Section 6: "responses MUST NOT contain any questions"
resp.Answer, resp.Extra = answerTo(records, query.Answer, q)
if len(resp.Answer) == 0 {
continue
}
c.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
isUnicast := q.Qclass&qClassUnicastResponse != 0
if isUnicast {
err = c.conn.WriteUnicast(&resp, iface.Index, src)
} else {
err = c.conn.WriteMulticast(&resp, iface.Index, &src)
}
c.opts.logger.Debug("respond", "iface", iface.Name, "src", src, "unicast", isUnicast, "err", err)
}
return err
}
// Broadcast all records to all interfaces. If unannounce is set, the TTLs are zero
func (c *Client) broadcastRecords(unannounce bool) error {
if c.opts.publish == nil {
return nil
}
var errs []error
for _, iface := range c.conn.ifaces {
resp := new(dns.Msg)
resp.MsgHdr.Response = true
resp.MsgHdr.Authoritative = true
resp.Compress = true
resp.Answer = c.recordsForIface(iface, unannounce)
c.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
err := c.conn.WriteMulticast(resp, iface.Index, nil)
errs = append(errs, err)
}
return errors.Join(errs...)
}
// Returns true if the browser needs to be advanced soon
func (c *Client) handleResponse(now time.Time, msg msgMeta) (changed bool) {
if c.opts.browser == nil {
return false
}
svcs := servicesFromRecords(msg.Msg)
for _, svc := range svcs {
// Exclude self-published services
if c.opts.publish != nil && svc.Equal(c.opts.publish) {
continue
}
// Ensure the service matches any of our "search types"
if slices.IndexFunc(c.opts.browser.types, svc.Matches) == -1 {
continue
}
changed = true
// Set custom TTL unless this is an announcement (we treat TTL=1 as intent to unannounce)
if c.opts.expiry > 0 && svc.ttl > time.Second {
svc.ttl = c.opts.expiry
}
// Override self-reported addrs with source address instead, if enabled
if c.opts.srcAddrs {
svc.Addrs = []netip.Addr{msg.Src.Addr()}
}
// TODO: Debug log when services are refreshed?
c.opts.browser.Put(svc, now)
}
return
}
func (c *Client) advanceBrowser(now time.Time, isPeriodic bool) time.Time {
if c.opts.browser == nil {
return now.Add(aLongTime)
}
if c.opts.browser.Advance(now) || isPeriodic {
err := c.broadcastQuery()
c.opts.logger.Debug("query", "err", err)
c.opts.browser.Queried()
}
return c.opts.browser.NextDeadline()
}
// Performs the actual query by service name.
func (c *Client) broadcastQuery() error {
m := new(dns.Msg)
// Query for all browser types
for _, ty := range c.opts.browser.types {
m.Question = append(m.Question, dns.Question{
Name: queryName(ty),
Qtype: dns.TypePTR,
Qclass: dns.ClassINET,
})
}
if c.opts.publish != nil {
// Include self-published service as "known answers", to avoid responding to ourselves
m.Answer = ptrRecords(c.opts.publish, false)
}
m.Id = dns.Id()
m.Compress = true
m.RecursionDesired = false
var errs []error
for _, iface := range c.conn.ifaces {
c.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
err := c.conn.WriteMulticast(m, iface.Index, nil)
if err != nil {
errs = append(errs, fmt.Errorf("idx %v: %w", iface.Index, err))
}
}
return errors.Join(errs...)
}