Skip to content

drpcserver: add server interceptor support in drpc #7

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
.vscode
vendor
result
.idea/*
.idea/*
.DS_Store
82 changes: 76 additions & 6 deletions drpcserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,66 @@ type Options struct {
// CollectStats controls whether the server should collect stats on the
// rpcs it serves.
CollectStats bool

serverInt ServerInterceptor
serverInts []ServerInterceptor
}

// A ServerOption sets options such as server interceptors.
type ServerOption func(options *Options)

// WithChainServerInterceptor creates a ServerOption that chains multiple server interceptors,
// with the first being the outermost wrapper and the last being the innermost.
func WithChainServerInterceptor(ints ...ServerInterceptor) ServerOption {
return func(opt *Options) {
opt.serverInts = append(opt.serverInts, ints...)
}
}

// chainServerInterceptors chains all server interceptors in the Options into a single interceptor.
// The combined chained interceptor is stored in opts.serverInt. The interceptors are invoked in the order they were added.
//
// Example usage:
//
// Interceptors are typically added using WithChainServerInterceptor when creating the server.
// The NewWithOptions function calls chainServerInterceptors internally to process these.
// server := drpcserver.NewWithOptions(
// drpcHandler,
// drpcserver.Options{}, // base server options
// drpcserver.WithChainServerInterceptor(loggingInterceptor, metricsInterceptor),
// )
//
// // Chain the interceptors
// chainServerInterceptors(server)
// // server.opts.serverInt now contains the chained server interceptors.
func chainServerInterceptors(s *Server) {
switch n := len(s.opts.serverInts); n {
case 0:
s.opts.serverInt = nil
case 1:
s.opts.serverInt = s.opts.serverInts[0]
default:
s.opts.serverInt = func(
ctx context.Context,
rpc string,
stream drpc.Stream,
handler drpc.Handler,
) error {
chained := handler
for i := n - 1; i >= 0; i-- {
next := chained
interceptor := s.opts.serverInts[i]
chainedFn := func(
stream drpc.Stream,
rpc string,
) error {
return interceptor(ctx, rpc, stream, next)
}
chained = HandlerFunc(chainedFn)
}
return chained.HandleRPC(stream, rpc)
}
}
}

// Server is an implementation of drpc.Server to serve drpc connections.
Expand All @@ -51,7 +111,7 @@ func New(handler drpc.Handler) *Server {

// NewWithOptions constructs a new Server using the provided options to tune
// how the drpc connections are handled.
func NewWithOptions(handler drpc.Handler, opts Options) *Server {
func NewWithOptions(handler drpc.Handler, opts Options, sopts ...ServerOption) *Server {
s := &Server{
opts: opts,
handler: handler,
Expand All @@ -61,6 +121,10 @@ func NewWithOptions(handler drpc.Handler, opts Options) *Server {
drpcopts.SetManagerStatsCB(&s.opts.Manager.Internal, s.getStats)
s.stats = make(map[string]*drpcstats.Stats)
}
for _, opt := range sopts {
opt(&s.opts)
}
chainServerInterceptors(s)

return s
}
Expand Down Expand Up @@ -105,7 +169,7 @@ func (s *Server) ServeOne(ctx context.Context, tr drpc.Transport) (err error) {
if err != nil {
return errs.Wrap(err)
}
if err := s.handleRPC(stream, rpc); err != nil {
if err := s.handleRPC(ctx, stream, rpc); err != nil {
return errs.Wrap(err)
}
}
Expand Down Expand Up @@ -162,10 +226,16 @@ func (s *Server) Serve(ctx context.Context, lis net.Listener) (err error) {
}

// handleRPC handles the rpc that has been requested by the stream.
func (s *Server) handleRPC(stream *drpcstream.Stream, rpc string) (err error) {
err = s.handler.HandleRPC(stream, rpc)
if err != nil {
return errs.Wrap(stream.SendError(err))
func (s *Server) handleRPC(ctx context.Context, stream *drpcstream.Stream, rpc string) (err error) {
var processingErr error
if s.opts.serverInt != nil {
processingErr = s.opts.serverInt(ctx, rpc, stream, s.handler)
} else {
processingErr = s.handler.HandleRPC(stream, rpc)
}

if processingErr != nil {
return errs.Wrap(stream.SendError(processingErr))
}
return errs.Wrap(stream.CloseSend())
}
24 changes: 24 additions & 0 deletions drpcserver/server_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package drpcserver

import (
"context"
"storj.io/drpc"
)

// HandlerFunc is an adapter to allow the use of ordinary functions as drpc.Handlers.
// If f is a function with the appropriate signature, HandlerFunc(f) is a
// drpc.Handler object that calls f.
type HandlerFunc func(stream drpc.Stream, rpc string) error

// HandleRPC calls f(stream, rpc).
// It implements the drpc.Handler interface.
func (f HandlerFunc) HandleRPC(stream drpc.Stream, rpc string) error {
return f(stream, rpc)
}

// ServerInterceptor is a function that intercepts the execution of a DRPC method on the server.
// It allows for cross-cutting concerns like logging, metrics, authentication, or request manipulation
// to be applied to RPCs.
// It is the responsibility of the interceptor to call handler.HandleRPC to continue
// processing the RPC, or to terminate the RPC by returning an error or handling it directly.
type ServerInterceptor func(ctx context.Context, rpc string, stream drpc.Stream, handler drpc.Handler) error
Loading