Skip to content

Commit

Permalink
Add unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
arjan-bal committed Aug 9, 2024
1 parent f1daad1 commit 747bd8a
Showing 1 changed file with 218 additions and 25 deletions.
243 changes: 218 additions & 25 deletions test/pickfirst_leaf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils/pickfirst"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/status"
Expand All @@ -39,7 +40,7 @@ import (
testpb "google.golang.org/grpc/interop/grpc_testing"
)

var pickFirstLeafServiceConfig = fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, pickfirstleaf.PickFirstLeafName)
var stateStoringServiceConfig = fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateStoringBalancerName)

const stateStoringBalancerName = "state_storing"

Expand Down Expand Up @@ -72,7 +73,7 @@ func setupPickFirstLeaf(t *testing.T, backendCount int, opts ...grpc.DialOption)
dopts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithResolvers(r),
grpc.WithDefaultServiceConfig(pickFirstLeafServiceConfig),
grpc.WithDefaultServiceConfig(stateStoringServiceConfig),
}
dopts = append(dopts, opts...)
cc, err := grpc.NewClient(r.Scheme()+":///test.server", dopts...)
Expand All @@ -92,11 +93,223 @@ func setupPickFirstLeaf(t *testing.T, backendCount int, opts ...grpc.DialOption)
return cc, r, backends
}

// TestPickFirstLeaf_ResolverUpdate tests the behaviour of the new pick first
// policy when servers are brought down and resolver updates are received.
func (s) TestPickFirstLeaf_ResolverUpdate(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
balChan := make(chan *stateStoringBalancer, 1)
balancer.Register(&stateStoringBalancerBuilder{balancerChan: balChan})
tests := []struct {
name string
backendCount int
initialBackendIndexes []int
initialTargetBackendIndex int
wantScStates []connectivity.State
updatedBackendIndexes []int
updatedTargetBackendIndex int
wantScStatesPostUpdate []connectivity.State
restartConnected bool
}{
{
name: "two_server_first_ready",
backendCount: 2,
initialBackendIndexes: []int{0, 1},
initialTargetBackendIndex: 0,
wantScStates: []connectivity.State{connectivity.Ready},
},
{
name: "two_server_second_ready",
backendCount: 2,
initialBackendIndexes: []int{0, 1},
initialTargetBackendIndex: 1,
wantScStates: []connectivity.State{connectivity.Shutdown, connectivity.Ready},
},
{
name: "duplicate_address",
backendCount: 2,
initialBackendIndexes: []int{0, 0, 1},
initialTargetBackendIndex: 1,
wantScStates: []connectivity.State{connectivity.Shutdown, connectivity.Ready},
},
{
name: "disjoint_updated_addresses",
backendCount: 4,
initialBackendIndexes: []int{0, 1},
initialTargetBackendIndex: 1,
wantScStates: []connectivity.State{connectivity.Shutdown, connectivity.Ready},
updatedBackendIndexes: []int{2, 3},
updatedTargetBackendIndex: 3,
wantScStatesPostUpdate: []connectivity.State{connectivity.Shutdown, connectivity.Shutdown, connectivity.Shutdown, connectivity.Ready},
},
{
name: "active_backend_in_updated_list",
backendCount: 3,
initialBackendIndexes: []int{0, 1},
initialTargetBackendIndex: 1,
wantScStates: []connectivity.State{connectivity.Shutdown, connectivity.Ready},
updatedBackendIndexes: []int{1, 2},
updatedTargetBackendIndex: 1,
wantScStatesPostUpdate: []connectivity.State{connectivity.Shutdown, connectivity.Ready},
},
{
name: "inactive_backend_in_updated_list",
backendCount: 3,
initialBackendIndexes: []int{0, 1},
initialTargetBackendIndex: 1,
wantScStates: []connectivity.State{connectivity.Shutdown, connectivity.Ready},
updatedBackendIndexes: []int{0, 2},
updatedTargetBackendIndex: 0,
wantScStatesPostUpdate: []connectivity.State{connectivity.Shutdown, connectivity.Shutdown, connectivity.Ready},
},
{
name: "identical_list",
backendCount: 2,
initialBackendIndexes: []int{0, 1},
initialTargetBackendIndex: 1,
wantScStates: []connectivity.State{connectivity.Shutdown, connectivity.Ready},
updatedBackendIndexes: []int{0, 1},
updatedTargetBackendIndex: 1,
wantScStatesPostUpdate: []connectivity.State{connectivity.Shutdown, connectivity.Ready},
},
{
name: "first_connected_idle_reconnect",
backendCount: 2,
initialBackendIndexes: []int{0, 1},
initialTargetBackendIndex: 0,
restartConnected: true,
wantScStates: []connectivity.State{connectivity.Ready},
updatedBackendIndexes: []int{0, 1},
updatedTargetBackendIndex: 0,
wantScStatesPostUpdate: []connectivity.State{connectivity.Ready},
},
{
name: "second_connected_idle_reconnect",
backendCount: 2,
initialBackendIndexes: []int{0, 1},
initialTargetBackendIndex: 1,
restartConnected: true,
wantScStates: []connectivity.State{connectivity.Shutdown, connectivity.Ready},
updatedBackendIndexes: []int{0, 1},
updatedTargetBackendIndex: 1,
wantScStatesPostUpdate: []connectivity.State{connectivity.Shutdown, connectivity.Ready, connectivity.Shutdown},
},
{
name: "second_connected_idle_reconnect_first",
backendCount: 2,
initialBackendIndexes: []int{0, 1},
initialTargetBackendIndex: 1,
restartConnected: true,
wantScStates: []connectivity.State{connectivity.Shutdown, connectivity.Ready},
updatedBackendIndexes: []int{0, 1},
updatedTargetBackendIndex: 0,
wantScStatesPostUpdate: []connectivity.State{connectivity.Shutdown, connectivity.Shutdown, connectivity.Ready},
},
{
name: "first_connected_idle_reconnect_second",
backendCount: 2,
initialBackendIndexes: []int{0, 1},
initialTargetBackendIndex: 0,
restartConnected: true,
wantScStates: []connectivity.State{connectivity.Ready},
updatedBackendIndexes: []int{0, 1},
updatedTargetBackendIndex: 1,
wantScStatesPostUpdate: []connectivity.State{connectivity.Shutdown, connectivity.Ready},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
cc, r, backends := setupPickFirstLeaf(t, tc.backendCount)

activeBackends := []*stubserver.StubServer{}
for _, idx := range tc.initialBackendIndexes {
activeBackends = append(activeBackends, backends[idx])
}
addrs := stubBackendsToResolverAddrs(activeBackends)
r.UpdateState(resolver.State{Addresses: addrs})

// shutdown all active backends except the target.
var targetAddr resolver.Address
for idx_i, idx := range tc.initialBackendIndexes {
if idx == tc.initialTargetBackendIndex {
targetAddr = addrs[idx_i]
continue
}
backends[idx].S.Stop()
}

if err := pickfirst.CheckRPCsToBackend(ctx, cc, targetAddr); err != nil {
t.Fatal(err)
}
bal := <-balChan
scs := bal.scStates

if got, want := len(scs), len(tc.wantScStates); got != want {
t.Fatalf("len(subconns) = %d, want %d", got, want)
}

for idx := range scs {
if got, want := scs[idx].state, tc.wantScStates[idx]; got != want {
t.Errorf("subconn[%d].state = %v, want = %v", idx, got, want)
}
}

if len(tc.updatedBackendIndexes) == 0 {
return
}

// Restart all the backends.
for i, s := range backends {
if !tc.restartConnected && i == tc.initialTargetBackendIndex {
continue
}
s.S.Stop()
if err := s.StartServer(); err != nil {
t.Fatalf("Failed to re-start test backend: %v", err)
}
}

activeBackends = []*stubserver.StubServer{}
for _, idx := range tc.updatedBackendIndexes {
activeBackends = append(activeBackends, backends[idx])
}
addrs = stubBackendsToResolverAddrs(activeBackends)
r.UpdateState(resolver.State{Addresses: addrs})

// shutdown all active backends except the target.
for idx_i, idx := range tc.updatedBackendIndexes {
if idx == tc.updatedTargetBackendIndex {
targetAddr = addrs[idx_i]
continue
}
backends[idx].S.Stop()
}

if err := pickfirst.CheckRPCsToBackend(ctx, cc, targetAddr); err != nil {
t.Fatal(err)
}
scs = bal.scStates

if got, want := len(scs), len(tc.wantScStatesPostUpdate); got != want {
t.Fatalf("len(subconns) = %d, want %d", got, want)
}

for idx := range scs {
if got, want := scs[idx].state, tc.wantScStatesPostUpdate[idx]; got != want {
t.Errorf("subconn[%d].state = %v, want = %v", idx, got, want)
}
}

})
}
}

