Skip to content

Commit

Permalink
Update pf lofic for combining health and raw conn updates
Browse files Browse the repository at this point in the history
  • Loading branch information
arjan-bal committed Sep 6, 2024
1 parent 769cbd5 commit 2ef3b40
Showing 1 changed file with 100 additions and 76 deletions.
176 changes: 100 additions & 76 deletions balancer/pickfirstleaf/pickfirstleaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ func (pickfirstBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions
subConns: resolver.NewAddressMap(),
serializer: grpcsync.NewCallbackSerializer(ctx),
serializerCancel: cancel,
concludedState: connectivity.Connecting,
rawConnectivityState: connectivity.Connecting,
rawConnectivityState: balancer.SubConnState{ConnectivityState: connectivity.Connecting},
healthCheckOpts: opts.HealthCheckOptions,
}
b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b))
Expand Down Expand Up @@ -176,8 +175,7 @@ type pickfirstBalancer struct {
// subconn state updates.
serializer *grpcsync.CallbackSerializer
serializerCancel func()
concludedState connectivity.State
rawConnectivityState connectivity.State
rawConnectivityState balancer.SubConnState
subConns *resolver.AddressMap // scData for active subonns mapped by address.
addressList addressList
firstPass bool
Expand All @@ -195,12 +193,12 @@ func (b *pickfirstBalancer) resolverError(err error) {
if b.logger.V(2) {
b.logger.Infof("Received error from the name resolver: %v", err)
}
if b.rawConnectivityState == connectivity.Shutdown {
if b.rawConnectivityState.ConnectivityState == connectivity.Shutdown {
return
}
// The picker will not change since the balancer does not currently
// report an error.
if b.rawConnectivityState != connectivity.TransientFailure {
if b.rawConnectivityState.ConnectivityState != connectivity.TransientFailure {
if b.logger.V(2) {
b.logger.Infof("Ignoring resolver error because balancer is using a previous good update.")
}
Expand All @@ -209,11 +207,11 @@ func (b *pickfirstBalancer) resolverError(err error) {

b.closeSubConns()
b.addressList.updateEndpointList(nil)
b.rawConnectivityState = connectivity.TransientFailure
b.updateBalancerState(balancer.State{
b.rawConnectivityState = balancer.SubConnState{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: fmt.Errorf("name resolver error: %v", err)},
})
ConnectionError: fmt.Errorf("name resolver error: %v", err),
}
b.updateBalancerState()
}

func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
Expand All @@ -230,13 +228,16 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
// updateClientConnState handles clientConn state changes.
// Only executed in the context of a serializer callback.
func (b *pickfirstBalancer) updateClientConnState(state balancer.ClientConnState) error {
if b.rawConnectivityState == connectivity.Shutdown {
if b.rawConnectivityState.ConnectivityState == connectivity.Shutdown {
return errBalancerClosed
}
if len(state.ResolverState.Addresses) == 0 && len(state.ResolverState.Endpoints) == 0 {
// Cleanup state pertaining to the previous resolver state.
// Treat an empty address list like an error by calling b.ResolverError.
b.rawConnectivityState = connectivity.TransientFailure
b.rawConnectivityState = balancer.SubConnState{
ConnectivityState: connectivity.TransientFailure,
// The connection error will be set by resolver error next.
}
b.resolverError(errors.New("produced zero addresses"))
return balancer.ErrBadResolverState
}
Expand Down Expand Up @@ -283,7 +284,7 @@ func (b *pickfirstBalancer) updateClientConnState(state balancer.ClientConnState
prevAddr := b.addressList.currentAddress()
prevAddrsCount := b.addressList.size()
b.addressList.updateEndpointList(newEndpoints)
if b.rawConnectivityState == connectivity.Ready && b.addressList.seekTo(prevAddr) {
if b.rawConnectivityState.ConnectivityState == connectivity.Ready && b.addressList.seekTo(prevAddr) {
return nil
}

Expand All @@ -295,15 +296,14 @@ func (b *pickfirstBalancer) updateClientConnState(state balancer.ClientConnState
// we should still enter CONNECTING because the sticky TF behaviour mentioned
// in A62 applies only when the TRANSIENT_FAILURE is reported due to connectivity
// failures.
if b.rawConnectivityState == connectivity.Ready || b.rawConnectivityState == connectivity.Connecting || prevAddrsCount == 0 {
if b.rawConnectivityState.ConnectivityState == connectivity.Ready || b.rawConnectivityState.ConnectivityState == connectivity.Connecting || prevAddrsCount == 0 {
// Start connection attempt at first address.
b.rawConnectivityState = connectivity.Connecting
b.updateBalancerState(balancer.State{
b.rawConnectivityState = balancer.SubConnState{
ConnectivityState: connectivity.Connecting,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
}
b.updateBalancerState()
b.requestConnection()
} else if b.rawConnectivityState == connectivity.TransientFailure {
} else if b.rawConnectivityState.ConnectivityState == connectivity.TransientFailure {
// If we're in TRANSIENT_FAILURE, we stay in TRANSIENT_FAILURE until
// we're READY. See A62.
b.requestConnection()
Expand All @@ -320,7 +320,7 @@ func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state b
func (b *pickfirstBalancer) Close() {
b.serializer.TrySchedule(func(_ context.Context) {
b.closeSubConns()
b.rawConnectivityState = connectivity.Shutdown
b.rawConnectivityState = balancer.SubConnState{ConnectivityState: connectivity.Shutdown}
})
b.serializerCancel()
<-b.serializer.Done()
Expand All @@ -330,7 +330,7 @@ func (b *pickfirstBalancer) Close() {
// by the idlePicker and clientConn so access to variables should be synchronized.
func (b *pickfirstBalancer) ExitIdle() {
b.serializer.TrySchedule(func(_ context.Context) {
if b.rawConnectivityState == connectivity.Idle {
if b.rawConnectivityState.ConnectivityState == connectivity.Idle {
b.requestConnection()
}
})
Expand Down Expand Up @@ -418,7 +418,7 @@ func (b *pickfirstBalancer) shutdownRemaining(selected *scData) {
// attempted.
// Only executed in the context of a serializer callback.
func (b *pickfirstBalancer) requestConnection() {
if !b.addressList.isValid() || b.rawConnectivityState == connectivity.Shutdown {
if !b.addressList.isValid() || b.rawConnectivityState.ConnectivityState == connectivity.Shutdown {
return
}
curAddr := b.addressList.currentAddress()
Expand All @@ -435,11 +435,11 @@ func (b *pickfirstBalancer) requestConnection() {
// The LB policy remains in TRANSIENT_FAILURE until a new resolver
// update is received.
b.addressList.reset()
b.rawConnectivityState = connectivity.TransientFailure
b.updateBalancerState(balancer.State{
b.rawConnectivityState = balancer.SubConnState{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: fmt.Errorf("failed to create a new subConn: %v", err)},
})
ConnectionError: fmt.Errorf("failed to create a new subConn: %v", err),
}
b.updateBalancerState()
return
}
b.subConns.Set(curAddr, sd)
Expand Down Expand Up @@ -485,7 +485,9 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, state balancer.SubCon
b.logger.Errorf("Address %q not found address list in %v", sd.addr, b.addressList.addresses)
return
}
b.rawConnectivityState = connectivity.Ready
b.rawConnectivityState = balancer.SubConnState{
ConnectivityState: connectivity.Ready,
}
hl := &healthListener{
scData: sd,
pb: b,
Expand All @@ -501,15 +503,14 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, state balancer.SubCon

// If the LB policy is READY, and it receives a subchannel state change,
// it means that the READY subchannel has failed.
if b.rawConnectivityState == connectivity.Ready && state.ConnectivityState != connectivity.Ready {
if b.rawConnectivityState.ConnectivityState == connectivity.Ready && state.ConnectivityState != connectivity.Ready {
// Once a transport fails, the balancer enters IDLE and starts from
// the first address when the picker is used.
b.addressList.reset()
b.rawConnectivityState = connectivity.Idle
b.updateBalancerState(balancer.State{
b.rawConnectivityState = balancer.SubConnState{
ConnectivityState: connectivity.Idle,
Picker: &idlePicker{exitIdle: b.ExitIdle},
})
}
b.updateBalancerState()
return
}

Expand All @@ -520,12 +521,11 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, state balancer.SubCon
// If it's in TRANSIENT_FAILURE, stay in TRANSIENT_FAILURE until
// it's READY. See A62.
// If the balancer is already in CONNECTING, no update is needed.
if b.rawConnectivityState == connectivity.Idle {
b.rawConnectivityState = connectivity.Connecting
b.updateBalancerState(balancer.State{
if b.rawConnectivityState.ConnectivityState == connectivity.Idle {
b.rawConnectivityState = balancer.SubConnState{
ConnectivityState: connectivity.Connecting,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
}
b.updateBalancerState()
}
case connectivity.TransientFailure:
sd.lastErr = state.ConnectionError
Expand All @@ -551,11 +551,10 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, state balancer.SubCon
return
}
b.addressList.reset()
b.rawConnectivityState = connectivity.Idle
b.updateBalancerState(balancer.State{
b.rawConnectivityState = balancer.SubConnState{
ConnectivityState: connectivity.Idle,
Picker: &idlePicker{exitIdle: b.ExitIdle},
})
}
b.updateBalancerState()
}
return
}
Expand All @@ -564,11 +563,11 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, state balancer.SubCon
switch state.ConnectivityState {
case connectivity.TransientFailure:
sd.lastErr = state.ConnectionError
b.rawConnectivityState = connectivity.TransientFailure
b.updateBalancerState(balancer.State{
b.rawConnectivityState = balancer.SubConnState{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: state.ConnectionError},
})
ConnectionError: state.ConnectionError,
}
b.updateBalancerState()
// We don't need to request re-resolution since the subconn already does
// that before reporting TRANSIENT_FAILURE.
// TODO: #7534 - Move re-resolution requests from subconn into pick_first.
Expand All @@ -592,50 +591,75 @@ func (b *pickfirstBalancer) updateSubConnHealthState(sd *scData) {
return
}
switch state.ConnectivityState {
case connectivity.Ready:
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.Ready,
Picker: &picker{result: balancer.PickResult{SubConn: sd.subConn}},
})
case connectivity.TransientFailure:
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: fmt.Errorf("health check failure: %v", state.ConnectionError)},
})
case connectivity.Ready, connectivity.TransientFailure, connectivity.Connecting:
b.updateBalancerState()
case connectivity.Idle:
// Wait for the raw connectivity state to report IDLE.
default:
// If we're in TRANSIENT_FAILURE, we stay in TRANSIENT_FAILURE until
// we're READY. See A62.
if b.concludedState == connectivity.TransientFailure {
return
}
// The health check will report CONNECTING once the raw connectivity state
// changes to READY. We can avoid sending a new picker since the balancer
// would already be in CONNECTING.
if state.ConnectivityState == connectivity.Connecting && b.concludedState == connectivity.Connecting {
return
}
}

func (b *pickfirstBalancer) updateBalancerState() {
// If the connectivity state is not READY, use it.
if b.rawConnectivityState.ConnectivityState != connectivity.Ready {
switch b.rawConnectivityState.ConnectivityState {
case connectivity.Idle:
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.Idle,
Picker: &idlePicker{exitIdle: b.ExitIdle},
})
case connectivity.Connecting:
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
case connectivity.TransientFailure:
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: b.rawConnectivityState.ConnectionError},
})
default:
}
b.updateBalancerState(balancer.State{
ConnectivityState: state.ConnectivityState,
return
}
// Use the health check state.
curAddr := b.addressList.currentAddress()
val, found := b.subConns.Get(curAddr)
if !found {
b.logger.Errorf("SubConn not found for address %q even though connectivity is READY", curAddr)
return
}
scd := val.(*scData)
switch scd.healthState.ConnectivityState {
case connectivity.Idle:
// Let the raw connectivity state report IDLE.
case connectivity.Connecting:
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
case connectivity.TransientFailure:
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: fmt.Errorf("heath check error: %v", scd.healthState.ConnectionError)},
})
case connectivity.Ready:
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.Ready,
Picker: &picker{result: balancer.PickResult{SubConn: scd.subConn}},
})
default:
}
}

func (b *pickfirstBalancer) updateBalancerState(newState balancer.State) {
b.concludedState = newState.ConnectivityState
b.cc.UpdateState(newState)
}

// Only executed in the context of a serializer callback.
func (b *pickfirstBalancer) endFirstPass(lastErr error) {
b.firstPass = false
b.rawConnectivityState = connectivity.TransientFailure
b.updateBalancerState(balancer.State{
b.rawConnectivityState = balancer.SubConnState{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: lastErr},
})
ConnectionError: lastErr,
}
b.updateBalancerState()
// Start re-connecting all the subconns that are already in IDLE.
for _, v := range b.subConns.Values() {
sd := v.(*scData)
Expand Down

0 comments on commit 2ef3b40

Please sign in to comment.