Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
arjan-bal committed Sep 5, 2024
1 parent 08e8cb9 commit 2b2c2a3
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 36 deletions.
26 changes: 15 additions & 11 deletions balancer/pickfirstleaf/pickfirstleaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,12 @@ type pickfirstBalancer struct {
}

func (b *pickfirstBalancer) ResolverError(err error) {
b.serializer.TrySchedule(func(_ context.Context) {
doneCh := make(chan error, 1)
b.serializer.ScheduleOr(func(_ context.Context) {
b.resolverError(err)
close(doneCh)
}, func() {
close(doneCh)
})
}

Expand Down Expand Up @@ -427,19 +431,19 @@ func (b *pickfirstBalancer) requestConnection() {
}

// Only executed in the context of a serializer callback.
func (b *pickfirstBalancer) updateSubConnState(sd *scData, state balancer.SubConnState) {
func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.SubConnState) {
// Previously relevant subconns can still callback with state updates.
// To prevent pickers from returning these obsolete subconns, this logic
// is included to check if the current list of active subconns includes this
// subconn.
if activeSD, found := b.subConns.Get(sd.addr); !found || activeSD != sd {
return
}
if state.ConnectivityState == connectivity.Shutdown {
if newState.ConnectivityState == connectivity.Shutdown {
return
}

if state.ConnectivityState == connectivity.Ready {
if newState.ConnectivityState == connectivity.Ready {
b.shutdownRemaining(sd)
if !b.addressList.seekTo(sd.addr) {
// This should not fail as we should have only one subconn after
Expand All @@ -457,7 +461,7 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, state balancer.SubCon

// If the LB policy is READY, and it receives a subchannel state change,
// it means that the READY subchannel has failed.
if b.state == connectivity.Ready && state.ConnectivityState != connectivity.Ready {
if b.state == connectivity.Ready && newState.ConnectivityState != connectivity.Ready {
// Once a transport fails, the balancer enters IDLE and starts from
// the first address when the picker is used.
b.state = connectivity.Idle
Expand All @@ -470,7 +474,7 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, state balancer.SubCon
}

if b.firstPass {
switch state.ConnectivityState {
switch newState.ConnectivityState {
case connectivity.Connecting:
// The balancer can be in either IDLE, CONNECTING or TRANSIENT_FAILURE.
// If it's in TRANSIENT_FAILURE, stay in TRANSIENT_FAILURE until
Expand All @@ -484,7 +488,7 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, state balancer.SubCon
})
}
case connectivity.TransientFailure:
sd.lastErr = state.ConnectionError
sd.lastErr = newState.ConnectionError
// Since we're re-using common subconns while handling resolver updates,
// we could receive an out of turn TRANSIENT_FAILURE from a pass
// over the previous address list. We ignore such updates.
Expand All @@ -497,7 +501,7 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, state balancer.SubCon
return
}
// End of the first pass.
b.endFirstPass(state.ConnectionError)
b.endFirstPass(newState.ConnectionError)
case connectivity.Idle:
// A subconn can transition from CONNECTING directly to IDLE when
// a transport is successfully created, but the connection fails before
Expand All @@ -517,12 +521,12 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, state balancer.SubCon
}

// We have finished the first pass, keep re-connecting failing subconns.
switch state.ConnectivityState {
switch newState.ConnectivityState {
case connectivity.TransientFailure:
sd.lastErr = state.ConnectionError
sd.lastErr = newState.ConnectionError
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: state.ConnectionError},
Picker: &picker{err: newState.ConnectionError},
})
// We don't need to request re-resolution since the subconn already does
// that before reporting TRANSIENT_FAILURE.
Expand Down
43 changes: 18 additions & 25 deletions balancer/pickfirstleaf/test/pickfirstleaf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,25 +68,18 @@ func Test(t *testing.T) {

// setupPickFirstLeaf performs steps required for pick_first tests. It starts a
// bunch of backends exporting the TestService, creates a ClientConn to them
// with service config specifying the use of the pick_first LB policy.
// with service config specifying the use of the state_storing LB policy.
func setupPickFirstLeaf(t *testing.T, backendCount int, opts ...grpc.DialOption) (*grpc.ClientConn, *manual.Resolver, *backendManager) {
t.Helper()
r := manual.NewBuilderWithScheme("whatever")
backends := make([]*stubserver.StubServer, backendCount)
addrs := make([]resolver.Address, backendCount)

for i := 0; i < backendCount; i++ {
backend := &stubserver.StubServer{
EmptyCallF: func(_ context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
},
}
if err := backend.StartServer(); err != nil {
t.Fatalf("Failed to start backend: %v", err)
}
t.Logf("Started TestService backend at: %q", backend.Address)
t.Cleanup(func() { backend.S.Stop() })

backend := stubserver.StartTestService(t, nil)
t.Cleanup(func() {
backend.Stop()
})
backends[i] = backend
addrs[i] = resolver.Address{Addr: backend.Address}
}
Expand Down Expand Up @@ -115,7 +108,7 @@ func setupPickFirstLeaf(t *testing.T, backendCount int, opts ...grpc.DialOption)
}

// TestPickFirstLeaf_SimpleResolverUpdate tests the behaviour of the pick first
// policy when when given an list of addresses. The following steps are carried
// policy when given an list of addresses. The following steps are carried
// out in order:
// 1. A list of addresses are given through the resolver. Only one
// of the servers is running.
Expand All @@ -131,7 +124,7 @@ func (s) TestPickFirstLeaf_SimpleResolverUpdate_FirstServerReady(t *testing.T) {

cc, r, bm := setupPickFirstLeaf(t, 2)
addrs := bm.resolverAddrs()
stateSubscriber := &ccStateSubscriber{transitions: []connectivity.State{}}
stateSubscriber := &ccStateSubscriber{}
internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, stateSubscriber)

r.UpdateState(resolver.State{Addresses: addrs})
Expand Down Expand Up @@ -171,7 +164,7 @@ func (s) TestPickFirstLeaf_SimpleResolverUpdate_FirstServerUnReady(t *testing.T)

cc, r, bm := setupPickFirstLeaf(t, 2)
addrs := bm.resolverAddrs()
stateSubscriber := &ccStateSubscriber{transitions: []connectivity.State{}}
stateSubscriber := &ccStateSubscriber{}
internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, stateSubscriber)
bm.stopAllExcept(1)

Expand Down Expand Up @@ -213,7 +206,7 @@ func (s) TestPickFirstLeaf_SimpleResolverUpdate_DuplicateAddrs(t *testing.T) {

cc, r, bm := setupPickFirstLeaf(t, 2)
addrs := bm.resolverAddrs()
stateSubscriber := &ccStateSubscriber{transitions: []connectivity.State{}}
stateSubscriber := &ccStateSubscriber{}
internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, stateSubscriber)
bm.stopAllExcept(1)

Expand Down Expand Up @@ -269,7 +262,7 @@ func (s) TestPickFirstLeaf_ResolverUpdates_DisjointLists(t *testing.T) {
balancer.Register(&stateStoringBalancerBuilder{balancer: balCh})
cc, r, bm := setupPickFirstLeaf(t, 4)
addrs := bm.resolverAddrs()
stateSubscriber := &ccStateSubscriber{transitions: []connectivity.State{}}
stateSubscriber := &ccStateSubscriber{}
internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, stateSubscriber)

bm.backends[0].S.Stop()
Expand Down Expand Up @@ -330,7 +323,7 @@ func (s) TestPickFirstLeaf_ResolverUpdates_ActiveBackendInUpdatedList(t *testing
balancer.Register(&stateStoringBalancerBuilder{balancer: balCh})
cc, r, bm := setupPickFirstLeaf(t, 3)
addrs := bm.resolverAddrs()
stateSubscriber := &ccStateSubscriber{transitions: []connectivity.State{}}
stateSubscriber := &ccStateSubscriber{}
internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, stateSubscriber)

bm.backends[0].S.Stop()
Expand Down Expand Up @@ -392,7 +385,7 @@ func (s) TestPickFirstLeaf_ResolverUpdates_InActiveBackendInUpdatedList(t *testi
balancer.Register(&stateStoringBalancerBuilder{balancer: balCh})
cc, r, bm := setupPickFirstLeaf(t, 3)
addrs := bm.resolverAddrs()
stateSubscriber := &ccStateSubscriber{transitions: []connectivity.State{}}
stateSubscriber := &ccStateSubscriber{}
internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, stateSubscriber)

bm.backends[0].S.Stop()
Expand Down Expand Up @@ -455,7 +448,7 @@ func (s) TestPickFirstLeaf_ResolverUpdates_IdenticalLists(t *testing.T) {
balancer.Register(&stateStoringBalancerBuilder{balancer: balCh})
cc, r, bm := setupPickFirstLeaf(t, 2)
addrs := bm.resolverAddrs()
stateSubscriber := &ccStateSubscriber{transitions: []connectivity.State{}}
stateSubscriber := &ccStateSubscriber{}
internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, stateSubscriber)

bm.backends[0].S.Stop()
Expand Down Expand Up @@ -527,7 +520,7 @@ func (s) TestPickFirstLeaf_StopConnectedServer_FirstServerRestart(t *testing.T)
balancer.Register(&stateStoringBalancerBuilder{balancer: balCh})
cc, r, bm := setupPickFirstLeaf(t, 2)
addrs := bm.resolverAddrs()
stateSubscriber := &ccStateSubscriber{transitions: []connectivity.State{}}
stateSubscriber := &ccStateSubscriber{}
internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, stateSubscriber)

// shutdown all active backends except the target.
Expand Down Expand Up @@ -591,7 +584,7 @@ func (s) TestPickFirstLeaf_StopConnectedServer_SecondServerRestart(t *testing.T)
balancer.Register(&stateStoringBalancerBuilder{balancer: balCh})
cc, r, bm := setupPickFirstLeaf(t, 2)
addrs := bm.resolverAddrs()
stateSubscriber := &ccStateSubscriber{transitions: []connectivity.State{}}
stateSubscriber := &ccStateSubscriber{}
internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, stateSubscriber)

// shutdown all active backends except the target.
Expand Down Expand Up @@ -662,7 +655,7 @@ func (s) TestPickFirstLeaf_StopConnectedServer_SecondServerToFirst(t *testing.T)
balancer.Register(&stateStoringBalancerBuilder{balancer: balCh})
cc, r, bm := setupPickFirstLeaf(t, 2)
addrs := bm.resolverAddrs()
stateSubscriber := &ccStateSubscriber{transitions: []connectivity.State{}}
stateSubscriber := &ccStateSubscriber{}
internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, stateSubscriber)

// shutdown all active backends except the target.
Expand Down Expand Up @@ -733,7 +726,7 @@ func (s) TestPickFirstLeaf_StopConnectedServer_FirstServerToSecond(t *testing.T)
balancer.Register(&stateStoringBalancerBuilder{balancer: balCh})
cc, r, bm := setupPickFirstLeaf(t, 2)
addrs := bm.resolverAddrs()
stateSubscriber := &ccStateSubscriber{transitions: []connectivity.State{}}
stateSubscriber := &ccStateSubscriber{}
internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, stateSubscriber)

// shutdown all active backends except the target.
Expand Down Expand Up @@ -807,7 +800,7 @@ func (s) TestPickFirstLeaf_EmptyAddressList(t *testing.T) {
cc, r, bm := setupPickFirstLeaf(t, 1)
addrs := bm.resolverAddrs()

stateSubscriber := &ccStateSubscriber{transitions: []connectivity.State{}}
stateSubscriber := &ccStateSubscriber{}
internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, stateSubscriber)

r.UpdateState(resolver.State{Addresses: addrs})
Expand Down

0 comments on commit 2b2c2a3

Please sign in to comment.