// stateStoringBalancer stores the state of the subconns being created.
type stateStoringBalancer struct {
balancer.Balancer
mu sync.Mutex
scStates []*scState
ccState connectivity.State
}

func (b *stateStoringBalancer) Close() {
Expand All @@ -110,10 +323,7 @@ func (b *stateStoringBalancer) ExitIdle() {
}

type stateStoringBalancerBuilder struct {
}

func newStateStoringBalancerBuilder() *stateStoringBalancerBuilder {
return &stateStoringBalancerBuilder{}
balancerChan chan *stateStoringBalancer
}

func (b *stateStoringBalancerBuilder) Name() string {
Expand All @@ -123,6 +333,7 @@ func (b *stateStoringBalancerBuilder) Name() string {
func (b *stateStoringBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
bal := &stateStoringBalancer{}
bal.Balancer = balancer.Get(pickfirstleaf.PickFirstLeafName).Build(&stateStoringCCWrapper{cc, bal}, opts)
b.balancerChan <- bal
return bal
}

Expand All @@ -136,25 +347,12 @@ func (b *stateStoringBalancer) subConns() []scState {
return ret
}

func (b *stateStoringBalancer) setCCState(state connectivity.State) {
b.mu.Lock()
b.ccState = state
b.mu.Unlock()
}

func (b *stateStoringBalancer) addScState(state *scState) {
b.mu.Lock()
b.scStates = append(b.scStates, state)
b.mu.Unlock()
}

func (b *stateStoringBalancer) curCCState() connectivity.State {
b.mu.Lock()
ret := b.ccState
b.mu.Unlock()
return ret
}

type stateStoringCCWrapper struct {
balancer.ClientConn
b *stateStoringBalancer
Expand All @@ -176,11 +374,6 @@ func (ccw *stateStoringCCWrapper) NewSubConn(addrs []resolver.Address, opts bala
return ccw.ClientConn.NewSubConn(addrs, opts)
}

func (ccw *stateStoringCCWrapper) UpdateState(state balancer.State) {
ccw.b.setCCState(state.ConnectivityState)
ccw.ClientConn.UpdateState(state)
}

type scState struct {
state connectivity.State
addrs []resolver.Address
Expand Down

0 comments on commit 747bd8a

Please sign in to comment.