Skip to content

Commit

Permalink
refine behaviour
Browse files Browse the repository at this point in the history
  • Loading branch information
arjan-bal committed Sep 6, 2024
1 parent d0322c2 commit 44f9543
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 14 deletions.
11 changes: 11 additions & 0 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,18 @@ type SubConn interface {
// indicate the shutdown operation. This may be delivered before
// in-progress RPCs are complete and the actual connection is closed.
Shutdown()
// RegisterConnectivityListner allows producers to subscribe to subchannel
// connectivity state updates. Listener will get updates only till the ClientConn
// is closed. It is not guaranteed for listeners to get an update when the
// subchannel transitions to SHUTDOWN. Listeners should be unregestered
// when they are no longer required. The listener will get called with the
// present connectivity state before receiving any other updates.
// Registering a listener multiple times without unregistering is a no-op.
RegisterConnectivityListner(StateListener)
// UnregisterConnectivityListner allows producer to stop receiving updates
// on the given listener. If the listener was not previously registered, this
// is a no-op. The listener may still receive pending updates that came
// before the unregistration request.
UnregisterConnectivityListner(StateListener)
}

Expand Down
2 changes: 2 additions & 0 deletions balancer/pickfirstleaf/pickfirstleaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ func (b *pickfirstBalancer) newSCData(addr resolver.Address) (*scData, error) {
// Start the health check service if its configured.
if internal.EnableHealthCheckViaProducer != nil {
sd.closeHealthProducers = internal.EnableHealthCheckViaProducer.(func(balancer.HealthCheckOptions, balancer.SubConn) func())(b.healthCheckOpts, sc)
} else if b.healthCheckOpts.EnableHealthCheck {
b.logger.Errorf("Health check is requested but health check function is not set.")
}
sd.subConn = sc
return sd, nil
Expand Down
8 changes: 6 additions & 2 deletions balancer_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,11 +376,15 @@ func (acbw *acBalancerWrapper) RegisterConnectivityListner(l balancer.StateListe
if ctx.Err() != nil || acbw.ccb.balancer == nil {
return
}
if acbw.connectivityListeners[l] {
return
}
acbw.connectivityListeners[l] = true
acbw.ac.mu.Lock()
defer acbw.ac.mu.Unlock()
state := acbw.ac.state
acbw.ac.mu.Unlock()
l.OnStateChange(balancer.SubConnState{
ConnectivityState: acbw.ac.state,
ConnectivityState: state,
})
})
}
Expand Down
32 changes: 21 additions & 11 deletions health/genericproducer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var producerBuilderSingleton *producerBuilder

type broadcastingListner struct {
p *producer
listeners map[balancer.StateListener]bool
listeners map[balancer.StateListener]int
}

func (l *broadcastingListner) OnStateChange(scs balancer.SubConnState) {
Expand All @@ -49,7 +49,7 @@ func (*producerBuilder) Build(cci any) (balancer.Producer, func()) {
p.connectivityListener = &connectivityListener{p: p}
p.broadcastingListener = &broadcastingListner{
p: p,
listeners: make(map[balancer.StateListener]bool),
listeners: make(map[balancer.StateListener]int),
}
p.rootListener = p.broadcastingListener
return p, sync.OnceFunc(func() {
Expand Down Expand Up @@ -88,10 +88,16 @@ func (l *connectivityListener) OnStateChange(state balancer.SubConnState) {
})
}

// RegisterListener is used by health consumers to start listening for health
// updates. It returns a function to unregister the listener and manage
// ref counting. It must be called by consumers when they no longer required the
// listener.
// RegisterListener allows health consumers to start listening for health
// updates till the ClientConn is closed. It is not guaranteed for listeners to
// get an update when the subchannel transitions to SHUTDOWN.
// It returns a function to unregister the listener and manage ref counting.
// Listeners must be unregestered when they are no longer required. The listener
// will get called with the present connectivity state before receiving any
// other updates. If a listener is registered multiple times, it will receive each
// update only once. The function returned by all the registration requests must be
// called to finally unregister the listener. The listener will asynchronously
// get all the updates that were queued before the unregisteration request came.
func RegisterListener(l balancer.StateListener, sc balancer.SubConn) func() {
pr, closeFn := sc.GetOrBuildProducer(producerBuilderSingleton)
p := pr.(*producer)
Expand All @@ -104,16 +110,17 @@ func RegisterListener(l balancer.StateListener, sc balancer.SubConn) func() {
p.sc = sc
sc.RegisterConnectivityListner(p.connectivityListener)
}
p.broadcastingListener.listeners[l] = true
p.broadcastingListener.listeners[l]++
l.OnStateChange(p.healthState)
})
return unregister
}

// SwapRootListener sets the given listener as the root of the listener chain.
// It returns the previous root of the chain. The producer must process calls
// to the registered listener in a passthrough manner by calling the returned
// listener every time it received an update.
// It returns the previous root of the chain. The new root will be called with
// the present connectivity state before any other updates are sent.
// A cleanup function is also returned which must be called before the calling
// producer/LB policy shuts down.
func SwapRootListener(newListener balancer.StateListener, sc balancer.SubConn) (balancer.StateListener, func()) {
// closeFn can be called synchronously when consumer listeners are getting notified
// from the serializer.If the refcount falls to 0, it can use the producer
Expand Down Expand Up @@ -144,6 +151,9 @@ func SwapRootListener(newListener balancer.StateListener, sc balancer.SubConn) (

func (p *producer) unregisterListener(l balancer.StateListener) {
p.serializer.TrySchedule(func(_ context.Context) {
delete(p.broadcastingListener.listeners, l)
p.broadcastingListener.listeners[l]--
if p.broadcastingListener.listeners[l] == 0 {
delete(p.broadcastingListener.listeners, l)
}
})
}
8 changes: 7 additions & 1 deletion health/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ func (l *connectivityStateListener) OnStateChange(newState balancer.SubConnState
// Propagate updates down the listener chain.
l.p.updateStateLocked()
}()
// Behave as as an identity function and pass on the connectivity updates
// down the listener chain.
if l.p.shutdown {
return
}
Expand Down Expand Up @@ -102,7 +104,11 @@ type healthServiceProducer struct {
}

// EnableHealthCheck enabled the health check service client to perform health
// checks for the subchannel.
// checks for the subchannel. It must be called at most once on a subchannel.
// Once the health check service is enabled, consumers can receive its updates
// by registering a listener with the generic health producer.
// It returns a cleanup function that must be called once the health checking is
// no longer required.
func EnableHealthCheck(opts balancer.HealthCheckOptions, sc balancer.SubConn) func() {
pr, closeFn := sc.GetOrBuildProducer(producerBuilderSingleton)
p := pr.(*healthServiceProducer)
Expand Down

0 comments on commit 44f9543

Please sign in to comment.