diff --git a/balancer/balancer.go b/balancer/balancer.go index df5cc3246b25..ae6df0750fae 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -152,7 +152,18 @@ type SubConn interface { // indicate the shutdown operation. This may be delivered before // in-progress RPCs are complete and the actual connection is closed. Shutdown() + // RegisterConnectivityListner allows producers to subscribe to subchannel + // connectivity state updates. Listener will get updates only till the ClientConn + // is closed. It is not guaranteed for listeners to get an update when the + // subchannel transitions to SHUTDOWN. Listeners should be unregestered + // when they are no longer required. The listener will get called with the + // present connectivity state before receiving any other updates. + // Registering a listener multiple times without unregistering is a no-op. RegisterConnectivityListner(StateListener) + // UnregisterConnectivityListner allows producer to stop receiving updates + // on the given listener. If the listener was not previously registered, this + // is a no-op. The listener may still receive pending updates that came + // before the unregistration request. UnregisterConnectivityListner(StateListener) } diff --git a/balancer/pickfirstleaf/pickfirstleaf.go b/balancer/pickfirstleaf/pickfirstleaf.go index 3a96741d150c..a654906bde9a 100644 --- a/balancer/pickfirstleaf/pickfirstleaf.go +++ b/balancer/pickfirstleaf/pickfirstleaf.go @@ -144,6 +144,8 @@ func (b *pickfirstBalancer) newSCData(addr resolver.Address) (*scData, error) { // Start the health check service if its configured. if internal.EnableHealthCheckViaProducer != nil { sd.closeHealthProducers = internal.EnableHealthCheckViaProducer.(func(balancer.HealthCheckOptions, balancer.SubConn) func())(b.healthCheckOpts, sc) + } else if b.healthCheckOpts.EnableHealthCheck { + b.logger.Errorf("Health check is requested but health check function is not set.") } sd.subConn = sc return sd, nil diff --git a/balancer_wrapper.go b/balancer_wrapper.go index 972c77ed0268..ad927d736a28 100644 --- a/balancer_wrapper.go +++ b/balancer_wrapper.go @@ -376,11 +376,15 @@ func (acbw *acBalancerWrapper) RegisterConnectivityListner(l balancer.StateListe if ctx.Err() != nil || acbw.ccb.balancer == nil { return } + if acbw.connectivityListeners[l] { + return + } acbw.connectivityListeners[l] = true acbw.ac.mu.Lock() - defer acbw.ac.mu.Unlock() + state := acbw.ac.state + acbw.ac.mu.Unlock() l.OnStateChange(balancer.SubConnState{ - ConnectivityState: acbw.ac.state, + ConnectivityState: state, }) }) } diff --git a/health/genericproducer/producer.go b/health/genericproducer/producer.go index 0de212375145..3e9c934a2e24 100644 --- a/health/genericproducer/producer.go +++ b/health/genericproducer/producer.go @@ -24,7 +24,7 @@ var producerBuilderSingleton *producerBuilder type broadcastingListner struct { p *producer - listeners map[balancer.StateListener]bool + listeners map[balancer.StateListener]int } func (l *broadcastingListner) OnStateChange(scs balancer.SubConnState) { @@ -49,7 +49,7 @@ func (*producerBuilder) Build(cci any) (balancer.Producer, func()) { p.connectivityListener = &connectivityListener{p: p} p.broadcastingListener = &broadcastingListner{ p: p, - listeners: make(map[balancer.StateListener]bool), + listeners: make(map[balancer.StateListener]int), } p.rootListener = p.broadcastingListener return p, sync.OnceFunc(func() { @@ -88,10 +88,16 @@ func (l *connectivityListener) OnStateChange(state balancer.SubConnState) { }) } -// RegisterListener is used by health consumers to start listening for health -// updates. It returns a function to unregister the listener and manage -// ref counting. It must be called by consumers when they no longer required the -// listener. +// RegisterListener allows health consumers to start listening for health +// updates till the ClientConn is closed. It is not guaranteed for listeners to +// get an update when the subchannel transitions to SHUTDOWN. +// It returns a function to unregister the listener and manage ref counting. +// Listeners must be unregestered when they are no longer required. The listener +// will get called with the present connectivity state before receiving any +// other updates. If a listener is registered multiple times, it will receive each +// update only once. The function returned by all the registration requests must be +// called to finally unregister the listener. The listener will asynchronously +// get all the updates that were queued before the unregisteration request came. func RegisterListener(l balancer.StateListener, sc balancer.SubConn) func() { pr, closeFn := sc.GetOrBuildProducer(producerBuilderSingleton) p := pr.(*producer) @@ -104,16 +110,17 @@ func RegisterListener(l balancer.StateListener, sc balancer.SubConn) func() { p.sc = sc sc.RegisterConnectivityListner(p.connectivityListener) } - p.broadcastingListener.listeners[l] = true + p.broadcastingListener.listeners[l]++ l.OnStateChange(p.healthState) }) return unregister } // SwapRootListener sets the given listener as the root of the listener chain. -// It returns the previous root of the chain. The producer must process calls -// to the registered listener in a passthrough manner by calling the returned -// listener every time it received an update. +// It returns the previous root of the chain. The new root will be called with +// the present connectivity state before any other updates are sent. +// A cleanup function is also returned which must be called before the calling +// producer/LB policy shuts down. func SwapRootListener(newListener balancer.StateListener, sc balancer.SubConn) (balancer.StateListener, func()) { // closeFn can be called synchronously when consumer listeners are getting notified // from the serializer.If the refcount falls to 0, it can use the producer @@ -144,6 +151,9 @@ func SwapRootListener(newListener balancer.StateListener, sc balancer.SubConn) ( func (p *producer) unregisterListener(l balancer.StateListener) { p.serializer.TrySchedule(func(_ context.Context) { - delete(p.broadcastingListener.listeners, l) + p.broadcastingListener.listeners[l]-- + if p.broadcastingListener.listeners[l] == 0 { + delete(p.broadcastingListener.listeners, l) + } }) } diff --git a/health/producer.go b/health/producer.go index cc864757f656..94a6c4507fc7 100644 --- a/health/producer.go +++ b/health/producer.go @@ -33,6 +33,8 @@ func (l *connectivityStateListener) OnStateChange(newState balancer.SubConnState // Propagate updates down the listener chain. l.p.updateStateLocked() }() + // Behave as as an identity function and pass on the connectivity updates + // down the listener chain. if l.p.shutdown { return } @@ -102,7 +104,11 @@ type healthServiceProducer struct { } // EnableHealthCheck enabled the health check service client to perform health -// checks for the subchannel. +// checks for the subchannel. It must be called at most once on a subchannel. +// Once the health check service is enabled, consumers can receive its updates +// by registering a listener with the generic health producer. +// It returns a cleanup function that must be called once the health checking is +// no longer required. func EnableHealthCheck(opts balancer.HealthCheckOptions, sc balancer.SubConn) func() { pr, closeFn := sc.GetOrBuildProducer(producerBuilderSingleton) p := pr.(*healthServiceProducer)