Skip to content

Commit 8cd5ee3

Browse files
authored
Merge pull request #49 from nats-rpc/48-server-side-concurrency
server side concurrency
2 parents 7b212b7 + 914de1b commit 8cd5ee3

File tree

9 files changed

+1234
-654
lines changed

9 files changed

+1234
-654
lines changed

examples/alloptions/alloptions.nrpc.go

+194-206
Large diffs are not rendered by default.

examples/alloptions/alloptions_test.go

+347-166
Large diffs are not rendered by default.

examples/helloworld/helloworld/helloworld.nrpc.go

+48-30
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@ type GreeterServer interface {
2020
// GreeterHandler provides a NATS subscription handler that can serve a
2121
// subscription using a given GreeterServer implementation.
2222
type GreeterHandler struct {
23-
ctx context.Context
24-
nc nrpc.NatsConn
25-
server GreeterServer
23+
ctx context.Context
24+
workers *nrpc.WorkerPool
25+
nc nrpc.NatsConn
26+
server GreeterServer
2627
}
2728

2829
func NewGreeterHandler(ctx context.Context, nc nrpc.NatsConn, s GreeterServer) *GreeterHandler {
@@ -33,13 +34,26 @@ func NewGreeterHandler(ctx context.Context, nc nrpc.NatsConn, s GreeterServer) *
3334
}
3435
}
3536

37+
func NewGreeterConcurrentHandler(workers *nrpc.WorkerPool, nc nrpc.NatsConn, s GreeterServer) *GreeterHandler {
38+
return &GreeterHandler{
39+
workers: workers,
40+
nc: nc,
41+
server: s,
42+
}
43+
}
44+
3645
func (h *GreeterHandler) Subject() string {
3746
return "helloworld.Greeter.>"
3847
}
3948

