Skip to content

Commit

Permalink
Revert "Update pf lofic for combining health and raw conn updates"
Browse files Browse the repository at this point in the history
This reverts commit 2ef3b40.
  • Loading branch information
arjan-bal committed Sep 6, 2024
1 parent 2ef3b40 commit d0322c2
Showing 1 changed file with 76 additions and 100 deletions.
176 changes: 76 additions & 100 deletions balancer/pickfirstleaf/pickfirstleaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ func (pickfirstBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions
subConns: resolver.NewAddressMap(),
serializer: grpcsync.NewCallbackSerializer(ctx),
serializerCancel: cancel,
rawConnectivityState: balancer.SubConnState{ConnectivityState: connectivity.Connecting},
concludedState: connectivity.Connecting,
rawConnectivityState: connectivity.Connecting,
healthCheckOpts: opts.HealthCheckOptions,
}
b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b))
Expand Down Expand Up @@ -175,7 +176,8 @@ type pickfirstBalancer struct {
// subconn state updates.
serializer *grpcsync.CallbackSerializer
serializerCancel func()
rawConnectivityState balancer.SubConnState
concludedState connectivity.State
rawConnectivityState connectivity.State
subConns *resolver.AddressMap // scData for active subonns mapped by address.
addressList addressList
firstPass bool
Expand All @@ -193,12 +195,12 @@ func (b *pickfirstBalancer) resolverError(err error) {
if b.logger.V(2) {
b.logger.Infof("Received error from the name resolver: %v", err)
}
if b.rawConnectivityState.ConnectivityState == connectivity.Shutdown {
if b.rawConnectivityState == connectivity.Shutdown {
return
}
// The picker will not change since the balancer does not currently
// report an error.
if b.rawConnectivityState.ConnectivityState != connectivity.TransientFailure {
if b.rawConnectivityState != connectivity.TransientFailure {
if b.logger.V(2) {
b.logger.Infof("Ignoring resolver error because balancer is using a previous good update.")
}
Expand All @@ -207,11 +209,11 @@ func (b *pickfirstBalancer) resolverError(err error) {

b.closeSubConns()
b.addressList.updateEndpointList(nil)
b.rawConnectivityState = balancer.SubConnState{
b.rawConnectivityState = connectivity.TransientFailure
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
ConnectionError: fmt.Errorf("name resolver error: %v", err),
}
b.updateBalancerState()
Picker: &picker{err: fmt.Errorf("name resolver error: %v", err)},
})
}

func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
Expand All @@ -228,16 +230,13 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
// updateClientConnState handles clientConn state changes.
// Only executed in the context of a serializer callback.
func (b *pickfirstBalancer) updateClientConnState(state balancer.ClientConnState) error {
if b.rawConnectivityState.ConnectivityState == connectivity.Shutdown {
if b.rawConnectivityState == connectivity.Shutdown {
return errBalancerClosed
}
if len(state.ResolverState.Addresses) == 0 && len(state.ResolverState.Endpoints) == 0 {
// Cleanup state pertaining to the previous resolver state.
// Treat an empty address list like an error by calling b.ResolverError.
b.rawConnectivityState = balancer.SubConnState{
ConnectivityState: connectivity.TransientFailure,
// The connection error will be set by resolver error next.
}
b.rawConnectivityState = connectivity.TransientFailure
b.resolverError(errors.New("produced zero addresses"))
return balancer.ErrBadResolverState
}
Expand Down Expand Up @@ -284,7 +283,7 @@ func (b *pickfirstBalancer) updateClientConnState(state balancer.ClientConnState
prevAddr := b.addressList.currentAddress()
prevAddrsCount := b.addressList.size()
b.addressList.updateEndpointList(newEndpoints)
if b.rawConnectivityState.ConnectivityState == connectivity.Ready && b.addressList.seekTo(prevAddr) {
if b.rawConnectivityState == connectivity.Ready && b.addressList.seekTo(prevAddr) {
return nil
}

Expand All @@ -296,14 +295,15 @@ func (b *pickfirstBalancer) updateClientConnState(state balancer.ClientConnState
// we should still enter CONNECTING because the sticky TF behaviour mentioned
// in A62 applies only when the TRANSIENT_FAILURE is reported due to connectivity
// failures.
if b.rawConnectivityState.ConnectivityState == connectivity.Ready || b.rawConnectivityState.ConnectivityState == connectivity.Connecting || prevAddrsCount == 0 {
if b.rawConnectivityState == connectivity.Ready || b.rawConnectivityState == connectivity.Connecting || prevAddrsCount == 0 {
// Start connection attempt at first address.
b.rawConnectivityState = balancer.SubConnState{
b.rawConnectivityState = connectivity.Connecting
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.Connecting,
}
b.updateBalancerState()
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
b.requestConnection()
} else if b.rawConnectivityState.ConnectivityState == connectivity.TransientFailure {
} else if b.rawConnectivityState == connectivity.TransientFailure {
// If we're in TRANSIENT_FAILURE, we stay in TRANSIENT_FAILURE until
// we're READY. See A62.
b.requestConnection()
Expand All @@ -320,7 +320,7 @@ func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state b
func (b *pickfirstBalancer) Close() {
b.serializer.TrySchedule(func(_ context.Context) {
b.closeSubConns()
b.rawConnectivityState = balancer.SubConnState{ConnectivityState: connectivity.Shutdown}
b.rawConnectivityState = connectivity.Shutdown
})
b.serializerCancel()
<-b.serializer.Done()
Expand All @@ -330,7 +330,7 @@ func (b *pickfirstBalancer) Close() {
// by the idlePicker and clientConn so access to variables should be synchronized.
func (b *pickfirstBalancer) ExitIdle() {
b.serializer.TrySchedule(func(_ context.Context) {
if b.rawConnectivityState.ConnectivityState == connectivity.Idle {
if b.rawConnectivityState == connectivity.Idle {
b.requestConnection()
}
})
Expand Down Expand Up @@ -418,7 +418,7 @@ func (b *pickfirstBalancer) shutdownRemaining(selected *scData) {
// attempted.
// Only executed in the context of a serializer callback.
func (b *pickfirstBalancer) requestConnection() {
if !b.addressList.isValid() || b.rawConnectivityState.ConnectivityState == connectivity.Shutdown {
if !b.addressList.isValid() || b.rawConnectivityState == connectivity.Shutdown {
return
}
curAddr := b.addressList.currentAddress()
Expand All @@ -435,11 +435,11 @@ func (b *pickfirstBalancer) requestConnection() {
// The LB policy remains in TRANSIENT_FAILURE until a new resolver
// update is received.
b.addressList.reset()
b.rawConnectivityState = balancer.SubConnState{
b.rawConnectivityState = connectivity.TransientFailure
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
ConnectionError: fmt.Errorf("failed to create a new subConn: %v", err),
}
b.updateBalancerState()
Picker: &picker{err: fmt.Errorf("failed to create a new subConn: %v", err)},
})
return
}
b.subConns.Set(curAddr, sd)
Expand Down Expand Up @@ -485,9 +485,7 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, state balancer.SubCon
b.logger.Errorf("Address %q not found address list in %v", sd.addr, b.addressList.addresses)
return
}
b.rawConnectivityState = balancer.SubConnState{
ConnectivityState: connectivity.Ready,
}
b.rawConnectivityState = connectivity.Ready
hl := &healthListener{
scData: sd,
pb: b,
Expand All @@ -503,14 +501,15 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, state balancer.SubCon

// If the LB policy is READY, and it receives a subchannel state change,
// it means that the READY subchannel has failed.
if b.rawConnectivityState.ConnectivityState == connectivity.Ready && state.ConnectivityState != connectivity.Ready {
if b.rawConnectivityState == connectivity.Ready && state.ConnectivityState != connectivity.Ready {
// Once a transport fails, the balancer enters IDLE and starts from
// the first address when the picker is used.
b.addressList.reset()
b.rawConnectivityState = balancer.SubConnState{
b.rawConnectivityState = connectivity.Idle
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.Idle,
}
b.updateBalancerState()
Picker: &idlePicker{exitIdle: b.ExitIdle},
})
return
}

Expand All @@ -521,11 +520,12 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, state balancer.SubCon
// If it's in TRANSIENT_FAILURE, stay in TRANSIENT_FAILURE until
// it's READY. See A62.
// If the balancer is already in CONNECTING, no update is needed.
if b.rawConnectivityState.ConnectivityState == connectivity.Idle {
b.rawConnectivityState = balancer.SubConnState{
if b.rawConnectivityState == connectivity.Idle {
b.rawConnectivityState = connectivity.Connecting
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.Connecting,
}
b.updateBalancerState()
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
}
case connectivity.TransientFailure:
sd.lastErr = state.ConnectionError
Expand All @@ -551,10 +551,11 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, state balancer.SubCon
return
}
b.addressList.reset()
b.rawConnectivityState = balancer.SubConnState{
b.rawConnectivityState = connectivity.Idle
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.Idle,
}
b.updateBalancerState()
Picker: &idlePicker{exitIdle: b.ExitIdle},
})
}
return
}
Expand All @@ -563,11 +564,11 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, state balancer.SubCon
switch state.ConnectivityState {
case connectivity.TransientFailure:
sd.lastErr = state.ConnectionError
b.rawConnectivityState = balancer.SubConnState{
b.rawConnectivityState = connectivity.TransientFailure
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
ConnectionError: state.ConnectionError,
}
b.updateBalancerState()
Picker: &picker{err: state.ConnectionError},
})
// We don't need to request re-resolution since the subconn already does
// that before reporting TRANSIENT_FAILURE.
// TODO: #7534 - Move re-resolution requests from subconn into pick_first.
Expand All @@ -591,75 +592,50 @@ func (b *pickfirstBalancer) updateSubConnHealthState(sd *scData) {
return
}
switch state.ConnectivityState {
case connectivity.Ready, connectivity.TransientFailure, connectivity.Connecting:
b.updateBalancerState()
case connectivity.Ready:
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.Ready,
Picker: &picker{result: balancer.PickResult{SubConn: sd.subConn}},
})
case connectivity.TransientFailure:
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: fmt.Errorf("health check failure: %v", state.ConnectionError)},
})
case connectivity.Idle:
// Wait for the raw connectivity state to report IDLE.
default:
}
}

func (b *pickfirstBalancer) updateBalancerState() {
// If the connectivity state is not READY, use it.
if b.rawConnectivityState.ConnectivityState != connectivity.Ready {
switch b.rawConnectivityState.ConnectivityState {
case connectivity.Idle:
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.Idle,
Picker: &idlePicker{exitIdle: b.ExitIdle},
})
case connectivity.Connecting:
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
case connectivity.TransientFailure:
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: b.rawConnectivityState.ConnectionError},
})
default:
// If we're in TRANSIENT_FAILURE, we stay in TRANSIENT_FAILURE until
// we're READY. See A62.
if b.concludedState == connectivity.TransientFailure {
return
}
return
}
// Use the health check state.
curAddr := b.addressList.currentAddress()
val, found := b.subConns.Get(curAddr)
if !found {
b.logger.Errorf("SubConn not found for address %q even though connectivity is READY", curAddr)
return
}
scd := val.(*scData)
switch scd.healthState.ConnectivityState {
case connectivity.Idle:
// Let the raw connectivity state report IDLE.
case connectivity.Connecting:
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.Connecting,
// The health check will report CONNECTING once the raw connectivity state
// changes to READY. We can avoid sending a new picker since the balancer
// would already be in CONNECTING.
if state.ConnectivityState == connectivity.Connecting && b.concludedState == connectivity.Connecting {
return
}
b.updateBalancerState(balancer.State{
ConnectivityState: state.ConnectivityState,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
case connectivity.TransientFailure:
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: fmt.Errorf("heath check error: %v", scd.healthState.ConnectionError)},
})
case connectivity.Ready:
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.Ready,
Picker: &picker{result: balancer.PickResult{SubConn: scd.subConn}},
})
default:
}
}

func (b *pickfirstBalancer) updateBalancerState(newState balancer.State) {
b.concludedState = newState.ConnectivityState
b.cc.UpdateState(newState)
}

// Only executed in the context of a serializer callback.
func (b *pickfirstBalancer) endFirstPass(lastErr error) {
b.firstPass = false
b.rawConnectivityState = balancer.SubConnState{
b.rawConnectivityState = connectivity.TransientFailure
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
ConnectionError: lastErr,
}
b.updateBalancerState()
Picker: &picker{err: lastErr},
})
// Start re-connecting all the subconns that are already in IDLE.
for _, v := range b.subConns.Values() {
sd := v.(*scData)
Expand Down

0 comments on commit d0322c2

Please sign in to comment.