Skip to content

Commit

Permalink
OD using health producer
Browse files Browse the repository at this point in the history
  • Loading branch information
arjan-bal committed Sep 4, 2024
1 parent 1ac12e8 commit 09573c0
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 14 deletions.
10 changes: 9 additions & 1 deletion balancer/pickfirstleaf/pickfirstleaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ var (
// Can be changed in init() if this balancer is to be registered as the default
// pickfirst.
Name = "pick_first_leaf"
// TODO(arjan-bal): This is a hack to inform Outlier Detection when the generic
// health check producer is being used, as opposed to suchannel health checking.
// Once dualstack is completed, healthchecking will be removed from the subchannel.
// Remove this when implementing the dualstack design.
GenericHealthProducerEnabledKey = "generic_health_producer_enabled"
GenericHealthProducerEnabledValue = &struct{}{}
)

const logPrefix = "[pick-first-leaf-lb %p] "
Expand Down Expand Up @@ -121,7 +127,9 @@ func (b *pickfirstBalancer) newSCData(addr resolver.Address) (*scData, error) {
healthState: balancer.SubConnState{ConnectivityState: connectivity.Idle},
addr: addr,
}
sc, err := b.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{
addrWithAttr := addr
addrWithAttr.Attributes = addr.Attributes.WithValue(GenericHealthProducerEnabledKey, GenericHealthProducerEnabledValue)
sc, err := b.cc.NewSubConn([]resolver.Address{addrWithAttr}, balancer.NewSubConnOptions{
StateListener: func(state balancer.SubConnState) {
// Store the state and delegate.
b.serializer.TrySchedule(func(_ context.Context) {
Expand Down
86 changes: 73 additions & 13 deletions xds/internal/balancer/outlierdetection/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ import (
"unsafe"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/pickfirstleaf"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/health/producer"
"google.golang.org/grpc/internal/balancer/gracefulswitch"
"google.golang.org/grpc/internal/buffer"
"google.golang.org/grpc/internal/channelz"
Expand Down Expand Up @@ -353,8 +355,32 @@ func (b *outlierDetectionBalancer) updateSubConnState(sc balancer.SubConn, state
return
}
if state.ConnectivityState == connectivity.Shutdown {
if scw.unregisterHealthListener != nil {
scw.unregisterHealthListener()
scw.unregisterHealthListener = nil
}
delete(b.scWrappers, scw.SubConn)
}
if scw.genericHealthProducerEnabled {
scw.listener(state)
return
}
b.scUpdateCh.Put(&scUpdate{
scw: scw,
state: state,
})
}

func (b *outlierDetectionBalancer) updateHealthState(sc balancer.SubConn, state balancer.SubConnState) {
b.mu.Lock()
defer b.mu.Unlock()
scw, ok := b.scWrappers[sc]
if !ok {
// Shouldn't happen if passed down a SubConnWrapper to child on SubConn
// creation.
b.logger.Errorf("OnStateChange called with SubConn that has no corresponding SubConnWrapper")
return
}
b.scUpdateCh.Put(&scUpdate{
scw: scw,
state: state,
Expand All @@ -380,6 +406,13 @@ func (b *outlierDetectionBalancer) Close() {
if b.intervalTimer != nil {
b.intervalTimer.Stop()
}

for _, scw := range b.scWrappers {
if scw.unregisterHealthListener != nil {
scw.unregisterHealthListener()
scw.unregisterHealthListener = nil
}
}
}

func (b *outlierDetectionBalancer) ExitIdle() {
Expand Down Expand Up @@ -466,7 +499,22 @@ func (b *outlierDetectionBalancer) UpdateState(s balancer.State) {
b.pickerUpdateCh.Put(s)
}

type healthListener struct {
b *outlierDetectionBalancer
sc balancer.SubConn
}

func (hl *healthListener) OnStateChange(state balancer.SubConnState) {
hl.b.updateHealthState(hl.sc, state)
}

func (b *outlierDetectionBalancer) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
// Determine if the generic health producer is being used.
genericHealthProducerEnabled := false
if len(addrs) == 1 && addrs[0].Attributes.Value(pickfirstleaf.GenericHealthProducerEnabledKey) == pickfirstleaf.GenericHealthProducerEnabledValue {
genericHealthProducerEnabled = true
}

var sc balancer.SubConn
oldListener := opts.StateListener
opts.StateListener = func(state balancer.SubConnState) { b.updateSubConnState(sc, state) }
Expand All @@ -475,10 +523,17 @@ func (b *outlierDetectionBalancer) NewSubConn(addrs []resolver.Address, opts bal
return nil, err
}
scw := &subConnWrapper{
SubConn: sc,
addresses: addrs,
scUpdateCh: b.scUpdateCh,
listener: oldListener,
SubConn: sc,
addresses: addrs,
scUpdateCh: b.scUpdateCh,
listener: oldListener,
genericHealthProducerEnabled: genericHealthProducerEnabled,
}
if genericHealthProducerEnabled {
scw.healthListener, scw.unregisterHealthListener = producer.SwapRootListener(&healthListener{
b: b,
sc: sc,
}, sc)
}
b.mu.Lock()
defer b.mu.Unlock()
Expand Down Expand Up @@ -611,12 +666,15 @@ func min(x, y time.Duration) time.Duration {
func (b *outlierDetectionBalancer) handleSubConnUpdate(u *scUpdate) {
scw := u.scw
scw.latestState = u.state
if !scw.ejected {
if scw.listener != nil {
b.childMu.Lock()
scw.listener(u.state)
b.childMu.Unlock()
}
if scw.ejected {
return
}
b.childMu.Lock()
defer b.childMu.Unlock()
if scw.genericHealthProducerEnabled {
scw.healthListener.OnStateChange(u.state)
} else if scw.listener != nil {
scw.listener(u.state)
}
}

Expand All @@ -633,10 +691,12 @@ func (b *outlierDetectionBalancer) handleEjectedUpdate(u *ejectionUpdate) {
ConnectivityState: connectivity.TransientFailure,
}
}
if scw.listener != nil {
b.childMu.Lock()
b.childMu.Lock()
defer b.childMu.Unlock()
if scw.genericHealthProducerEnabled {
scw.healthListener.OnStateChange(stateToUpdate)
} else if scw.listener != nil {
scw.listener(stateToUpdate)
b.childMu.Unlock()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (

"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer/pickfirstleaf"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/grpctest"
Expand Down Expand Up @@ -367,3 +369,80 @@ func (s) TestNoopConfiguration(t *testing.T) {
t.Fatalf("error in expected round robin: %v", err)
}
}

// TestPickFirstIsNoop verifies that outlier detection is performed using the
// generic health producer when the pickfirstleaf LB policy is used. The test
// server returns error for consecutive requests and test verifies that the
// endpoint is not ejected.
func (s) TestPickFirst(t *testing.T) {
backend1 := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
return nil, errors.New("some error")
},
}
if err := backend1.StartServer(); err != nil {
t.Fatalf("Failed to start backend: %v", err)
}
defer backend1.Stop()
t.Logf("Started bad TestService backend at: %q", backend1.Address)

backend2 := &stubserver.StubServer{}
if err := backend2.StartServer(); err != nil {
t.Fatalf("Failed to start backend: %v", err)
}
defer backend2.Stop()
t.Logf("Started bad TestService backend at: %q", backend1.Address)

countingODServiceConfigJSON := fmt.Sprintf(`
{
"loadBalancingConfig": [
{
"outlier_detection_experimental": {
"interval": "0.025s",
"baseEjectionTime": "100s",
"maxEjectionTime": "300s",
"failurePercentageEjection": {
"threshold": 50,
"enforcementPercentage": 100,
"minimumHosts": 0,
"requestVolume": 2
},
"childPolicy": [{"%s": {}}]
}
}
]
}`, pickfirstleaf.Name)
sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(countingODServiceConfigJSON)

mr := manual.NewBuilderWithScheme("od-e2e")
// The full list of addresses.
mr.InitialState(resolver.State{
Addresses: []resolver.Address{{Addr: backend1.Address}, {Addr: backend2.Address}},
ServiceConfig: sc,
})
cc, err := grpc.NewClient(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("grpc.NewClient() failed: %v", err)
}
defer cc.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
testServiceClient := testgrpc.NewTestServiceClient(cc)

// The first request should not cause ejection.
testServiceClient.EmptyCall(ctx, &testpb.Empty{})
// Wait for the failure rate algorithm to run once.
<-time.After(50 * time.Millisecond)
if got, want := cc.GetState(), connectivity.Ready; got != want {
t.Fatalf("cc.GetState() = %v, want = %v", got, want)
}

// 2 failing request should cause ejection.
testServiceClient.EmptyCall(ctx, &testpb.Empty{})
testServiceClient.EmptyCall(ctx, &testpb.Empty{})
// Wait for the failure rate algorithm to run once.
<-time.After(50 * time.Millisecond)
if got, want := cc.GetState(), connectivity.TransientFailure; got != want {
t.Fatalf("cc.GetState() = %v, want = %v", got, want)
}
}
3 changes: 3 additions & 0 deletions xds/internal/balancer/outlierdetection/subconn_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ import (
// to help track the latest state update from the underlying SubConn, and also
// whether or not this SubConn is ejected.
type subConnWrapper struct {
genericHealthProducerEnabled bool
unregisterHealthListener func()
healthListener balancer.StateListener
balancer.SubConn
listener func(balancer.SubConnState)

Expand Down

0 comments on commit 09573c0

Please sign in to comment.