Skip to content

Commit 674dc97

Browse files
committed
optimizations: conditionally enable tracing, use worker pool in stream controller
1 parent b3ad333 commit 674dc97

11 files changed

+317
-207
lines changed

clientconn.go

+11-9
Original file line numberDiff line numberDiff line change
@@ -104,15 +104,17 @@ func (cc *ClientConn) invoke(
104104
panic(fmt.Sprintf("[totem] unsupported request type: %T", req))
105105
}
106106

107-
name, attr := spanInfo(method, peerFromCtx(ctx))
108-
attr = append(attr, attribute.String("func", "clientConn.Invoke"))
109-
ctx, span := cc.tracer.Start(ctx, name,
110-
trace.WithSpanKind(trace.SpanKindClient),
111-
trace.WithAttributes(attr...),
112-
)
113-
defer span.End()
114-
otel.GetTextMapPropagator().Inject(ctx, &mdSupplier)
115-
107+
var span trace.Span
108+
if TracingEnabled {
109+
name, attr := spanInfo(method, peerFromCtx(ctx))
110+
attr = append(attr, attribute.String("func", "clientConn.Invoke"))
111+
ctx, span = cc.tracer.Start(ctx, name,
112+
trace.WithSpanKind(trace.SpanKindClient),
113+
trace.WithAttributes(attr...),
114+
)
115+
defer span.End()
116+
otel.GetTextMapPropagator().Inject(ctx, &mdSupplier)
117+
}
116118
cc.metrics.TrackTxBytes(serviceName, methodName, int64(len(reqMsg)))
117119

118120
rpc := &RPC{

discovery.go

+14-9
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,15 @@ func discoverServices(ctx context.Context, ctrl *StreamController, maxHops int32
1515
RemainingHops: maxHops,
1616
})
1717

18-
ctx, span := Tracer().Start(ctx, "totem.discoverServices")
19-
defer span.End()
20-
21-
lg := ctrl.logger.With(
22-
zap.String("traceID", span.SpanContext().TraceID().String()),
23-
)
18+
lg := ctrl.logger
19+
var span trace.Span
20+
if TracingEnabled {
21+
ctx, span = Tracer().Start(ctx, "totem.discoverServices")
22+
defer span.End()
23+
lg = lg.With(
24+
zap.String("traceID", span.SpanContext().TraceID().String()),
25+
)
26+
}
2427
lg.Debug("starting service discovery")
2528

