Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server side concurrency #49

Merged
merged 21 commits into from
Sep 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
8f9f8d3
Add a Request type
cdevienne Sep 17, 2018
5d5487e
Store the service and package parameters on the request
cdevienne Sep 17, 2018
0311b15
Context is now passed to the handler by Request
cdevienne Sep 17, 2018
85ec181
Call request.Run() in a unique place in the service handler,
cdevienne Sep 17, 2018
32d288d
Add Request.SendReply and use it instead of nrpc.Publish
cdevienne Sep 18, 2018
c59a3ad
De-specialize unmarshalling requests of streamed replies methods
cdevienne Sep 18, 2018
9f435ef
Streamed replies are now handled by Request
cdevienne Sep 18, 2018
c36e2be
Add Request.RunAndReply
cdevienne Sep 18, 2018
fcce55e
Implement server-side concurrency
cdevienne Sep 18, 2018
3f3a380
Cancel requests that are pending for too long
cdevienne Sep 18, 2018
a8fcf90
Fix tests
cdevienne Sep 19, 2018
fbe01c7
protect maxPendingDuration with the mutex
cdevienne Sep 19, 2018
729aa24
Protect SetSize with the mutex
cdevienne Sep 19, 2018
a4ce384
WorkerPool.QueueRequest now sends the TOOBUSY error itself
cdevienne Sep 19, 2018
2285329
WorkerPool: close the schedule chan after scheduler() returns
cdevienne Sep 19, 2018
36f3637
WorkerPool: fix: cancel extra requests if current queue has more pend…
cdevienne Sep 19, 2018
552a89f
WorkerPool.Close: Send an error to the pending requests callers
cdevienne Sep 19, 2018
453f2a5
Stream replies are now initialized at queueing or by the request itse…
cdevienne Sep 19, 2018
978e4fb
Fix: Use log.Printf instead of log.Println
cdevienne Sep 19, 2018
58accd6
Request: Use pointer receiver for all functions
cdevienne Sep 19, 2018
914de1b
Remove debug logs
cdevienne Sep 19, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
400 changes: 194 additions & 206 deletions examples/alloptions/alloptions.nrpc.go

Large diffs are not rendered by default.

513 changes: 347 additions & 166 deletions examples/alloptions/alloptions_test.go

Large diffs are not rendered by default.

78 changes: 48 additions & 30 deletions examples/helloworld/helloworld/helloworld.nrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ type GreeterServer interface {
// GreeterHandler provides a NATS subscription handler that can serve a
// subscription using a given GreeterServer implementation.
type GreeterHandler struct {
ctx context.Context
nc nrpc.NatsConn
server GreeterServer
ctx context.Context
workers *nrpc.WorkerPool
nc nrpc.NatsConn
server GreeterServer
}

func NewGreeterHandler(ctx context.Context, nc nrpc.NatsConn, s GreeterServer) *GreeterHandler {
Expand All @@ -33,13 +34,26 @@ func NewGreeterHandler(ctx context.Context, nc nrpc.NatsConn, s GreeterServer) *
}
}

func NewGreeterConcurrentHandler(workers *nrpc.WorkerPool, nc nrpc.NatsConn, s GreeterServer) *GreeterHandler {
return &GreeterHandler{
workers: workers,
nc: nc,
server: s,
}
}

func (h *GreeterHandler) Subject() string {
return "helloworld.Greeter.>"
}