4049
func (h *GreeterHandler) Handler(msg *nats.Msg) {
41-
var encoding string
42-
var noreply bool
50+
var ctx context.Context
51+
if h.workers != nil {
52+
ctx = h.workers.Context
53+
} else {
54+
ctx = h.ctx
55+
}
56+
request := nrpc.NewRequest(ctx, h.nc, msg.Subject, msg.Reply)
4357
// extract method name & encoding from subject
4458
_, _, name, tail, err := nrpc.ParseSubject(
4559
"helloworld", 0, "Greeter", 0, msg.Subject)
@@ -48,54 +62,58 @@ func (h *GreeterHandler) Handler(msg *nats.Msg) {
4862
return
4963
}
5064

51-
ctx := h.ctx
65+
request.MethodName = name
66+
request.SubjectTail = tail
67+
5268
// call handler and form response
53-
var resp proto.Message
54-
var replyError *nrpc.Error
69+
var immediateError *nrpc.Error
5570
switch name {
5671
case "SayHello":
57-
_, encoding, err = nrpc.ParseSubjectTail(0, tail)
72+
_, request.Encoding, err = nrpc.ParseSubjectTail(0, request.SubjectTail)
5873
if err != nil {
5974
log.Printf("SayHelloHanlder: SayHello subject parsing failed: %v", err)
6075
break
6176
}
6277
var req HelloRequest
63-
if err := nrpc.Unmarshal(encoding, msg.Data, &req); err != nil {
78+
if err := nrpc.Unmarshal(request.Encoding, msg.Data, &req); err != nil {
6479
log.Printf("SayHelloHandler: SayHello request unmarshal failed: %v", err)
65-
replyError = &nrpc.Error{
80+
immediateError = &nrpc.Error{
6681
Type: nrpc.Error_CLIENT,
6782
Message: "bad request received: " + err.Error(),
6883
}
6984
} else {
70-
resp, replyError = nrpc.CaptureErrors(
71-
func()(proto.Message, error){
72-
innerResp, err := h.server.SayHello(ctx, req)
73-
if err != nil {
74-
return nil, err
75-
}
76-
return &innerResp, err
77-
})
78-
if replyError != nil {
79-
log.Printf("SayHelloHandler: SayHello handler failed: %s", replyError.Error())
85+
request.Handler = func(ctx context.Context)(proto.Message, error){
86+
innerResp, err := h.server.SayHello(ctx, req)
87+
if err != nil {
88+
return nil, err
89+
}
90+
return &innerResp, err
8091
}
8192
}
8293
default:
8394
log.Printf("GreeterHandler: unknown name %q", name)
84-
replyError = &nrpc.Error{
95+
immediateError = &nrpc.Error{
8596
Type: nrpc.Error_CLIENT,
8697
Message: "unknown name: " + name,
8798
}
8899
}
100+
if immediateError == nil {
101+
if h.workers != nil {
102+
// Try queuing the request
103+
if err := h.workers.QueueRequest(request); err != nil {
104+
log.Printf("nrpc: Error queuing the request: %s", err)
105+
}
106+
} else {
107+
// Run the handler synchronously
108+
request.RunAndReply()
109+
}
110+
}
89111

90-
91-
if !noreply {
92-
// encode and send response
93-
err = nrpc.Publish(resp, replyError, h.nc, msg.Reply, encoding) // error is logged
112+
if immediateError != nil {
113+
if err := request.SendReply(nil, immediateError); err != nil {
114+
log.Printf("GreeterHandler: Greeter handler failed to publish the response: %s", err)
115+
}
94116
} else {
95-
err = nil
96-
}
97-
if err != nil {
98-
log.Println("GreeterHandler: Greeter handler failed to publish the response: %s", err)
99117
}
100118
}
101119

examples/metrics_helloworld/helloworld/helloworld.nrpc.go

+70-44
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,10 @@ var (
6969
// GreeterHandler provides a NATS subscription handler that can serve a
7070
// subscription using a given GreeterServer implementation.
7171
type GreeterHandler struct {
72-
ctx context.Context
73-
nc nrpc.NatsConn
74-
server GreeterServer
72+
ctx context.Context
73+
workers *nrpc.WorkerPool
74+
nc nrpc.NatsConn
75+
server GreeterServer
7576
}
7677

7778
func NewGreeterHandler(ctx context.Context, nc nrpc.NatsConn, s GreeterServer) *GreeterHandler {
@@ -82,13 +83,26 @@ func NewGreeterHandler(ctx context.Context, nc nrpc.NatsConn, s GreeterServer) *
8283
}
8384
}
8485

86+
func NewGreeterConcurrentHandler(workers *nrpc.WorkerPool, nc nrpc.NatsConn, s GreeterServer) *GreeterHandler {
87+
return &GreeterHandler{
88+
workers: workers,
89+
nc: nc,
90+
server: s,
91+
}
92+
}
93+
8594
func (h *GreeterHandler) Subject() string {
8695
return "helloworld.Greeter.>"
8796
}
8897

8998
func (h *GreeterHandler) Handler(msg *nats.Msg) {
90-
var encoding string
91-
var noreply bool
99+
var ctx context.Context
100+
if h.workers != nil {
101+
ctx = h.workers.Context
102+
} else {
103+
ctx = h.ctx
104+
}
105+
request := nrpc.NewRequest(ctx, h.nc, msg.Subject, msg.Reply)
92106
// extract method name & encoding from subject
93107
_, _, name, tail, err := nrpc.ParseSubject(
94108
"helloworld", 0, "Greeter", 0, msg.Subject)
@@ -97,71 +111,83 @@ func (h *GreeterHandler) Handler(msg *nats.Msg) {
97111
return
98112
}
99113

100-
ctx := h.ctx
114+
request.MethodName = name
115+
request.SubjectTail = tail
116+
101117
// call handler and form response
102-
var resp proto.Message
103-
var replyError *nrpc.Error
104-
var elapsed float64
118+
var immediateError *nrpc.Error
105119
switch name {
106120
case "SayHello":
107-
_, encoding, err = nrpc.ParseSubjectTail(0, tail)
121+
_, request.Encoding, err = nrpc.ParseSubjectTail(0, request.SubjectTail)
108122
if err != nil {
109123
log.Printf("SayHelloHanlder: SayHello subject parsing failed: %v", err)
110124
break
111125
}
112126
var req HelloRequest
113-
if err := nrpc.Unmarshal(encoding, msg.Data, &req); err != nil {
127+
if err := nrpc.Unmarshal(request.Encoding, msg.Data, &req); err != nil {
114128
log.Printf("SayHelloHandler: SayHello request unmarshal failed: %v", err)
115-
replyError = &nrpc.Error{
129+
immediateError = &nrpc.Error{
116130
Type: nrpc.Error_CLIENT,
117131
Message: "bad request received: " + err.Error(),
118132
}
119133
serverRequestsForGreeter.WithLabelValues(
120-
"SayHello", encoding, "unmarshal_fail").Inc()
134+
"SayHello", request.Encoding, "unmarshal_fail").Inc()
121135
} else {
122-
start := time.Now()
123-
resp, replyError = nrpc.CaptureErrors(
124-
func()(proto.Message, error){
125-
innerResp, err := h.server.SayHello(ctx, req)
126-
if err != nil {
127-
return nil, err
128-
}
129-
return &innerResp, err
130-
})
131-
elapsed = time.Since(start).Seconds()
132-
if replyError != nil {
133-
log.Printf("SayHelloHandler: SayHello handler failed: %s", replyError.Error())
134-
serverRequestsForGreeter.WithLabelValues(
135-
"SayHello", encoding, "handler_fail").Inc()
136+
request.Handler = func(ctx context.Context)(proto.Message, error){
137+
innerResp, err := h.server.SayHello(ctx, req)
138+
if err != nil {
139+
return nil, err
140+
}
141+
return &innerResp, err
136142
}
137143
}
138144
default:
139145
log.Printf("GreeterHandler: unknown name %q", name)
140-
replyError = &nrpc.Error{
146+
immediateError = &nrpc.Error{
141147
Type: nrpc.Error_CLIENT,
142148
Message: "unknown name: " + name,
143149
}
144150
serverRequestsForGreeter.WithLabelValues(
145-
"Greeter", encoding, "name_fail").Inc()
151+
"Greeter", request.Encoding, "name_fail").Inc()
146152
}
147-
148-
149-
if !noreply {
150-
// encode and send response
151-
err = nrpc.Publish(resp, replyError, h.nc, msg.Reply, encoding) // error is logged
152-
} else {
153-
err = nil
153+
request.AfterReply = func(request *nrpc.Request, success, replySuccess bool) {
154+
if !replySuccess {
155+
serverRequestsForGreeter.WithLabelValues(
156+
request.MethodName, request.Encoding, "sendreply_fail").Inc()
157+
}
158+
if success {
159+
serverRequestsForGreeter.WithLabelValues(
160+
request.MethodName, request.Encoding, "success").Inc()
161+
} else {
162+
serverRequestsForGreeter.WithLabelValues(
163+
request.MethodName, request.Encoding, "handler_fail").Inc()
164+
}
165+
// report metric to Prometheus
166+
serverHETForGreeter.WithLabelValues(request.MethodName).Observe(
167+
request.Elapsed().Seconds())
154168
}
155-
if err != nil {
156-
serverRequestsForGreeter.WithLabelValues(
157-
name, encoding, "sendreply_fail").Inc()
158-
} else if replyError == nil {
159-
serverRequestsForGreeter.WithLabelValues(
160-
name, encoding, "success").Inc()
169+
if immediateError == nil {
170+
if h.workers != nil {
171+
// Try queuing the request
172+
if err := h.workers.QueueRequest(request); err != nil {
173+
log.Printf("nrpc: Error queuing the request: %s", err)
174+
}
175+
} else {
176+
// Run the handler synchronously
177+
request.RunAndReply()
178+
}
161179
}
162180

163-
// report metric to Prometheus
164-
serverHETForGreeter.WithLabelValues(name).Observe(elapsed)
181+
if immediateError != nil {
182+
if err := request.SendReply(nil, immediateError); err != nil {
183+
log.Printf("GreeterHandler: Greeter handler failed to publish the response: %s", err)
184+
serverRequestsForGreeter.WithLabelValues(
185+
request.MethodName, request.Encoding, "handler_fail").Inc()
186+
}
187+
serverHETForGreeter.WithLabelValues(request.MethodName).Observe(
188+
request.Elapsed().Seconds())
189+
} else {
190+
}
165191
}
166192

167193
type GreeterClient struct {

0 commit comments

Comments
 (0)