-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdiscovery.go
73 lines (61 loc) · 1.69 KB
/
discovery.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package totem
import (
"context"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"google.golang.org/protobuf/proto"
)
type discoverOptions struct {
MaxHops int32
}
func discoverServices(ctx context.Context, ctrl *StreamController, opts discoverOptions) (*ServiceInfo, error) {
reqBytes, _ := proto.Marshal(&DiscoveryRequest{
Initiator: ctrl.uuid,
Visited: []string{ctrl.uuid},
RemainingHops: opts.MaxHops,
})
lg := ctrl.Logger
var span trace.Span
if TracingEnabled {
ctx, span = ctrl.tracer.Start(ctx, "Discovery",
trace.WithLinks(trace.LinkFromContext(ctrl.stream.Context())),
trace.WithAttributes(
attribute.String("initiator", ctrl.uuid),
attribute.String("name", ctrl.Name),
attribute.Int("maxHops", int(opts.MaxHops)),
),
)
defer span.End()
lg = lg.With(
"traceID", span.SpanContext().TraceID().String(),
)
}
lg.Debug("starting service discovery")
respC := ctrl.Request(ctx, &RPC{
ServiceName: "totem.ServerReflection",
MethodName: "ListServices",
Content: &RPC_Request{
Request: reqBytes,
},
})
resp := <-respC
respMsg := resp.GetResponse()
stat := respMsg.GetStatus()
if err := stat.Err(); err != nil {
lg.Warn("discovery failed", "error", err)
return nil, err
}
infoMsg := &ServiceInfo{}
if err := proto.Unmarshal(respMsg.GetResponse(), infoMsg); err != nil {
lg.Warn("received bad service info message")
return nil, err
}
if TracingEnabled {
span.AddEvent("Completed", trace.WithAttributes(
attribute.StringSlice("services", infoMsg.ServiceNames()),
), trace.WithTimestamp(time.Now()))
}
lg.Debug("discovery complete", "services", infoMsg.ServiceNames())
return infoMsg, nil
}