-
Notifications
You must be signed in to change notification settings - Fork 14
/
pipeline.go
89 lines (78 loc) · 3.16 KB
/
pipeline.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package nethttplibrary
import (
nethttp "net/http"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)
// Pipeline is the contract for the middleware infrastructure: it exposes the
// single operation used to advance a request through a chain of middlewares.
type Pipeline interface {
	// Next moves the request object through the middlewares in the pipeline and
	// returns the resulting response or error.
	// In the implementation in this file, middlewareIndex is the position of the
	// middleware to invoke; an index past the last middleware sends the request
	// to the underlying round tripper instead.
	Next(req *nethttp.Request, middlewareIndex int) (*nethttp.Response, error)
}
// customTransport is a custom transport for net/http that sends every request
// through a middleware pipeline (see its RoundTrip method).
type customTransport struct {
	// middlewarePipeline is the middleware pipeline in use for the client.
	middlewarePipeline *middlewarePipeline
}
// middlewarePipeline is the middleware pipeline implementation; it combines a
// list of middlewares with a net/http round tripper that performs the request.
type middlewarePipeline struct {
	// transport is the round tripper used to execute the request once the
	// middleware index passed to Next is past the end of middlewares.
	transport nethttp.RoundTripper
	// middlewares are the middlewares to execute, addressed by the index given
	// to Next.
	middlewares []Middleware
}
// newMiddlewarePipeline builds a middlewarePipeline that holds the given
// middlewares and uses transport as its round tripper. Both arguments are
// stored as provided; no copy or validation is performed.
func newMiddlewarePipeline(middlewares []Middleware, transport nethttp.RoundTripper) *middlewarePipeline {
	built := new(middlewarePipeline)
	built.middlewares = middlewares
	built.transport = transport
	return built
}
// Next moves the request object through middlewares in the pipeline.
//
// While middlewareIndex addresses an entry of the middleware list, that
// middleware's Intercept is called with the pipeline, the following index and
// the request. Once the index is past the last middleware, the request is sent
// through the pipeline's round tripper. If observability options are found on
// the request, that call runs inside a "request_transport" span and the span's
// context is attached to the request before it is sent.
func (p *middlewarePipeline) Next(req *nethttp.Request, middlewareIndex int) (*nethttp.Response, error) {
	if middlewareIndex < len(p.middlewares) {
		return p.middlewares[middlewareIndex].Intercept(p, middlewareIndex+1, req)
	}

	options := GetObservabilityOptionsFromRequest(req)
	ctx := req.Context()
	if options == nil {
		// No observability options on the request: send it without a span.
		return p.transport.RoundTrip(req)
	}

	var span trace.Span
	instrumentationName := options.GetTracerInstrumentationName()
	ctx, span = otel.GetTracerProvider().Tracer(instrumentationName).Start(ctx, "request_transport")
	// The span is ended when this method returns, i.e. after the round trip.
	defer span.End()
	return p.transport.RoundTrip(req.WithContext(ctx))
}
// RoundTrip executes the middleware pipeline for req, starting at the first
// middleware (index 0), and returns the resulting response or error.
func (t *customTransport) RoundTrip(req *nethttp.Request) (*nethttp.Response, error) {
	const firstMiddleware = 0
	resp, err := t.middlewarePipeline.Next(req, firstMiddleware)
	return resp, err
}
// GetDefaultTransport returns the default http transport used by the library.
//
// When nethttp.DefaultTransport is a *nethttp.Transport, the result is a clone
// of it with ForceAttemptHTTP2 set to true and DisableCompression set to
// false; the shared default itself is not modified. Any other round tripper
// installed as nethttp.DefaultTransport is returned as is.
func GetDefaultTransport() nethttp.RoundTripper {
	switch base := nethttp.DefaultTransport.(type) {
	case *nethttp.Transport:
		tuned := base.Clone()
		tuned.ForceAttemptHTTP2 = true
		tuned.DisableCompression = false
		return tuned
	default:
		return nethttp.DefaultTransport
	}
}
// NewCustomTransport creates a new custom transport for an http client with
// the provided set of middleware. It delegates to
// NewCustomTransportWithParentTransport with a nil parent transport.
func NewCustomTransport(middlewares ...Middleware) *customTransport {
	var noParent nethttp.RoundTripper
	return NewCustomTransportWithParentTransport(noParent, middlewares...)
}
// NewCustomTransportWithParentTransport creates a new custom transport for an
// http client that runs the provided middlewares and relies on parentTransport
// to execute the request.
//
// If no middlewares are given, the result of GetDefaultMiddlewares is used.
// If parentTransport is nil, the result of GetDefaultTransport is used.
func NewCustomTransportWithParentTransport(parentTransport nethttp.RoundTripper, middlewares ...Middleware) *customTransport {
	chain := middlewares
	if len(chain) == 0 {
		chain = GetDefaultMiddlewares()
	}

	roundTripper := parentTransport
	if roundTripper == nil {
		roundTripper = GetDefaultTransport()
	}

	pipeline := newMiddlewarePipeline(chain, roundTripper)
	return &customTransport{middlewarePipeline: pipeline}
}