From d0322c20e2ca9ff23bf6e29fe287eaea997a9115 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Fri, 6 Sep 2024 11:43:37 +0530 Subject: [PATCH] Revert "Update pf lofic for combining health and raw conn updates" This reverts commit 2ef3b4036137848e0f462df3901c4420e6df4283. --- balancer/pickfirstleaf/pickfirstleaf.go | 176 ++++++++++-------------- 1 file changed, 76 insertions(+), 100 deletions(-) diff --git a/balancer/pickfirstleaf/pickfirstleaf.go b/balancer/pickfirstleaf/pickfirstleaf.go index 63c623a743a8..3a96741d150c 100644 --- a/balancer/pickfirstleaf/pickfirstleaf.go +++ b/balancer/pickfirstleaf/pickfirstleaf.go @@ -76,7 +76,8 @@ func (pickfirstBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions subConns: resolver.NewAddressMap(), serializer: grpcsync.NewCallbackSerializer(ctx), serializerCancel: cancel, - rawConnectivityState: balancer.SubConnState{ConnectivityState: connectivity.Connecting}, + concludedState: connectivity.Connecting, + rawConnectivityState: connectivity.Connecting, healthCheckOpts: opts.HealthCheckOptions, } b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b)) @@ -175,7 +176,8 @@ type pickfirstBalancer struct { // subconn state updates. serializer *grpcsync.CallbackSerializer serializerCancel func() - rawConnectivityState balancer.SubConnState + concludedState connectivity.State + rawConnectivityState connectivity.State subConns *resolver.AddressMap // scData for active subonns mapped by address. addressList addressList firstPass bool @@ -193,12 +195,12 @@ func (b *pickfirstBalancer) resolverError(err error) { if b.logger.V(2) { b.logger.Infof("Received error from the name resolver: %v", err) } - if b.rawConnectivityState.ConnectivityState == connectivity.Shutdown { + if b.rawConnectivityState == connectivity.Shutdown { return } // The picker will not change since the balancer does not currently // report an error. - if b.rawConnectivityState.ConnectivityState != connectivity.TransientFailure { + if b.rawConnectivityState != connectivity.TransientFailure { if b.logger.V(2) { b.logger.Infof("Ignoring resolver error because balancer is using a previous good update.") } @@ -207,11 +209,11 @@ func (b *pickfirstBalancer) resolverError(err error) { b.closeSubConns() b.addressList.updateEndpointList(nil) - b.rawConnectivityState = balancer.SubConnState{ + b.rawConnectivityState = connectivity.TransientFailure + b.updateBalancerState(balancer.State{ ConnectivityState: connectivity.TransientFailure, - ConnectionError: fmt.Errorf("name resolver error: %v", err), - } - b.updateBalancerState() + Picker: &picker{err: fmt.Errorf("name resolver error: %v", err)}, + }) } func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error { @@ -228,16 +230,13 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState // updateClientConnState handles clientConn state changes. // Only executed in the context of a serializer callback. func (b *pickfirstBalancer) updateClientConnState(state balancer.ClientConnState) error { - if b.rawConnectivityState.ConnectivityState == connectivity.Shutdown { + if b.rawConnectivityState == connectivity.Shutdown { return errBalancerClosed } if len(state.ResolverState.Addresses) == 0 && len(state.ResolverState.Endpoints) == 0 { // Cleanup state pertaining to the previous resolver state. // Treat an empty address list like an error by calling b.ResolverError. - b.rawConnectivityState = balancer.SubConnState{ - ConnectivityState: connectivity.TransientFailure, - // The connection error will be set by resolver error next. - } + b.rawConnectivityState = connectivity.TransientFailure b.resolverError(errors.New("produced zero addresses")) return balancer.ErrBadResolverState } @@ -284,7 +283,7 @@ func (b *pickfirstBalancer) updateClientConnState(state balancer.ClientConnState prevAddr := b.addressList.currentAddress() prevAddrsCount := b.addressList.size() b.addressList.updateEndpointList(newEndpoints) - if b.rawConnectivityState.ConnectivityState == connectivity.Ready && b.addressList.seekTo(prevAddr) { + if b.rawConnectivityState == connectivity.Ready && b.addressList.seekTo(prevAddr) { return nil } @@ -296,14 +295,15 @@ func (b *pickfirstBalancer) updateClientConnState(state balancer.ClientConnState // we should still enter CONNECTING because the sticky TF behaviour mentioned // in A62 applies only when the TRANSIENT_FAILURE is reported due to connectivity // failures. - if b.rawConnectivityState.ConnectivityState == connectivity.Ready || b.rawConnectivityState.ConnectivityState == connectivity.Connecting || prevAddrsCount == 0 { + if b.rawConnectivityState == connectivity.Ready || b.rawConnectivityState == connectivity.Connecting || prevAddrsCount == 0 { // Start connection attempt at first address. - b.rawConnectivityState = balancer.SubConnState{ + b.rawConnectivityState = connectivity.Connecting + b.updateBalancerState(balancer.State{ ConnectivityState: connectivity.Connecting, - } - b.updateBalancerState() + Picker: &picker{err: balancer.ErrNoSubConnAvailable}, + }) b.requestConnection() - } else if b.rawConnectivityState.ConnectivityState == connectivity.TransientFailure { + } else if b.rawConnectivityState == connectivity.TransientFailure { // If we're in TRANSIENT_FAILURE, we stay in TRANSIENT_FAILURE until // we're READY. See A62. b.requestConnection() @@ -320,7 +320,7 @@ func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state b func (b *pickfirstBalancer) Close() { b.serializer.TrySchedule(func(_ context.Context) { b.closeSubConns() - b.rawConnectivityState = balancer.SubConnState{ConnectivityState: connectivity.Shutdown} + b.rawConnectivityState = connectivity.Shutdown }) b.serializerCancel() <-b.serializer.Done() @@ -330,7 +330,7 @@ func (b *pickfirstBalancer) Close() { // by the idlePicker and clientConn so access to variables should be synchronized. func (b *pickfirstBalancer) ExitIdle() { b.serializer.TrySchedule(func(_ context.Context) { - if b.rawConnectivityState.ConnectivityState == connectivity.Idle { + if b.rawConnectivityState == connectivity.Idle { b.requestConnection() } }) @@ -418,7 +418,7 @@ func (b *pickfirstBalancer) shutdownRemaining(selected *scData) { // attempted. // Only executed in the context of a serializer callback. func (b *pickfirstBalancer) requestConnection() { - if !b.addressList.isValid() || b.rawConnectivityState.ConnectivityState == connectivity.Shutdown { + if !b.addressList.isValid() || b.rawConnectivityState == connectivity.Shutdown { return } curAddr := b.addressList.currentAddress() @@ -435,11 +435,11 @@ func (b *pickfirstBalancer) requestConnection() { // The LB policy remains in TRANSIENT_FAILURE until a new resolver // update is received. b.addressList.reset() - b.rawConnectivityState = balancer.SubConnState{ + b.rawConnectivityState = connectivity.TransientFailure + b.updateBalancerState(balancer.State{ ConnectivityState: connectivity.TransientFailure, - ConnectionError: fmt.Errorf("failed to create a new subConn: %v", err), - } - b.updateBalancerState() + Picker: &picker{err: fmt.Errorf("failed to create a new subConn: %v", err)}, + }) return } b.subConns.Set(curAddr, sd) @@ -485,9 +485,7 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, state balancer.SubCon b.logger.Errorf("Address %q not found address list in %v", sd.addr, b.addressList.addresses) return } - b.rawConnectivityState = balancer.SubConnState{ - ConnectivityState: connectivity.Ready, - } + b.rawConnectivityState = connectivity.Ready hl := &healthListener{ scData: sd, pb: b, @@ -503,14 +501,15 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, state balancer.SubCon // If the LB policy is READY, and it receives a subchannel state change, // it means that the READY subchannel has failed. - if b.rawConnectivityState.ConnectivityState == connectivity.Ready && state.ConnectivityState != connectivity.Ready { + if b.rawConnectivityState == connectivity.Ready && state.ConnectivityState != connectivity.Ready { // Once a transport fails, the balancer enters IDLE and starts from // the first address when the picker is used. b.addressList.reset() - b.rawConnectivityState = balancer.SubConnState{ + b.rawConnectivityState = connectivity.Idle + b.updateBalancerState(balancer.State{ ConnectivityState: connectivity.Idle, - } - b.updateBalancerState() + Picker: &idlePicker{exitIdle: b.ExitIdle}, + }) return } @@ -521,11 +520,12 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, state balancer.SubCon // If it's in TRANSIENT_FAILURE, stay in TRANSIENT_FAILURE until // it's READY. See A62. // If the balancer is already in CONNECTING, no update is needed. - if b.rawConnectivityState.ConnectivityState == connectivity.Idle { - b.rawConnectivityState = balancer.SubConnState{ + if b.rawConnectivityState == connectivity.Idle { + b.rawConnectivityState = connectivity.Connecting + b.updateBalancerState(balancer.State{ ConnectivityState: connectivity.Connecting, - } - b.updateBalancerState() + Picker: &picker{err: balancer.ErrNoSubConnAvailable}, + }) } case connectivity.TransientFailure: sd.lastErr = state.ConnectionError @@ -551,10 +551,11 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, state balancer.SubCon return } b.addressList.reset() - b.rawConnectivityState = balancer.SubConnState{ + b.rawConnectivityState = connectivity.Idle + b.updateBalancerState(balancer.State{ ConnectivityState: connectivity.Idle, - } - b.updateBalancerState() + Picker: &idlePicker{exitIdle: b.ExitIdle}, + }) } return } @@ -563,11 +564,11 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, state balancer.SubCon switch state.ConnectivityState { case connectivity.TransientFailure: sd.lastErr = state.ConnectionError - b.rawConnectivityState = balancer.SubConnState{ + b.rawConnectivityState = connectivity.TransientFailure + b.updateBalancerState(balancer.State{ ConnectivityState: connectivity.TransientFailure, - ConnectionError: state.ConnectionError, - } - b.updateBalancerState() + Picker: &picker{err: state.ConnectionError}, + }) // We don't need to request re-resolution since the subconn already does // that before reporting TRANSIENT_FAILURE. // TODO: #7534 - Move re-resolution requests from subconn into pick_first. @@ -591,75 +592,50 @@ func (b *pickfirstBalancer) updateSubConnHealthState(sd *scData) { return } switch state.ConnectivityState { - case connectivity.Ready, connectivity.TransientFailure, connectivity.Connecting: - b.updateBalancerState() + case connectivity.Ready: + b.updateBalancerState(balancer.State{ + ConnectivityState: connectivity.Ready, + Picker: &picker{result: balancer.PickResult{SubConn: sd.subConn}}, + }) + case connectivity.TransientFailure: + b.updateBalancerState(balancer.State{ + ConnectivityState: connectivity.TransientFailure, + Picker: &picker{err: fmt.Errorf("health check failure: %v", state.ConnectionError)}, + }) case connectivity.Idle: // Wait for the raw connectivity state to report IDLE. default: - } -} - -func (b *pickfirstBalancer) updateBalancerState() { - // If the connectivity state is not READY, use it. - if b.rawConnectivityState.ConnectivityState != connectivity.Ready { - switch b.rawConnectivityState.ConnectivityState { - case connectivity.Idle: - b.cc.UpdateState(balancer.State{ - ConnectivityState: connectivity.Idle, - Picker: &idlePicker{exitIdle: b.ExitIdle}, - }) - case connectivity.Connecting: - b.cc.UpdateState(balancer.State{ - ConnectivityState: connectivity.Connecting, - Picker: &picker{err: balancer.ErrNoSubConnAvailable}, - }) - case connectivity.TransientFailure: - b.cc.UpdateState(balancer.State{ - ConnectivityState: connectivity.TransientFailure, - Picker: &picker{err: b.rawConnectivityState.ConnectionError}, - }) - default: + // If we're in TRANSIENT_FAILURE, we stay in TRANSIENT_FAILURE until + // we're READY. See A62. + if b.concludedState == connectivity.TransientFailure { + return } - return - } - // Use the health check state. - curAddr := b.addressList.currentAddress() - val, found := b.subConns.Get(curAddr) - if !found { - b.logger.Errorf("SubConn not found for address %q even though connectivity is READY", curAddr) - return - } - scd := val.(*scData) - switch scd.healthState.ConnectivityState { - case connectivity.Idle: - // Let the raw connectivity state report IDLE. - case connectivity.Connecting: - b.cc.UpdateState(balancer.State{ - ConnectivityState: connectivity.Connecting, + // The health check will report CONNECTING once the raw connectivity state + // changes to READY. We can avoid sending a new picker since the balancer + // would already be in CONNECTING. + if state.ConnectivityState == connectivity.Connecting && b.concludedState == connectivity.Connecting { + return + } + b.updateBalancerState(balancer.State{ + ConnectivityState: state.ConnectivityState, Picker: &picker{err: balancer.ErrNoSubConnAvailable}, }) - case connectivity.TransientFailure: - b.cc.UpdateState(balancer.State{ - ConnectivityState: connectivity.TransientFailure, - Picker: &picker{err: fmt.Errorf("heath check error: %v", scd.healthState.ConnectionError)}, - }) - case connectivity.Ready: - b.cc.UpdateState(balancer.State{ - ConnectivityState: connectivity.Ready, - Picker: &picker{result: balancer.PickResult{SubConn: scd.subConn}}, - }) - default: } } +func (b *pickfirstBalancer) updateBalancerState(newState balancer.State) { + b.concludedState = newState.ConnectivityState + b.cc.UpdateState(newState) +} + // Only executed in the context of a serializer callback. func (b *pickfirstBalancer) endFirstPass(lastErr error) { b.firstPass = false - b.rawConnectivityState = balancer.SubConnState{ + b.rawConnectivityState = connectivity.TransientFailure + b.updateBalancerState(balancer.State{ ConnectivityState: connectivity.TransientFailure, - ConnectionError: lastErr, - } - b.updateBalancerState() + Picker: &picker{err: lastErr}, + }) // Start re-connecting all the subconns that are already in IDLE. for _, v := range b.subConns.Values() { sd := v.(*scData)