From 2ef3b4036137848e0f462df3901c4420e6df4283 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Fri, 6 Sep 2024 11:19:05 +0530 Subject: [PATCH] Update pf logic for combining health and raw conn updates --- balancer/pickfirstleaf/pickfirstleaf.go | 176 ++++++++++++++---------- 1 file changed, 100 insertions(+), 76 deletions(-) diff --git a/balancer/pickfirstleaf/pickfirstleaf.go b/balancer/pickfirstleaf/pickfirstleaf.go index 3a96741d150c..63c623a743a8 100644 --- a/balancer/pickfirstleaf/pickfirstleaf.go +++ b/balancer/pickfirstleaf/pickfirstleaf.go @@ -76,8 +76,7 @@ func (pickfirstBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions subConns: resolver.NewAddressMap(), serializer: grpcsync.NewCallbackSerializer(ctx), serializerCancel: cancel, - concludedState: connectivity.Connecting, - rawConnectivityState: connectivity.Connecting, + rawConnectivityState: balancer.SubConnState{ConnectivityState: connectivity.Connecting}, healthCheckOpts: opts.HealthCheckOptions, } b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b)) @@ -176,8 +175,7 @@ type pickfirstBalancer struct { // subconn state updates. serializer *grpcsync.CallbackSerializer serializerCancel func() - concludedState connectivity.State - rawConnectivityState connectivity.State + rawConnectivityState balancer.SubConnState subConns *resolver.AddressMap // scData for active subonns mapped by address. addressList addressList firstPass bool @@ -195,12 +193,12 @@ func (b *pickfirstBalancer) resolverError(err error) { if b.logger.V(2) { b.logger.Infof("Received error from the name resolver: %v", err) } - if b.rawConnectivityState == connectivity.Shutdown { + if b.rawConnectivityState.ConnectivityState == connectivity.Shutdown { return } // The picker will not change since the balancer does not currently // report an error. 
- if b.rawConnectivityState != connectivity.TransientFailure { + if b.rawConnectivityState.ConnectivityState != connectivity.TransientFailure { if b.logger.V(2) { b.logger.Infof("Ignoring resolver error because balancer is using a previous good update.") } @@ -209,11 +207,11 @@ func (b *pickfirstBalancer) resolverError(err error) { b.closeSubConns() b.addressList.updateEndpointList(nil) - b.rawConnectivityState = connectivity.TransientFailure - b.updateBalancerState(balancer.State{ + b.rawConnectivityState = balancer.SubConnState{ ConnectivityState: connectivity.TransientFailure, - Picker: &picker{err: fmt.Errorf("name resolver error: %v", err)}, - }) + ConnectionError: fmt.Errorf("name resolver error: %v", err), + } + b.updateBalancerState() } func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error { @@ -230,13 +228,16 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState // updateClientConnState handles clientConn state changes. // Only executed in the context of a serializer callback. func (b *pickfirstBalancer) updateClientConnState(state balancer.ClientConnState) error { - if b.rawConnectivityState == connectivity.Shutdown { + if b.rawConnectivityState.ConnectivityState == connectivity.Shutdown { return errBalancerClosed } if len(state.ResolverState.Addresses) == 0 && len(state.ResolverState.Endpoints) == 0 { // Cleanup state pertaining to the previous resolver state. // Treat an empty address list like an error by calling b.ResolverError. - b.rawConnectivityState = connectivity.TransientFailure + b.rawConnectivityState = balancer.SubConnState{ + ConnectivityState: connectivity.TransientFailure, + // The connection error will be set by resolver error next. 
+ } b.resolverError(errors.New("produced zero addresses")) return balancer.ErrBadResolverState } @@ -283,7 +284,7 @@ func (b *pickfirstBalancer) updateClientConnState(state balancer.ClientConnState prevAddr := b.addressList.currentAddress() prevAddrsCount := b.addressList.size() b.addressList.updateEndpointList(newEndpoints) - if b.rawConnectivityState == connectivity.Ready && b.addressList.seekTo(prevAddr) { + if b.rawConnectivityState.ConnectivityState == connectivity.Ready && b.addressList.seekTo(prevAddr) { return nil } @@ -295,15 +296,14 @@ func (b *pickfirstBalancer) updateClientConnState(state balancer.ClientConnState // we should still enter CONNECTING because the sticky TF behaviour mentioned // in A62 applies only when the TRANSIENT_FAILURE is reported due to connectivity // failures. - if b.rawConnectivityState == connectivity.Ready || b.rawConnectivityState == connectivity.Connecting || prevAddrsCount == 0 { + if b.rawConnectivityState.ConnectivityState == connectivity.Ready || b.rawConnectivityState.ConnectivityState == connectivity.Connecting || prevAddrsCount == 0 { // Start connection attempt at first address. - b.rawConnectivityState = connectivity.Connecting - b.updateBalancerState(balancer.State{ + b.rawConnectivityState = balancer.SubConnState{ ConnectivityState: connectivity.Connecting, - Picker: &picker{err: balancer.ErrNoSubConnAvailable}, - }) + } + b.updateBalancerState() b.requestConnection() - } else if b.rawConnectivityState == connectivity.TransientFailure { + } else if b.rawConnectivityState.ConnectivityState == connectivity.TransientFailure { // If we're in TRANSIENT_FAILURE, we stay in TRANSIENT_FAILURE until // we're READY. See A62. 
b.requestConnection() @@ -320,7 +320,7 @@ func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state b func (b *pickfirstBalancer) Close() { b.serializer.TrySchedule(func(_ context.Context) { b.closeSubConns() - b.rawConnectivityState = connectivity.Shutdown + b.rawConnectivityState = balancer.SubConnState{ConnectivityState: connectivity.Shutdown} }) b.serializerCancel() <-b.serializer.Done() @@ -330,7 +330,7 @@ func (b *pickfirstBalancer) Close() { // by the idlePicker and clientConn so access to variables should be synchronized. func (b *pickfirstBalancer) ExitIdle() { b.serializer.TrySchedule(func(_ context.Context) { - if b.rawConnectivityState == connectivity.Idle { + if b.rawConnectivityState.ConnectivityState == connectivity.Idle { b.requestConnection() } }) @@ -418,7 +418,7 @@ func (b *pickfirstBalancer) shutdownRemaining(selected *scData) { // attempted. // Only executed in the context of a serializer callback. func (b *pickfirstBalancer) requestConnection() { - if !b.addressList.isValid() || b.rawConnectivityState == connectivity.Shutdown { + if !b.addressList.isValid() || b.rawConnectivityState.ConnectivityState == connectivity.Shutdown { return } curAddr := b.addressList.currentAddress() @@ -435,11 +435,11 @@ func (b *pickfirstBalancer) requestConnection() { // The LB policy remains in TRANSIENT_FAILURE until a new resolver // update is received. 
b.addressList.reset() - b.rawConnectivityState = connectivity.TransientFailure - b.updateBalancerState(balancer.State{ + b.rawConnectivityState = balancer.SubConnState{ ConnectivityState: connectivity.TransientFailure, - Picker: &picker{err: fmt.Errorf("failed to create a new subConn: %v", err)}, - }) + ConnectionError: fmt.Errorf("failed to create a new subConn: %v", err), + } + b.updateBalancerState() return } b.subConns.Set(curAddr, sd) @@ -485,7 +485,9 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, state balancer.SubCon b.logger.Errorf("Address %q not found address list in %v", sd.addr, b.addressList.addresses) return } - b.rawConnectivityState = connectivity.Ready + b.rawConnectivityState = balancer.SubConnState{ + ConnectivityState: connectivity.Ready, + } hl := &healthListener{ scData: sd, pb: b, @@ -501,15 +503,14 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, state balancer.SubCon // If the LB policy is READY, and it receives a subchannel state change, // it means that the READY subchannel has failed. - if b.rawConnectivityState == connectivity.Ready && state.ConnectivityState != connectivity.Ready { + if b.rawConnectivityState.ConnectivityState == connectivity.Ready && state.ConnectivityState != connectivity.Ready { // Once a transport fails, the balancer enters IDLE and starts from // the first address when the picker is used. b.addressList.reset() - b.rawConnectivityState = connectivity.Idle - b.updateBalancerState(balancer.State{ + b.rawConnectivityState = balancer.SubConnState{ ConnectivityState: connectivity.Idle, - Picker: &idlePicker{exitIdle: b.ExitIdle}, - }) + } + b.updateBalancerState() return } @@ -520,12 +521,11 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, state balancer.SubCon // If it's in TRANSIENT_FAILURE, stay in TRANSIENT_FAILURE until // it's READY. See A62. // If the balancer is already in CONNECTING, no update is needed. 
- if b.rawConnectivityState == connectivity.Idle { - b.rawConnectivityState = connectivity.Connecting - b.updateBalancerState(balancer.State{ + if b.rawConnectivityState.ConnectivityState == connectivity.Idle { + b.rawConnectivityState = balancer.SubConnState{ ConnectivityState: connectivity.Connecting, - Picker: &picker{err: balancer.ErrNoSubConnAvailable}, - }) + } + b.updateBalancerState() } case connectivity.TransientFailure: sd.lastErr = state.ConnectionError @@ -551,11 +551,10 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, state balancer.SubCon return } b.addressList.reset() - b.rawConnectivityState = connectivity.Idle - b.updateBalancerState(balancer.State{ + b.rawConnectivityState = balancer.SubConnState{ ConnectivityState: connectivity.Idle, - Picker: &idlePicker{exitIdle: b.ExitIdle}, - }) + } + b.updateBalancerState() } return } @@ -564,11 +563,11 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, state balancer.SubCon switch state.ConnectivityState { case connectivity.TransientFailure: sd.lastErr = state.ConnectionError - b.rawConnectivityState = connectivity.TransientFailure - b.updateBalancerState(balancer.State{ + b.rawConnectivityState = balancer.SubConnState{ ConnectivityState: connectivity.TransientFailure, - Picker: &picker{err: state.ConnectionError}, - }) + ConnectionError: state.ConnectionError, + } + b.updateBalancerState() // We don't need to request re-resolution since the subconn already does // that before reporting TRANSIENT_FAILURE. // TODO: #7534 - Move re-resolution requests from subconn into pick_first. 
@@ -592,50 +591,75 @@ func (b *pickfirstBalancer) updateSubConnHealthState(sd *scData) { return } switch state.ConnectivityState { - case connectivity.Ready: - b.updateBalancerState(balancer.State{ - ConnectivityState: connectivity.Ready, - Picker: &picker{result: balancer.PickResult{SubConn: sd.subConn}}, - }) - case connectivity.TransientFailure: - b.updateBalancerState(balancer.State{ - ConnectivityState: connectivity.TransientFailure, - Picker: &picker{err: fmt.Errorf("health check failure: %v", state.ConnectionError)}, - }) + case connectivity.Ready, connectivity.TransientFailure, connectivity.Connecting: + b.updateBalancerState() case connectivity.Idle: // Wait for the raw connectivity state to report IDLE. default: - // If we're in TRANSIENT_FAILURE, we stay in TRANSIENT_FAILURE until - // we're READY. See A62. - if b.concludedState == connectivity.TransientFailure { - return - } - // The health check will report CONNECTING once the raw connectivity state - // changes to READY. We can avoid sending a new picker since the balancer - // would already be in CONNECTING. - if state.ConnectivityState == connectivity.Connecting && b.concludedState == connectivity.Connecting { - return + } +} + +func (b *pickfirstBalancer) updateBalancerState() { + // If the connectivity state is not READY, use it. 
+ if b.rawConnectivityState.ConnectivityState != connectivity.Ready { + switch b.rawConnectivityState.ConnectivityState { + case connectivity.Idle: + b.cc.UpdateState(balancer.State{ + ConnectivityState: connectivity.Idle, + Picker: &idlePicker{exitIdle: b.ExitIdle}, + }) + case connectivity.Connecting: + b.cc.UpdateState(balancer.State{ + ConnectivityState: connectivity.Connecting, + Picker: &picker{err: balancer.ErrNoSubConnAvailable}, + }) + case connectivity.TransientFailure: + b.cc.UpdateState(balancer.State{ + ConnectivityState: connectivity.TransientFailure, + Picker: &picker{err: b.rawConnectivityState.ConnectionError}, + }) + default: } - b.updateBalancerState(balancer.State{ - ConnectivityState: state.ConnectivityState, + return + } + // Use the health check state. + curAddr := b.addressList.currentAddress() + val, found := b.subConns.Get(curAddr) + if !found { + b.logger.Errorf("SubConn not found for address %q even though connectivity is READY", curAddr) + return + } + scd := val.(*scData) + switch scd.healthState.ConnectivityState { + case connectivity.Idle: + // Let the raw connectivity state report IDLE. + case connectivity.Connecting: + b.cc.UpdateState(balancer.State{ + ConnectivityState: connectivity.Connecting, Picker: &picker{err: balancer.ErrNoSubConnAvailable}, }) + case connectivity.TransientFailure: + b.cc.UpdateState(balancer.State{ + ConnectivityState: connectivity.TransientFailure, + Picker: &picker{err: fmt.Errorf("heath check error: %v", scd.healthState.ConnectionError)}, + }) + case connectivity.Ready: + b.cc.UpdateState(balancer.State{ + ConnectivityState: connectivity.Ready, + Picker: &picker{result: balancer.PickResult{SubConn: scd.subConn}}, + }) + default: } } -func (b *pickfirstBalancer) updateBalancerState(newState balancer.State) { - b.concludedState = newState.ConnectivityState - b.cc.UpdateState(newState) -} - // Only executed in the context of a serializer callback. 
func (b *pickfirstBalancer) endFirstPass(lastErr error) { b.firstPass = false - b.rawConnectivityState = connectivity.TransientFailure - b.updateBalancerState(balancer.State{ + b.rawConnectivityState = balancer.SubConnState{ ConnectivityState: connectivity.TransientFailure, - Picker: &picker{err: lastErr}, - }) + ConnectionError: lastErr, + } + b.updateBalancerState() // Start re-connecting all the subconns that are already in IDLE. for _, v := range b.subConns.Values() { sd := v.(*scData)