2629
respC := ctrl.Request(ctx, &RPC{
@@ -47,9 +50,11 @@ func discoverServices(ctx context.Context, ctrl *StreamController, maxHops int32
4750
return nil, err
4851
}
4952

50-
span.AddEvent("Results", trace.WithAttributes(
51-
attribute.StringSlice("methods", infoMsg.MethodNames()),
52-
))
53+
if TracingEnabled {
54+
span.AddEvent("Results", trace.WithAttributes(
55+
attribute.StringSlice("methods", infoMsg.MethodNames()),
56+
))
57+
}
5358

5459
lg.With(
5560
zap.Any("methods", infoMsg.MethodNames()),

examples/simple/main.go

-4
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,6 @@ type helloServer struct {
6868
// There is nothing special about the Hello implementation - it is just a
6969
// regular unary RPC.
7070
func (h *helloServer) Hello(ctx context.Context, req *HelloRequest) (*HelloResponse, error) {
71-
// We can check if we're inside a Totem stream using the context:
72-
if totem.CheckContext(ctx) {
73-
// inside a Totem stream (not a real grpc server)
74-
}
7571
return &HelloResponse{
7672
Message: "Hello, " + req.GetName(),
7773
}, nil

go.mod

+2-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module github.com/kralicky/totem
33
go 1.21
44

55
require (
6+
github.com/alitto/pond v1.8.3
67
github.com/charmbracelet/lipgloss v0.7.1
78
github.com/google/uuid v1.3.0
89
github.com/jhump/protoreflect v1.15.1
@@ -22,7 +23,6 @@ require (
2223
go.uber.org/atomic v1.11.0
2324
go.uber.org/zap v1.25.0
2425
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63
25-
golang.org/x/sync v0.3.0
2626
google.golang.org/genproto/googleapis/rpc v0.0.0-20230803162519-f966b187b2e5
2727
google.golang.org/grpc v1.57.0
2828
google.golang.org/protobuf v1.31.0
@@ -56,6 +56,7 @@ require (
5656
go.uber.org/multierr v1.11.0 // indirect
5757
golang.org/x/mod v0.12.0 // indirect
5858
golang.org/x/net v0.14.0 // indirect
59+
golang.org/x/sync v0.3.0 // indirect
5960
golang.org/x/sys v0.11.0 // indirect
6061
golang.org/x/text v0.12.0 // indirect
6162
golang.org/x/tools v0.12.1-0.20230815132531-74c255bcf846 // indirect

go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ cloud.google.com/go v0.110.7 h1:rJyC7nWRg2jWGZ4wSJ5nY65GTdYJkg0cd/uXb+ACI6o=
22
cloud.google.com/go/compute v1.23.0 h1:tP41Zoavr8ptEqaW6j+LQOnyBBhO7OkOMAGrgLopTwY=
33
cloud.google.com/go/compute v1.23.0/go.mod h1:4tCnrn48xsqlwSAiLf1HXMQk8CONslYbdiEZc9FEIbM=
44
cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY=
5+
github.com/alitto/pond v1.8.3 h1:ydIqygCLVPqIX/USe5EaV/aSRXTRXDEI9JwuDdu+/xs=
6+
github.com/alitto/pond v1.8.3/go.mod h1:CmvIIGd5jKLasGI3D87qDkQxjzChdKMmnXMg3fG6M6Q=
57
cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA=
68
github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k=
79
github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8=

invokers.go

+21-15
Original file line numberDiff line numberDiff line change
@@ -61,18 +61,21 @@ func (l *localServiceInvoker) Invoke(ctx context.Context, req *RPC) ([]byte, err
6161
zap.Strings("md", req.GetMetadata().Keys()),
6262
).Debug("invoking method using local service")
6363

64-
attrs := []attribute.KeyValue{
65-
attribute.String("func", "localServiceInvoker.Invoke"),
66-
attribute.String("name", l.service.ServiceName),
67-
}
64+
var span trace.Span
65+
if TracingEnabled {
66+
attrs := []attribute.KeyValue{
67+
attribute.String("func", "localServiceInvoker.Invoke"),
68+
attribute.String("name", l.service.ServiceName),
69+
}
6870

69-
ctx, span := Tracer().Start(ctx, "Invoke/Local: "+req.QualifiedMethodName(),
70-
trace.WithAttributes(attrs...))
71-
defer span.End()
71+
ctx, span = Tracer().Start(ctx, "Invoke/Local: "+req.QualifiedMethodName(),
72+
trace.WithAttributes(attrs...))
73+
defer span.End()
74+
}
7275

7376
if m, ok := l.methods[req.MethodName]; ok {
7477
startTime := time.Now()
75-
resp, err := m.Handler(l.serviceImpl, addTotemToContext(ctx), func(v any) error {
78+
resp, err := m.Handler(l.serviceImpl, ctx, func(v any) error {
7679
reqBytes := req.GetRequest()
7780
l.metrics.TrackRxBytes(serviceName, methodName, int64(len(reqBytes)))
7881
return proto.Unmarshal(reqBytes, protoimpl.X.ProtoMessageV2Of(v))
@@ -129,14 +132,17 @@ func (r *streamControllerInvoker) Invoke(ctx context.Context, req *RPC) ([]byte,
129132
ctx = metadata.NewOutgoingContext(ctx, md)
130133
}
131134

132-
attrs := []attribute.KeyValue{
133-
attribute.String("func", "streamControllerInvoker.Invoke"),
134-
attribute.String("name", r.controller.name),
135-
}
135+
var span trace.Span
136+
if TracingEnabled {
137+
attrs := []attribute.KeyValue{
138+
attribute.String("func", "streamControllerInvoker.Invoke"),
139+
attribute.String("name", r.controller.name),
140+
}
136141

137-
ctx, span := Tracer().Start(ctx, "Invoke/Stream: "+req.QualifiedMethodName(),
138-
trace.WithAttributes(attrs...))
139-
defer span.End()
142+
ctx, span = Tracer().Start(ctx, "Invoke/Stream: "+req.QualifiedMethodName(),
143+
trace.WithAttributes(attrs...))
144+
defer span.End()
145+
}
140146

141147
r.controller.metrics.TrackTxBytes(serviceName, methodName, int64(len(req.GetRequest())))
142148
rc := r.controller.Request(ctx, req)

server.go

+24-12
Original file line numberDiff line numberDiff line change
@@ -165,19 +165,25 @@ func (r *Server) Splice(stream Stream, opts ...StreamControllerOption) error {
165165
}
166166
}()
167167

168-
ctx, span := Tracer().Start(stream.Context(), "Server.Splice/Discovery",
169-
trace.WithAttributes(
170-
attribute.String("name", r.name),
171-
),
172-
)
168+
ctx := stream.Context()
169+
var span trace.Span
170+
if TracingEnabled {
171+
ctx, span = Tracer().Start(ctx, "Server.Splice/Discovery",
172+
trace.WithAttributes(
173+
attribute.String("name", r.name),
174+
),
175+
)
176+
}
173177
info, err := discoverServices(ctx, ctrl, r.discoveryHopLimit)
174178
if err != nil {
175179
err := fmt.Errorf("service discovery failed: %w", err)
176180
span.RecordError(err)
177181
span.SetStatus(otelcodes.Error, err.Error())
178182
return err
179183
}
180-
span.End()
184+
if TracingEnabled {
185+
span.End()
186+
}
181187
r.logger.With(
182188
zap.Any("methods", info.MethodNames()),
183189
).Debug("splicing stream")
@@ -253,13 +259,19 @@ func (r *Server) Serve() (grpc.ClientConnInterface, <-chan error) {
253259
ch <- runErr
254260
}()
255261

256-
ctx, span := Tracer().Start(r.Context(), "Server.Serve/Discovery",
257-
trace.WithAttributes(
258-
attribute.String("name", r.name),
259-
),
260-
)
262+
ctx := context.Background()
263+
var span trace.Span
264+
if TracingEnabled {
265+
ctx, span = Tracer().Start(r.Context(), "Server.Serve/Discovery",
266+
trace.WithAttributes(
267+
attribute.String("name", r.name),
268+
),
269+
)
270+
}
261271
info, err := discoverServices(ctx, r.controller, r.discoveryHopLimit)
262-
span.End()
272+
if TracingEnabled {
273+
span.End()
274+
}
263275

264276
if err != nil {
265277
r.controller.Kick(fmt.Errorf("service discovery failed: %w", err))

0 commit comments

Comments
 (0)