Skip to content

Commit

Permalink
simplify serializer usage
Browse files Browse the repository at this point in the history
  • Loading branch information
arjan-bal committed Aug 9, 2024
1 parent 31e8a10 commit fcb0120
Showing 1 changed file with 29 additions and 32 deletions.
61 changes: 29 additions & 32 deletions balancer/pickfirst_leaf/pickfirst_leaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,17 +141,17 @@ type pickfirstBalancer struct {
}

func (b *pickfirstBalancer) ResolverError(err error) {
completion := make(chan struct{})
ch := make(chan struct{})
b.serializer.ScheduleOr(func(ctx context.Context) {
b.resolverError(err, completion)
b.resolverError(err)
close(ch)
}, func() {
close(completion)
close(ch)
})
<-completion
<-ch
}

func (b *pickfirstBalancer) resolverError(err error, completion chan struct{}) {
defer close(completion)
func (b *pickfirstBalancer) resolverError(err error) {
if b.logger.V(2) {
b.logger.Infof("Received error from the name resolver: %v", err)
}
Expand Down Expand Up @@ -181,32 +181,30 @@ func (b *pickfirstBalancer) resolverError(err error, completion chan struct{}) {
func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
errCh := make(chan error, 1)
b.serializer.ScheduleOr(func(_ context.Context) {
b.updateClientConnState(state, errCh)
err := b.updateClientConnState(state)
errCh <- err
}, func() {
close(errCh)
})
return <-errCh
}

func (b *pickfirstBalancer) updateClientConnState(state balancer.ClientConnState, errCh chan error) {
func (b *pickfirstBalancer) updateClientConnState(state balancer.ClientConnState) error {
if b.state == connectivity.Shutdown {
errCh <- fmt.Errorf("balancer is already closed")
return
return fmt.Errorf("balancer is already closed")
}
if len(state.ResolverState.Addresses) == 0 && len(state.ResolverState.Endpoints) == 0 {
// Cleanup state pertaining to the previous resolver state.
// Treat an empty address list like an error by calling b.ResolverError.
b.state = connectivity.TransientFailure
b.resolverError(errors.New("produced zero addresses"), make(chan struct{}))
errCh <- balancer.ErrBadResolverState
return
b.resolverError(errors.New("produced zero addresses"))
return balancer.ErrBadResolverState
}
// We don't have to guard this block with the env var because ParseConfig
// already does so.
cfg, ok := state.BalancerConfig.(pfConfig)
if state.BalancerConfig != nil && !ok {
errCh <- fmt.Errorf("pickfirst: received illegal BalancerConfig (type %T): %v", state.BalancerConfig, state.BalancerConfig)
return
return fmt.Errorf("pickfirst: received illegal BalancerConfig (type %T): %v", state.BalancerConfig, state.BalancerConfig)
}

if b.logger.V(2) {
Expand Down Expand Up @@ -240,13 +238,11 @@ func (b *pickfirstBalancer) updateClientConnState(state balancer.ClientConnState
if err != nil {
// This error should never happen when the state is READY if the
// index is managed correctly.
errCh <- fmt.Errorf("address index is in an invalid state: %v", err)
return
return fmt.Errorf("address index is in an invalid state: %v", err)
}
b.addressIndex.updateEndpointList(newEndpoints)
if b.addressIndex.seekTo(prevAddr) {
errCh <- nil
return
return nil
}
b.addressIndex.reset()
} else {
Expand Down Expand Up @@ -297,7 +293,7 @@ func (b *pickfirstBalancer) updateClientConnState(state balancer.ClientConnState
} else if b.state == connectivity.TransientFailure {
b.requestConnection()
}
errCh <- nil
return nil
}

// UpdateSubConnState is unused as a StateListener is always registered when
Expand All @@ -307,17 +303,19 @@ func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state b
}

func (b *pickfirstBalancer) Close() {
completion := make(chan struct{})
ch := make(chan struct{})
b.serializer.ScheduleOr(func(ctx context.Context) {
b.close(completion)
b.close()
close(ch)
}, func() {
b.close(completion)
b.close()
close(ch)
})
<-completion
<-ch
<-b.serializer.Done()
}

func (b *pickfirstBalancer) close(completion chan struct{}) {
func (b *pickfirstBalancer) close() {
b.serializerCancel()
for _, sd := range b.subConns.Values() {
sd.(*scData).subConn.Shutdown()
Expand All @@ -326,24 +324,23 @@ func (b *pickfirstBalancer) close(completion chan struct{}) {
b.subConns.Delete(k)
}
b.state = connectivity.Shutdown
close(completion)
}

// ExitIdle moves the balancer out of idle state. It can be called concurrently
// by the idlePicker and clientConn so access to variables should be synchronized.
func (b *pickfirstBalancer) ExitIdle() {
completion := make(chan struct{})
ch := make(chan struct{})
b.serializer.ScheduleOr(func(ctx context.Context) {
b.exitIdle(completion)
b.exitIdle()
close(ch)
}, func() {
close(completion)
close(ch)
})
<-completion
<-ch
}

func (b *pickfirstBalancer) exitIdle(completion chan struct{}) {
func (b *pickfirstBalancer) exitIdle() {
b.requestConnection()
close(completion)
}

// deDupAddresses ensures that each address belongs to only one endpoint.
Expand Down

0 comments on commit fcb0120

Please sign in to comment.