Skip to content

Commit

Permalink
Simplify OD changes
Browse files Browse the repository at this point in the history
  • Loading branch information
arjan-bal committed Sep 5, 2024
1 parent 75ef129 commit ca2289d
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 45 deletions.
65 changes: 23 additions & 42 deletions xds/internal/balancer/outlierdetection/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,26 +361,6 @@ func (b *outlierDetectionBalancer) updateSubConnState(sc balancer.SubConn, state
}
delete(b.scWrappers, scw.SubConn)
}
if scw.genericHealthProducerEnabled {
scw.listener(state)
return
}
b.scUpdateCh.Put(&scUpdate{
scw: scw,
state: state,
})
}

func (b *outlierDetectionBalancer) updateHealthState(sc balancer.SubConn, state balancer.SubConnState) {
b.mu.Lock()
defer b.mu.Unlock()
scw, ok := b.scWrappers[sc]
if !ok {
// Shouldn't happen if passed down a SubConnWrapper to child on SubConn
// creation.
b.logger.Errorf("OnStateChange called with SubConn that has no corresponding SubConnWrapper")
return
}
b.scUpdateCh.Put(&scUpdate{
scw: scw,
state: state,
Expand Down Expand Up @@ -505,7 +485,7 @@ type healthListener struct {
}

func (hl *healthListener) OnStateChange(state balancer.SubConnState) {
hl.b.updateHealthState(hl.sc, state)
hl.b.updateSubConnState(hl.sc, state)
}

func (b *outlierDetectionBalancer) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
Expand All @@ -517,23 +497,29 @@ func (b *outlierDetectionBalancer) NewSubConn(addrs []resolver.Address, opts bal

var sc balancer.SubConn
oldListener := opts.StateListener
opts.StateListener = func(state balancer.SubConnState) { b.updateSubConnState(sc, state) }
if !genericHealthProducerEnabled {
opts.StateListener = func(state balancer.SubConnState) { b.updateSubConnState(sc, state) }
}
sc, err := b.cc.NewSubConn(addrs, opts)
if err != nil {
return nil, err
}
scw := &subConnWrapper{
SubConn: sc,
addresses: addrs,
scUpdateCh: b.scUpdateCh,
listener: oldListener,
genericHealthProducerEnabled: genericHealthProducerEnabled,
SubConn: sc,
addresses: addrs,
scUpdateCh: b.scUpdateCh,
}
if genericHealthProducerEnabled {
scw.healthListener, scw.unregisterHealthListener = genericproducer.SwapRootListener(&healthListener{
healthListener, unregisterFn := genericproducer.SwapRootListener(&healthListener{
b: b,
sc: sc,
}, sc)
scw.listener = func(scs balancer.SubConnState) {
healthListener.OnStateChange(scs)
}
scw.unregisterHealthListener = unregisterFn
} else {
scw.listener = oldListener
}
b.mu.Lock()
defer b.mu.Unlock()
Expand Down Expand Up @@ -666,15 +652,12 @@ func min(x, y time.Duration) time.Duration {
func (b *outlierDetectionBalancer) handleSubConnUpdate(u *scUpdate) {
scw := u.scw
scw.latestState = u.state
if scw.ejected {
return
}
b.childMu.Lock()
defer b.childMu.Unlock()
if scw.genericHealthProducerEnabled {
scw.healthListener.OnStateChange(u.state)
} else if scw.listener != nil {
scw.listener(u.state)
if !scw.ejected {
if scw.listener != nil {
b.childMu.Lock()
scw.listener(u.state)
b.childMu.Unlock()
}
}
}

Expand All @@ -691,12 +674,10 @@ func (b *outlierDetectionBalancer) handleEjectedUpdate(u *ejectionUpdate) {
ConnectivityState: connectivity.TransientFailure,
}
}
b.childMu.Lock()
defer b.childMu.Unlock()
if scw.genericHealthProducerEnabled {
scw.healthListener.OnStateChange(stateToUpdate)
} else if scw.listener != nil {
if scw.listener != nil {
b.childMu.Lock()
scw.listener(stateToUpdate)
b.childMu.Unlock()
}
}

Expand Down
4 changes: 1 addition & 3 deletions xds/internal/balancer/outlierdetection/subconn_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ import (
// to help track the latest state update from the underlying SubConn, and also
// whether or not this SubConn is ejected.
type subConnWrapper struct {
genericHealthProducerEnabled bool
unregisterHealthListener func()
healthListener balancer.StateListener
unregisterHealthListener func()
balancer.SubConn
listener func(balancer.SubConnState)

Expand Down

0 comments on commit ca2289d

Please sign in to comment.