Skip to content

Commit

Permalink
Update roundrobin to use endpointsharding
Browse files Browse the repository at this point in the history
  • Loading branch information
arjan-bal committed Dec 26, 2024
1 parent 724f450 commit 3a594f6
Show file tree
Hide file tree
Showing 14 changed files with 717 additions and 442 deletions.
9 changes: 5 additions & 4 deletions balancer/endpointsharding/endpointsharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*
*/

package endpointsharding
package endpointsharding_test

import (
"context"
Expand All @@ -28,6 +28,7 @@ import (

"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/endpointsharding"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
Expand Down Expand Up @@ -55,7 +56,7 @@ var logger = grpclog.Component("endpoint-sharding-test")

func init() {
var err error
gracefulSwitchPickFirst, err = ParseConfig(json.RawMessage(PickFirstConfig))
gracefulSwitchPickFirst, err = endpointsharding.ParseConfig(json.RawMessage(endpointsharding.PickFirstConfig))
if err != nil {
logger.Fatal(err)
}
Expand All @@ -75,7 +76,7 @@ func (fakePetioleBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptio
ClientConn: cc,
bOpts: opts,
}
fp.Balancer = NewBalancer(fp, opts)
fp.Balancer = endpointsharding.NewBalancer(fp, opts)
return fp
}

Expand Down Expand Up @@ -105,7 +106,7 @@ func (fp *fakePetiole) UpdateClientConnState(state balancer.ClientConnState) err
}

func (fp *fakePetiole) UpdateState(state balancer.State) {
childStates := ChildStatesFromPicker(state.Picker)
childStates := endpointsharding.ChildStatesFromPicker(state.Picker)
// Both child states should be present in the child picker. States and
// picker change over the lifecycle of test, but there should always be two.
if len(childStates) != 2 {
Expand Down
3 changes: 1 addition & 2 deletions balancer/pickfirst/pickfirstleaf/pickfirstleaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,7 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
// Record a connection attempt when exiting CONNECTING.
if newState.ConnectivityState == connectivity.TransientFailure {
sd.connectionFailedInFirstPass = true
sd.lastErr = newState.ConnectionError
connectionAttemptsFailedMetric.Record(b.metricsRecorder, 1, b.target)
}

Expand Down Expand Up @@ -702,7 +703,6 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
})
}
case connectivity.TransientFailure:
sd.lastErr = newState.ConnectionError
sd.effectiveState = connectivity.TransientFailure
// Since we're re-using common SubConns while handling resolver
// updates, we could receive an out of turn TRANSIENT_FAILURE from
Expand All @@ -728,7 +728,6 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
switch newState.ConnectivityState {
case connectivity.TransientFailure:
b.numTF = (b.numTF + 1) % b.subConns.Len()
sd.lastErr = newState.ConnectionError
if b.numTF%b.subConns.Len() == 0 {
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Expand Down
93 changes: 57 additions & 36 deletions balancer/roundrobin/roundrobin.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,60 +22,81 @@
package roundrobin

import (
rand "math/rand/v2"
"sync/atomic"
"encoding/json"
"fmt"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/balancer/endpointsharding"
"google.golang.org/grpc/balancer/pickfirst/pickfirstleaf"
"google.golang.org/grpc/grpclog"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/serviceconfig"
)

// Name is the name of round_robin balancer.
const Name = "round_robin"

var logger = grpclog.Component("roundrobin")

// newBuilder creates a new roundrobin balancer builder.
func newBuilder() balancer.Builder {
return base.NewBalancerBuilder(Name, &rrPickerBuilder{}, base.Config{HealthCheck: true})
}
var (
logger = grpclog.Component("roundrobin")
// endpointSharding which specifies pick first children.
endpointShardingLBConfig serviceconfig.LoadBalancingConfig
)

func init() {
balancer.Register(newBuilder())
var err error
endpointShardingLBConfig, err = endpointsharding.ParseConfig(json.RawMessage(endpointsharding.PickFirstConfig))
if err != nil {
logger.Fatal(err)
}
balancer.Register(builder{})
}

type rrPickerBuilder struct{}
type builder struct{}

func (*rrPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
logger.Infof("roundrobinPicker: Build called with info: %v", info)
if len(info.ReadySCs) == 0 {
return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
}
scs := make([]balancer.SubConn, 0, len(info.ReadySCs))
for sc := range info.ReadySCs {
scs = append(scs, sc)
func (bb builder) Name() string {
return Name
}

func (bb builder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
bal := &rrBalancer{
cc: cc,
child: endpointsharding.NewBalancer(cc, opts),
}
return &rrPicker{
subConns: scs,
// Start at a random index, as the same RR balancer rebuilds a new
// picker when SubConn states change, and we don't want to apply excess
// load to the first server in the list.
next: uint32(rand.IntN(len(scs))),
bal.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[%p] ", bal))
bal.logger.Infof("Created")
return bal
}

type rrBalancer struct {
cc balancer.ClientConn
child balancer.Balancer
logger *internalgrpclog.PrefixLogger
}

func (b *rrBalancer) Close() {
b.child.Close()
}

func (b *rrBalancer) ExitIdle() {
// Should always be ok, as child is endpoint sharding.
if ei, ok := b.child.(balancer.ExitIdler); ok {
ei.ExitIdle()
}
}

type rrPicker struct {
// subConns is the snapshot of the roundrobin balancer when this picker was
// created. The slice is immutable. Each Get() will do a round robin
// selection from it and return the selected SubConn.
subConns []balancer.SubConn
next uint32
func (b *rrBalancer) ResolverError(err error) {
// Will cause inline picker update from endpoint sharding.
b.child.ResolverError(err)
}

func (p *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
subConnsLen := uint32(len(p.subConns))
nextIndex := atomic.AddUint32(&p.next, 1)
func (b *rrBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
// Enable the health listener in pickfirst children for client side health
// checks and outlier detection, if configured.
ccs.ResolverState = pickfirstleaf.EnableHealthListener(ccs.ResolverState)
ccs.BalancerConfig = endpointShardingLBConfig
return b.child.UpdateClientConnState(ccs)
}

sc := p.subConns[nextIndex%subConnsLen]
return balancer.PickResult{SubConn: sc}, nil
func (b *rrBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
}
1 change: 1 addition & 0 deletions balancer/weightedroundrobin/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ func (b *wrrBalancer) Close() {
ew.stopORCAListener()
}
}
b.child.Close()
}

func (b *wrrBalancer) ExitIdle() {
Expand Down
Loading

0 comments on commit 3a594f6

Please sign in to comment.