Skip to content

Commit

Permalink
balancer/rls: Add picker metrics (grpc#7484)
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq authored Aug 9, 2024
1 parent 3ee837c commit 7b9e012
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 31 deletions.
43 changes: 35 additions & 8 deletions balancer/rls/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
estats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/backoff"
Expand Down Expand Up @@ -77,6 +78,28 @@ var (
clientConnUpdateHook = func() {}
dataCachePurgeHook = func() {}
resetBackoffHook = func() {}

defaultTargetPicksMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
Name: "grpc.lb.rls.default_target_picks",
Description: "EXPERIMENTAL. Number of LB picks sent to the default target.",
Unit: "pick",
Labels: []string{"grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.data_plane_target", "grpc.lb.pick_result"},
Default: false,
})
targetPicksMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
Name: "grpc.lb.rls.target_picks",
Description: "EXPERIMENTAL. Number of LB picks sent to each RLS target. Note that if the default target is also returned by the RLS server, RPCs sent to that target from the cache will be counted in this metric, not in grpc.rls.default_target_picks.",
Unit: "pick",
Labels: []string{"grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.data_plane_target", "grpc.lb.pick_result"},
Default: false,
})
failedPicksMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
Name: "grpc.lb.rls.failed_picks",
Description: "EXPERIMENTAL. Number of LB picks failed due to either a failed RLS request or the RLS channel being throttled.",
Unit: "pick",
Labels: []string{"grpc.target", "grpc.lb.rls.server_target"},
Default: false,
})
)

func init() {
Expand Down Expand Up @@ -490,15 +513,19 @@ func (b *rlsBalancer) sendNewPickerLocked() {
if b.defaultPolicy != nil {
b.defaultPolicy.acquireRef()
}

picker := &rlsPicker{
kbm: b.lbCfg.kbMap,
origEndpoint: b.bopts.Target.Endpoint(),
lb: b,
defaultPolicy: b.defaultPolicy,
ctrlCh: b.ctrlCh,
maxAge: b.lbCfg.maxAge,
staleAge: b.lbCfg.staleAge,
bg: b.bg,
kbm: b.lbCfg.kbMap,
origEndpoint: b.bopts.Target.Endpoint(),
lb: b,
defaultPolicy: b.defaultPolicy,
ctrlCh: b.ctrlCh,
maxAge: b.lbCfg.maxAge,
staleAge: b.lbCfg.staleAge,
bg: b.bg,
rlsServerTarget: b.lbCfg.lookupService,
grpcTarget: b.bopts.Target.String(),
metricsRecorder: b.bopts.MetricsRecorder,
}
picker.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-picker %p] ", picker))
state := balancer.State{
Expand Down
102 changes: 79 additions & 23 deletions balancer/rls/picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"google.golang.org/grpc/balancer/rls/internal/keys"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
estats "google.golang.org/grpc/experimental/stats"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
"google.golang.org/grpc/metadata"
Expand Down Expand Up @@ -61,12 +62,15 @@ type rlsPicker struct {

// The picker is given its own copy of the below fields from the RLS LB policy
// to avoid having to grab the mutex on the latter.
defaultPolicy *childPolicyWrapper // Child policy for the default target.
ctrlCh *controlChannel // Control channel to the RLS server.
maxAge time.Duration // Cache max age from LB config.
staleAge time.Duration // Cache stale age from LB config.
bg exitIdler
logger *internalgrpclog.PrefixLogger
rlsServerTarget string
grpcTarget string
metricsRecorder estats.MetricsRecorder
defaultPolicy *childPolicyWrapper // Child policy for the default target.
ctrlCh *controlChannel // Control channel to the RLS server.
maxAge time.Duration // Cache max age from LB config.
staleAge time.Duration // Cache stale age from LB config.
bg exitIdler
logger *internalgrpclog.PrefixLogger
}

// isFullMethodNameValid return true if name is of the form `/service/method`.
Expand All @@ -85,7 +89,17 @@ func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
reqKeys := p.kbm.RLSKey(md, p.origEndpoint, info.FullMethodName)

p.lb.cacheMu.Lock()
defer p.lb.cacheMu.Unlock()
var pr balancer.PickResult
var err error

// Record metrics without the cache mutex held, to prevent lock contention
// between concurrent RPC's and their Pick calls. Metrics Recording can
// potentially be expensive.
metricsCallback := func() {}
defer func() {
p.lb.cacheMu.Unlock()
metricsCallback()
}()

// Lookup data cache and pending request map using request path and keys.
cacheKey := cacheKey{path: info.FullMethodName, keys: reqKeys.Str}
Expand All @@ -98,7 +112,8 @@ func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
case dcEntry == nil && pendingEntry == nil:
throttled := p.sendRouteLookupRequestLocked(cacheKey, &backoffState{bs: defaultBackoffStrategy}, reqKeys.Map, rlspb.RouteLookupRequest_REASON_MISS, "")
if throttled {
return p.useDefaultPickIfPossible(info, errRLSThrottled)
pr, metricsCallback, err = p.useDefaultPickIfPossible(info, errRLSThrottled)
return pr, err
}
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable

Expand All @@ -113,8 +128,8 @@ func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
p.sendRouteLookupRequestLocked(cacheKey, dcEntry.backoffState, reqKeys.Map, rlspb.RouteLookupRequest_REASON_STALE, dcEntry.headerData)
}
// Delegate to child policies.
res, err := p.delegateToChildPoliciesLocked(dcEntry, info)
return res, err
pr, metricsCallback, err = p.delegateToChildPoliciesLocked(dcEntry, info)
return pr, err
}

