Skip to content

Commit

Permalink
Enable health check in pf without API changes
Browse files Browse the repository at this point in the history
  • Loading branch information
arjan-bal committed Aug 24, 2024
1 parent bcfe4ae commit 0b1fd7e
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 22 deletions.
2 changes: 1 addition & 1 deletion balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,5 +470,5 @@ type Producer any
// HealthListener is used to listen to subConn state updates.
type HealthListener interface {
// OnStateChange is called when the health check state changes.
OnStateChange(connectivity.State, error)
OnStateChange(SubConnState)
}
26 changes: 14 additions & 12 deletions balancer/pickfirst/pickfirst.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ const (

type pickfirstBuilder struct{}

func (pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
b := &pickfirstBalancer{cc: cc}
func (pickfirstBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
b := &pickfirstBalancer{
cc: cc,
}
b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b))
return b
}
Expand Down Expand Up @@ -90,12 +92,9 @@ type healthListener struct {
balancer *pickfirstBalancer
}

func (hl *healthListener) OnStateChange(state connectivity.State, err error) {
fmt.Printf("State update from health listener for subconn %v: %v, %v\n", hl.subConn, state, err)
hl.balancer.updateSubConnState(hl.subConn, balancer.SubConnState{
ConnectivityState: state,
ConnectionError: err,
})
func (hl *healthListener) OnStateChange(state balancer.SubConnState) {
fmt.Printf("State update from health listener for subconn %v: %v\n", hl.subConn, state)
hl.balancer.updateSubConnState(hl.subConn, state, true)
}

func (b *pickfirstBalancer) ResolverError(err error) {
Expand Down Expand Up @@ -190,10 +189,9 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
}
subConn, err := b.cc.NewSubConn(addrs, balancer.NewSubConnOptions{
StateListener: func(state balancer.SubConnState) {
b.updateSubConnState(subConn, state)
b.updateSubConnState(subConn, state, false)
},
HealthCheckEnabled: true,
HealthStateListener: hl,
HealthStateListener: hl,
})
if err != nil {
if b.logger.V(2) {
Expand Down Expand Up @@ -223,7 +221,7 @@ func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state b
b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", subConn, state)
}

func (b *pickfirstBalancer) updateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {
func (b *pickfirstBalancer) updateSubConnState(subConn balancer.SubConn, state balancer.SubConnState, fromHealthCheck bool) {
if b.logger.V(2) {
b.logger.Infof("Received SubConn state update: %p, %+v", subConn, state)
}
Expand All @@ -240,6 +238,10 @@ func (b *pickfirstBalancer) updateSubConnState(subConn balancer.SubConn, state b

switch state.ConnectivityState {
case connectivity.Ready:
if !fromHealthCheck {
// Wait for update from the health check.
return
}
b.cc.UpdateState(balancer.State{
ConnectivityState: state.ConnectivityState,
Picker: &picker{result: balancer.PickResult{SubConn: subConn}},
Expand Down
4 changes: 2 additions & 2 deletions balancer_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,12 +271,12 @@ type wrappedHealthListener struct {
serializer *grpcsync.CallbackSerializer
}

func (l *wrappedHealthListener) OnStateChange(state connectivity.State, err error) {
func (l *wrappedHealthListener) OnStateChange(state balancer.SubConnState) {
l.serializer.TrySchedule(func(ctx context.Context) {
if ctx.Err() != nil {
return
}
l.delegate.OnStateChange(state, err)
l.delegate.OnStateChange(state)
})
}

Expand Down
20 changes: 16 additions & 4 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1429,9 +1429,18 @@ func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address,
//
// Caller must hold ac.mu.
func (ac *addrConn) startHealthCheck(ctx context.Context) {
var healthcheckManagingState bool
var healthcheckServiceEnabled bool
defer func() {
if !healthcheckManagingState || ac.scopts.HealthStateListener != nil {
if ac.scopts.HealthStateListener != nil {
ac.updateConnectivityState(connectivity.Ready, nil)
if !healthcheckServiceEnabled {
ac.scopts.HealthStateListener.OnStateChange(balancer.SubConnState{
ConnectivityState: connectivity.Ready,
})
}
return
}
if !healthcheckServiceEnabled {
ac.updateConnectivityState(connectivity.Ready, nil)
}
}()
Expand All @@ -1455,7 +1464,7 @@ func (ac *addrConn) startHealthCheck(ctx context.Context) {
return
}

healthcheckManagingState = true
healthcheckServiceEnabled = true

// Set up the health check helper functions.
currentTr := ac.transport
Expand All @@ -1475,7 +1484,10 @@ func (ac *addrConn) startHealthCheck(ctx context.Context) {
return
}
if ac.scopts.HealthStateListener != nil {
ac.scopts.HealthStateListener.OnStateChange(s, lastErr)
ac.scopts.HealthStateListener.OnStateChange(balancer.SubConnState{
ConnectivityState: s,
ConnectionError: lastErr,
})
} else {
ac.updateConnectivityState(s, lastErr)
}
Expand Down
37 changes: 34 additions & 3 deletions test/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/pickfirst"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
Expand Down Expand Up @@ -184,7 +186,36 @@ func setupClient(t *testing.T, c *clientConfig) (*grpc.ClientConn, *manual.Resol
return cc, r
}

const petiolePolicyName = "petiole_policy"

type petiolePolicyBuilder struct{}

// wraps the clientconn to enable health checking in the created subconns.
type wrappedCC struct {
balancer.ClientConn
}

func (w *wrappedCC) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
opts.HealthCheckEnabled = true
return w.ClientConn.NewSubConn(addrs, opts)
}

func (b *petiolePolicyBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
return &petiolePolicy{
balancer.Get(pickfirst.Name).Build(&wrappedCC{cc}, opts),
}
}

func (b *petiolePolicyBuilder) Name() string {
return petiolePolicyName
}

type petiolePolicy struct {
balancer.Balancer
}

func (s) TestHealthCheckWatchStateChange(t *testing.T) {
balancer.Register(&petiolePolicyBuilder{})
_, lis, ts := setupServer(t, nil)

// The table below shows the expected series of addrConn connectivity transitions when server
Expand All @@ -204,12 +235,12 @@ func (s) TestHealthCheckWatchStateChange(t *testing.T) {
cc, r := setupClient(t, nil)
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: parseServiceConfig(t, r, `{
ServiceConfig: parseServiceConfig(t, r, fmt.Sprintf(`{
"healthCheckConfig": {
"serviceName": "foo"
},
"loadBalancingConfig": [{"pick_first":{}}]
}`)})
"loadBalancingConfig": [{"%s":{}}]
}`, petiolePolicyName))})

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand Down

0 comments on commit 0b1fd7e

Please sign in to comment.