From ca2289de86015286f878b2dc63eeec846020e1c3 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Thu, 5 Sep 2024 20:41:37 +0530 Subject: [PATCH] Simplify OD changes --- .../balancer/outlierdetection/balancer.go | 65 +++++++------------ .../outlierdetection/subconn_wrapper.go | 4 +- 2 files changed, 24 insertions(+), 45 deletions(-) diff --git a/xds/internal/balancer/outlierdetection/balancer.go b/xds/internal/balancer/outlierdetection/balancer.go index 0c5c6d9b6162..7142430354ab 100644 --- a/xds/internal/balancer/outlierdetection/balancer.go +++ b/xds/internal/balancer/outlierdetection/balancer.go @@ -361,26 +361,6 @@ func (b *outlierDetectionBalancer) updateSubConnState(sc balancer.SubConn, state } delete(b.scWrappers, scw.SubConn) } - if scw.genericHealthProducerEnabled { - scw.listener(state) - return - } - b.scUpdateCh.Put(&scUpdate{ - scw: scw, - state: state, - }) -} - -func (b *outlierDetectionBalancer) updateHealthState(sc balancer.SubConn, state balancer.SubConnState) { - b.mu.Lock() - defer b.mu.Unlock() - scw, ok := b.scWrappers[sc] - if !ok { - // Shouldn't happen if passed down a SubConnWrapper to child on SubConn - // creation. - b.logger.Errorf("OnStateChange called with SubConn that has no corresponding SubConnWrapper") - return - } b.scUpdateCh.Put(&scUpdate{ scw: scw, state: state, @@ -505,7 +485,7 @@ type healthListener struct { } func (hl *healthListener) OnStateChange(state balancer.SubConnState) { - hl.b.updateHealthState(hl.sc, state) + hl.b.updateSubConnState(hl.sc, state) } func (b *outlierDetectionBalancer) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { @@ -517,23 +497,29 @@ func (b *outlierDetectionBalancer) NewSubConn(addrs []resolver.Address, opts bal var sc balancer.SubConn oldListener := opts.StateListener - opts.StateListener = func(state balancer.SubConnState) { b.updateSubConnState(sc, state) } + if !genericHealthProducerEnabled { + opts.StateListener = func(state balancer.SubConnState) { b.updateSubConnState(sc, state) } + } sc, err := b.cc.NewSubConn(addrs, opts) if err != nil { return nil, err } scw := &subConnWrapper{ - SubConn: sc, - addresses: addrs, - scUpdateCh: b.scUpdateCh, - listener: oldListener, - genericHealthProducerEnabled: genericHealthProducerEnabled, + SubConn: sc, + addresses: addrs, + scUpdateCh: b.scUpdateCh, } if genericHealthProducerEnabled { - scw.healthListener, scw.unregisterHealthListener = genericproducer.SwapRootListener(&healthListener{ + healthListener, unregisterFn := genericproducer.SwapRootListener(&healthListener{ b: b, sc: sc, }, sc) + scw.listener = func(scs balancer.SubConnState) { + healthListener.OnStateChange(scs) + } + scw.unregisterHealthListener = unregisterFn + } else { + scw.listener = oldListener } b.mu.Lock() defer b.mu.Unlock() @@ -666,15 +652,12 @@ func min(x, y time.Duration) time.Duration { func (b *outlierDetectionBalancer) handleSubConnUpdate(u *scUpdate) { scw := u.scw scw.latestState = u.state - if scw.ejected { - return - } - b.childMu.Lock() - defer b.childMu.Unlock() - if scw.genericHealthProducerEnabled { - scw.healthListener.OnStateChange(u.state) - } else if scw.listener != nil { - scw.listener(u.state) + if !scw.ejected { + if scw.listener != nil { + b.childMu.Lock() + scw.listener(u.state) + b.childMu.Unlock() + } } } @@ -691,12 +674,10 @@ func (b *outlierDetectionBalancer) handleEjectedUpdate(u *ejectionUpdate) { ConnectivityState: connectivity.TransientFailure, } } - b.childMu.Lock() - defer b.childMu.Unlock() - if scw.genericHealthProducerEnabled { - scw.healthListener.OnStateChange(stateToUpdate) - } else if scw.listener != nil { + if scw.listener != nil { + b.childMu.Lock() scw.listener(stateToUpdate) + b.childMu.Unlock() } } diff --git a/xds/internal/balancer/outlierdetection/subconn_wrapper.go b/xds/internal/balancer/outlierdetection/subconn_wrapper.go index ad542c79e721..102dc8eee0b0 100644 --- a/xds/internal/balancer/outlierdetection/subconn_wrapper.go +++ b/xds/internal/balancer/outlierdetection/subconn_wrapper.go @@ -30,9 +30,7 @@ import ( // to help track the latest state update from the underlying SubConn, and also // whether or not this SubConn is ejected. type subConnWrapper struct { - genericHealthProducerEnabled bool - unregisterHealthListener func() - healthListener balancer.StateListener + unregisterHealthListener func() balancer.SubConn listener func(balancer.SubConnState)