diff --git a/balancer/pickfirstleaf/pickfirstleaf.go b/balancer/pickfirstleaf/pickfirstleaf.go index 00cb37b02f0a..a2ef0d6b5d4c 100644 --- a/balancer/pickfirstleaf/pickfirstleaf.go +++ b/balancer/pickfirstleaf/pickfirstleaf.go @@ -56,6 +56,12 @@ var ( // Can be changed in init() if this balancer is to be registered as the default // pickfirst. Name = "pick_first_leaf" + // TODO(arjan-bal): This is a hack to inform Outlier Detection when the generic + // health check producer is being used, as opposed to subchannel health checking. + // Once dualstack is completed, health checking will be removed from the subchannel. + // Remove this when implementing the dualstack design. + GenericHealthProducerEnabledKey = "generic_health_producer_enabled" + GenericHealthProducerEnabledValue = &struct{}{} ) const logPrefix = "[pick-first-leaf-lb %p] " @@ -121,7 +127,9 @@ func (b *pickfirstBalancer) newSCData(addr resolver.Address) (*scData, error) { healthState: balancer.SubConnState{ConnectivityState: connectivity.Idle}, addr: addr, } - sc, err := b.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{ + addrWithAttr := addr + addrWithAttr.Attributes = addr.Attributes.WithValue(GenericHealthProducerEnabledKey, GenericHealthProducerEnabledValue) + sc, err := b.cc.NewSubConn([]resolver.Address{addrWithAttr}, balancer.NewSubConnOptions{ StateListener: func(state balancer.SubConnState) { // Store the state and delegate. 
b.serializer.TrySchedule(func(_ context.Context) { diff --git a/balancer/pickfirstleaf/test/pickfirstleaf_test.go b/balancer/pickfirstleaf/test/pickfirstleaf_test.go index 0b08049d1289..a5318cf3a738 100644 --- a/balancer/pickfirstleaf/test/pickfirstleaf_test.go +++ b/balancer/pickfirstleaf/test/pickfirstleaf_test.go @@ -28,6 +28,7 @@ import ( "github.com/google/go-cmp/cmp" "google.golang.org/grpc" + "google.golang.org/grpc/attributes" "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/pickfirstleaf" "google.golang.org/grpc/codes" @@ -938,7 +939,10 @@ func (b *backendManager) stopAllExcept(index int) { func (b *backendManager) resolverAddrs() []resolver.Address { addrs := make([]resolver.Address, len(b.backends)) for i, backend := range b.backends { - addrs[i] = resolver.Address{Addr: backend.Address} + addrs[i] = resolver.Address{ + Addr: backend.Address, + Attributes: attributes.New(pickfirstleaf.GenericHealthProducerEnabledKey, pickfirstleaf.GenericHealthProducerEnabledValue), + } } return addrs } diff --git a/xds/internal/balancer/outlierdetection/balancer.go b/xds/internal/balancer/outlierdetection/balancer.go index 80d3d444697e..f600b7ed13e6 100644 --- a/xds/internal/balancer/outlierdetection/balancer.go +++ b/xds/internal/balancer/outlierdetection/balancer.go @@ -33,7 +33,9 @@ import ( "unsafe" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/pickfirstleaf" "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/health/producer" "google.golang.org/grpc/internal/balancer/gracefulswitch" "google.golang.org/grpc/internal/buffer" "google.golang.org/grpc/internal/channelz" @@ -353,8 +355,32 @@ func (b *outlierDetectionBalancer) updateSubConnState(sc balancer.SubConn, state return } if state.ConnectivityState == connectivity.Shutdown { + if scw.unregisterHealthListener != nil { + scw.unregisterHealthListener() + scw.unregisterHealthListener = nil + } delete(b.scWrappers, scw.SubConn) } + if 
scw.genericHealthProducerEnabled { + scw.listener(state) + return + } + b.scUpdateCh.Put(&scUpdate{ + scw: scw, + state: state, + }) +} + +func (b *outlierDetectionBalancer) updateHealthState(sc balancer.SubConn, state balancer.SubConnState) { + b.mu.Lock() + defer b.mu.Unlock() + scw, ok := b.scWrappers[sc] + if !ok { + // Shouldn't happen if passed down a SubConnWrapper to child on SubConn + // creation. + b.logger.Errorf("OnStateChange called with SubConn that has no corresponding SubConnWrapper") + return + } b.scUpdateCh.Put(&scUpdate{ scw: scw, state: state, @@ -380,6 +406,13 @@ func (b *outlierDetectionBalancer) Close() { if b.intervalTimer != nil { b.intervalTimer.Stop() } + + for _, scw := range b.scWrappers { + if scw.unregisterHealthListener != nil { + scw.unregisterHealthListener() + scw.unregisterHealthListener = nil + } + } } func (b *outlierDetectionBalancer) ExitIdle() { @@ -466,7 +499,22 @@ func (b *outlierDetectionBalancer) UpdateState(s balancer.State) { b.pickerUpdateCh.Put(s) } +type healthListener struct { + b *outlierDetectionBalancer + sc balancer.SubConn +} + +func (hl *healthListener) OnStateChange(state balancer.SubConnState) { + hl.b.updateHealthState(hl.sc, state) +} + func (b *outlierDetectionBalancer) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { + // Determine if the generic health producer is being used. 
+ genericHealthProducerEnabled := false + if len(addrs) == 1 && addrs[0].Attributes.Value(pickfirstleaf.GenericHealthProducerEnabledKey) == pickfirstleaf.GenericHealthProducerEnabledValue { + genericHealthProducerEnabled = true + } + var sc balancer.SubConn oldListener := opts.StateListener opts.StateListener = func(state balancer.SubConnState) { b.updateSubConnState(sc, state) } @@ -475,10 +523,17 @@ func (b *outlierDetectionBalancer) NewSubConn(addrs []resolver.Address, opts bal return nil, err } scw := &subConnWrapper{ - SubConn: sc, - addresses: addrs, - scUpdateCh: b.scUpdateCh, - listener: oldListener, + SubConn: sc, + addresses: addrs, + scUpdateCh: b.scUpdateCh, + listener: oldListener, + genericHealthProducerEnabled: genericHealthProducerEnabled, + } + if genericHealthProducerEnabled { + scw.healthListener, scw.unregisterHealthListener = producer.SwapRootListener(&healthListener{ + b: b, + sc: sc, + }, sc) } b.mu.Lock() defer b.mu.Unlock() @@ -611,12 +666,15 @@ func min(x, y time.Duration) time.Duration { func (b *outlierDetectionBalancer) handleSubConnUpdate(u *scUpdate) { scw := u.scw scw.latestState = u.state - if !scw.ejected { - if scw.listener != nil { - b.childMu.Lock() - scw.listener(u.state) - b.childMu.Unlock() - } + if scw.ejected { + return + } + b.childMu.Lock() + defer b.childMu.Unlock() + if scw.genericHealthProducerEnabled { + scw.healthListener.OnStateChange(u.state) + } else if scw.listener != nil { + scw.listener(u.state) } } @@ -633,10 +691,12 @@ func (b *outlierDetectionBalancer) handleEjectedUpdate(u *ejectionUpdate) { ConnectivityState: connectivity.TransientFailure, } } - if scw.listener != nil { - b.childMu.Lock() + b.childMu.Lock() + defer b.childMu.Unlock() + if scw.genericHealthProducerEnabled { + scw.healthListener.OnStateChange(stateToUpdate) + } else if scw.listener != nil { scw.listener(stateToUpdate) - b.childMu.Unlock() } } diff --git a/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go 
b/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go index 35c4d75301b3..6855dcf045a2 100644 --- a/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go +++ b/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go @@ -28,6 +28,8 @@ import ( "github.com/google/go-cmp/cmp" "google.golang.org/grpc" + "google.golang.org/grpc/balancer/pickfirstleaf" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/grpctest" @@ -367,3 +369,80 @@ func (s) TestNoopConfiguration(t *testing.T) { t.Fatalf("error in expected round robin: %v", err) } } + +// TestPickFirst verifies that outlier detection is performed using the +// generic health producer when the pickfirstleaf LB policy is used. The test +// server returns errors for consecutive requests and the test verifies that the +// endpoint is ejected. +func (s) TestPickFirst(t *testing.T) { + backend1 := &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + return nil, errors.New("some error") + }, + } + if err := backend1.StartServer(); err != nil { + t.Fatalf("Failed to start backend: %v", err) + } + defer backend1.Stop() + t.Logf("Started bad TestService backend at: %q", backend1.Address) + + backend2 := &stubserver.StubServer{} + if err := backend2.StartServer(); err != nil { + t.Fatalf("Failed to start backend: %v", err) + } + defer backend2.Stop() + t.Logf("Started bad TestService backend at: %q", backend1.Address) + + countingODServiceConfigJSON := fmt.Sprintf(` +{ + "loadBalancingConfig": [ + { + "outlier_detection_experimental": { + "interval": "0.025s", + "baseEjectionTime": "100s", + "maxEjectionTime": "300s", + "failurePercentageEjection": { + "threshold": 50, + "enforcementPercentage": 100, + "minimumHosts": 0, + "requestVolume": 2 + }, + "childPolicy": [{"%s": {}}] + } + } + ] +}`, pickfirstleaf.Name) + 
sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(countingODServiceConfigJSON) + + mr := manual.NewBuilderWithScheme("od-e2e") + // The full list of addresses. + mr.InitialState(resolver.State{ + Addresses: []resolver.Address{{Addr: backend1.Address}, {Addr: backend2.Address}}, + ServiceConfig: sc, + }) + cc, err := grpc.NewClient(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.NewClient() failed: %v", err) + } + defer cc.Close() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + testServiceClient := testgrpc.NewTestServiceClient(cc) + + // The first request should not cause ejection. + testServiceClient.EmptyCall(ctx, &testpb.Empty{}) + // Wait for the failure rate algorithm to run once. + <-time.After(50 * time.Millisecond) + if got, want := cc.GetState(), connectivity.Ready; got != want { + t.Fatalf("cc.GetState() = %v, want = %v", got, want) + } + + // 2 failing requests should cause ejection. + testServiceClient.EmptyCall(ctx, &testpb.Empty{}) + testServiceClient.EmptyCall(ctx, &testpb.Empty{}) + // Wait for the failure rate algorithm to run once. + <-time.After(50 * time.Millisecond) + if got, want := cc.GetState(), connectivity.TransientFailure; got != want { + t.Fatalf("cc.GetState() = %v, want = %v", got, want) + } +} diff --git a/xds/internal/balancer/outlierdetection/subconn_wrapper.go b/xds/internal/balancer/outlierdetection/subconn_wrapper.go index 0fa422d8f262..ad542c79e721 100644 --- a/xds/internal/balancer/outlierdetection/subconn_wrapper.go +++ b/xds/internal/balancer/outlierdetection/subconn_wrapper.go @@ -30,6 +30,9 @@ import ( // to help track the latest state update from the underlying SubConn, and also // whether or not this SubConn is ejected. 
type subConnWrapper struct { + genericHealthProducerEnabled bool + unregisterHealthListener func() + healthListener balancer.StateListener balancer.SubConn listener func(balancer.SubConnState)