// We get here only if the data cache entry has expired. If entry is in
Expand All @@ -126,32 +141,50 @@ func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
// message received from the control plane is still fine, as it could be
// useful for debugging purposes.
st := dcEntry.status
return p.useDefaultPickIfPossible(info, status.Error(codes.Unavailable, fmt.Sprintf("most recent error from RLS server: %v", st.Error())))
pr, metricsCallback, err = p.useDefaultPickIfPossible(info, status.Error(codes.Unavailable, fmt.Sprintf("most recent error from RLS server: %v", st.Error())))
return pr, err
}

// We get here only if the entry has expired and is not in backoff.
throttled := p.sendRouteLookupRequestLocked(cacheKey, dcEntry.backoffState, reqKeys.Map, rlspb.RouteLookupRequest_REASON_MISS, "")
if throttled {
return p.useDefaultPickIfPossible(info, errRLSThrottled)
pr, metricsCallback, err = p.useDefaultPickIfPossible(info, errRLSThrottled)
return pr, err
}
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable

// Data cache hit. Pending request exists.
default:
if dcEntry.expiryTime.After(now) {
res, err := p.delegateToChildPoliciesLocked(dcEntry, info)
return res, err
pr, metricsCallback, err = p.delegateToChildPoliciesLocked(dcEntry, info)
return pr, err
}
// Data cache entry has expired and pending request exists. Queue pick.
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}
}

// errToPickResult is a helper function which converts the error value returned
// by Pick() to a string that represents the pick result.
func errToPickResult(err error) string {
if err == nil {
return "complete"
}
if errors.Is(err, balancer.ErrNoSubConnAvailable) {
return "queue"
}
if _, ok := status.FromError(err); ok {
return "drop"
}
return "fail"
}

// delegateToChildPoliciesLocked is a helper function which iterates through the
// list of child policy wrappers in a cache entry and attempts to find a child
// policy to which this RPC can be routed to. If all child policies are in
// TRANSIENT_FAILURE, we delegate to the last child policy arbitrarily.
func (p *rlsPicker) delegateToChildPoliciesLocked(dcEntry *cacheEntry, info balancer.PickInfo) (balancer.PickResult, error) {
// TRANSIENT_FAILURE, we delegate to the last child policy arbitrarily. Returns
// a function to be invoked to record metrics.
func (p *rlsPicker) delegateToChildPoliciesLocked(dcEntry *cacheEntry, info balancer.PickInfo) (balancer.PickResult, func(), error) {
const rlsDataHeaderName = "x-google-rls-data"
for i, cpw := range dcEntry.childPolicyWrappers {
state := (*balancer.State)(atomic.LoadPointer(&cpw.state))
Expand All @@ -164,29 +197,52 @@ func (p *rlsPicker) delegateToChildPoliciesLocked(dcEntry *cacheEntry, info bala
// X-Google-RLS-Data header.
res, err := state.Picker.Pick(info)
if err != nil {
return res, err
pr := errToPickResult(err)
return res, func() {
if pr == "queue" {
// Don't record metrics for queued Picks.
return
}
targetPicksMetric.Record(p.metricsRecorder, 1, p.grpcTarget, p.rlsServerTarget, cpw.target, pr)
}, err
}

if res.Metadata == nil {
res.Metadata = metadata.Pairs(rlsDataHeaderName, dcEntry.headerData)
} else {
res.Metadata.Append(rlsDataHeaderName, dcEntry.headerData)
}
return res, nil
return res, func() {
targetPicksMetric.Record(p.metricsRecorder, 1, p.grpcTarget, p.rlsServerTarget, cpw.target, "complete")
}, nil
}
}

// In the unlikely event that we have a cache entry with no targets, we end up
// queueing the RPC.
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
return balancer.PickResult{}, func() {}, balancer.ErrNoSubConnAvailable
}

// useDefaultPickIfPossible is a helper method which delegates to the default
// target if one is configured, or fails the pick with the given error.
func (p *rlsPicker) useDefaultPickIfPossible(info balancer.PickInfo, errOnNoDefault error) (balancer.PickResult, error) {
// target if one is configured, or fails the pick with the given error. Returns
// a function to be invoked to record metrics.
func (p *rlsPicker) useDefaultPickIfPossible(info balancer.PickInfo, errOnNoDefault error) (balancer.PickResult, func(), error) {
if p.defaultPolicy != nil {
state := (*balancer.State)(atomic.LoadPointer(&p.defaultPolicy.state))
return state.Picker.Pick(info)
res, err := state.Picker.Pick(info)
pr := errToPickResult(err)
return res, func() {
if pr == "queue" {
// Don't record metrics for queued Picks.
return
}
defaultTargetPicksMetric.Record(p.metricsRecorder, 1, p.grpcTarget, p.rlsServerTarget, p.defaultPolicy.target, pr)
}, err
}
return balancer.PickResult{}, errOnNoDefault

return balancer.PickResult{}, func() {
failedPicksMetric.Record(p.metricsRecorder, 1, p.grpcTarget, p.rlsServerTarget)
}, errOnNoDefault
}

// sendRouteLookupRequestLocked adds an entry to the pending request map and
Expand Down

0 comments on commit 7b9e012

Please sign in to comment.