func (h *GreeterHandler) Handler(msg *nats.Msg) {
var encoding string
var noreply bool
var ctx context.Context
if h.workers != nil {
ctx = h.workers.Context
} else {
ctx = h.ctx
}
request := nrpc.NewRequest(ctx, h.nc, msg.Subject, msg.Reply)
// extract method name & encoding from subject
_, _, name, tail, err := nrpc.ParseSubject(
"helloworld", 0, "Greeter", 0, msg.Subject)
Expand All @@ -48,54 +62,58 @@ func (h *GreeterHandler) Handler(msg *nats.Msg) {
return
}

ctx := h.ctx
request.MethodName = name
request.SubjectTail = tail

// call handler and form response
var resp proto.Message
var replyError *nrpc.Error
var immediateError *nrpc.Error
switch name {
case "SayHello":
_, encoding, err = nrpc.ParseSubjectTail(0, tail)
_, request.Encoding, err = nrpc.ParseSubjectTail(0, request.SubjectTail)
if err != nil {
log.Printf("SayHelloHanlder: SayHello subject parsing failed: %v", err)
break
}
var req HelloRequest
if err := nrpc.Unmarshal(encoding, msg.Data, &req); err != nil {
if err := nrpc.Unmarshal(request.Encoding, msg.Data, &req); err != nil {
log.Printf("SayHelloHandler: SayHello request unmarshal failed: %v", err)
replyError = &nrpc.Error{
immediateError = &nrpc.Error{
Type: nrpc.Error_CLIENT,
Message: "bad request received: " + err.Error(),
}
} else {
resp, replyError = nrpc.CaptureErrors(
func()(proto.Message, error){
innerResp, err := h.server.SayHello(ctx, req)
if err != nil {
return nil, err
}
return &innerResp, err
})
if replyError != nil {
log.Printf("SayHelloHandler: SayHello handler failed: %s", replyError.Error())
request.Handler = func(ctx context.Context)(proto.Message, error){
innerResp, err := h.server.SayHello(ctx, req)
if err != nil {
return nil, err
}
return &innerResp, err
}
}
default:
log.Printf("GreeterHandler: unknown name %q", name)
replyError = &nrpc.Error{
immediateError = &nrpc.Error{
Type: nrpc.Error_CLIENT,
Message: "unknown name: " + name,
}
}
if immediateError == nil {
if h.workers != nil {
// Try queuing the request
if err := h.workers.QueueRequest(request); err != nil {
log.Printf("nrpc: Error queuing the request: %s", err)
}
} else {
// Run the handler synchronously
request.RunAndReply()
}
}


if !noreply {
// encode and send response
err = nrpc.Publish(resp, replyError, h.nc, msg.Reply, encoding) // error is logged
if immediateError != nil {
if err := request.SendReply(nil, immediateError); err != nil {
log.Printf("GreeterHandler: Greeter handler failed to publish the response: %s", err)
}
} else {
err = nil
}
if err != nil {
log.Println("GreeterHandler: Greeter handler failed to publish the response: %s", err)
}
}

Expand Down
114 changes: 70 additions & 44 deletions examples/metrics_helloworld/helloworld/helloworld.nrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,10 @@ var (
// GreeterHandler provides a NATS subscription handler that can serve a
// subscription using a given GreeterServer implementation.
type GreeterHandler struct {
ctx context.Context
nc nrpc.NatsConn
server GreeterServer
ctx context.Context
workers *nrpc.WorkerPool
nc nrpc.NatsConn
server GreeterServer
}

func NewGreeterHandler(ctx context.Context, nc nrpc.NatsConn, s GreeterServer) *GreeterHandler {
Expand All @@ -82,13 +83,26 @@ func NewGreeterHandler(ctx context.Context, nc nrpc.NatsConn, s GreeterServer) *
}
}

func NewGreeterConcurrentHandler(workers *nrpc.WorkerPool, nc nrpc.NatsConn, s GreeterServer) *GreeterHandler {
return &GreeterHandler{
workers: workers,
nc: nc,
server: s,
}
}

func (h *GreeterHandler) Subject() string {
return "helloworld.Greeter.>"
}

func (h *GreeterHandler) Handler(msg *nats.Msg) {
var encoding string
var noreply bool
var ctx context.Context
if h.workers != nil {
ctx = h.workers.Context
} else {
ctx = h.ctx
}
request := nrpc.NewRequest(ctx, h.nc, msg.Subject, msg.Reply)
// extract method name & encoding from subject
_, _, name, tail, err := nrpc.ParseSubject(
"helloworld", 0, "Greeter", 0, msg.Subject)
Expand All @@ -97,71 +111,83 @@ func (h *GreeterHandler) Handler(msg *nats.Msg) {
return
}

ctx := h.ctx
request.MethodName = name
request.SubjectTail = tail

// call handler and form response
var resp proto.Message
var replyError *nrpc.Error
var elapsed float64
var immediateError *nrpc.Error
switch name {
case "SayHello":
_, encoding, err = nrpc.ParseSubjectTail(0, tail)
_, request.Encoding, err = nrpc.ParseSubjectTail(0, request.SubjectTail)
if err != nil {
log.Printf("SayHelloHanlder: SayHello subject parsing failed: %v", err)
break
}
var req HelloRequest
if err := nrpc.Unmarshal(encoding, msg.Data, &req); err != nil {
if err := nrpc.Unmarshal(request.Encoding, msg.Data, &req); err != nil {
log.Printf("SayHelloHandler: SayHello request unmarshal failed: %v", err)
replyError = &nrpc.Error{
immediateError = &nrpc.Error{
Type: nrpc.Error_CLIENT,
Message: "bad request received: " + err.Error(),
}
serverRequestsForGreeter.WithLabelValues(
"SayHello", encoding, "unmarshal_fail").Inc()
"SayHello", request.Encoding, "unmarshal_fail").Inc()
} else {
start := time.Now()
resp, replyError = nrpc.CaptureErrors(
func()(proto.Message, error){
innerResp, err := h.server.SayHello(ctx, req)
if err != nil {
return nil, err
}
return &innerResp, err
})
elapsed = time.Since(start).Seconds()
if replyError != nil {
log.Printf("SayHelloHandler: SayHello handler failed: %s", replyError.Error())
serverRequestsForGreeter.WithLabelValues(
"SayHello", encoding, "handler_fail").Inc()
request.Handler = func(ctx context.Context)(proto.Message, error){
innerResp, err := h.server.SayHello(ctx, req)
if err != nil {
return nil, err
}
return &innerResp, err
}
}
default:
log.Printf("GreeterHandler: unknown name %q", name)
replyError = &nrpc.Error{
immediateError = &nrpc.Error{
Type: nrpc.Error_CLIENT,
Message: "unknown name: " + name,
}
serverRequestsForGreeter.WithLabelValues(
"Greeter", encoding, "name_fail").Inc()
"Greeter", request.Encoding, "name_fail").Inc()
}


if !noreply {
// encode and send response
err = nrpc.Publish(resp, replyError, h.nc, msg.Reply, encoding) // error is logged
} else {
err = nil
request.AfterReply = func(request *nrpc.Request, success, replySuccess bool) {
if !replySuccess {
serverRequestsForGreeter.WithLabelValues(
request.MethodName, request.Encoding, "sendreply_fail").Inc()
}
if success {
serverRequestsForGreeter.WithLabelValues(
request.MethodName, request.Encoding, "success").Inc()
} else {
serverRequestsForGreeter.WithLabelValues(
request.MethodName, request.Encoding, "handler_fail").Inc()
}
// report metric to Prometheus
serverHETForGreeter.WithLabelValues(request.MethodName).Observe(
request.Elapsed().Seconds())
}
if err != nil {
serverRequestsForGreeter.WithLabelValues(
name, encoding, "sendreply_fail").Inc()
} else if replyError == nil {
serverRequestsForGreeter.WithLabelValues(
name, encoding, "success").Inc()
if immediateError == nil {
if h.workers != nil {
// Try queuing the request
if err := h.workers.QueueRequest(request); err != nil {
log.Printf("nrpc: Error queuing the request: %s", err)
}
} else {
// Run the handler synchronously
request.RunAndReply()
}
}

// report metric to Prometheus
serverHETForGreeter.WithLabelValues(name).Observe(elapsed)
if immediateError != nil {
if err := request.SendReply(nil, immediateError); err != nil {
log.Printf("GreeterHandler: Greeter handler failed to publish the response: %s", err)
serverRequestsForGreeter.WithLabelValues(
request.MethodName, request.Encoding, "handler_fail").Inc()
}
serverHETForGreeter.WithLabelValues(request.MethodName).Observe(
request.Elapsed().Seconds())
} else {
}
}

type GreeterClient struct {
Expand Down
Loading