Skip to content

Commit

Permalink
Fixes in OD
Browse files Browse the repository at this point in the history
  • Loading branch information
arjan-bal committed Sep 5, 2024
1 parent 98a5f05 commit 5d85ae6
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 11 deletions.
30 changes: 20 additions & 10 deletions xds/internal/balancer/outlierdetection/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,9 +355,9 @@ func (b *outlierDetectionBalancer) updateSubConnState(sc balancer.SubConn, state
return
}
if state.ConnectivityState == connectivity.Shutdown {
if scw.unregisterHealthListener != nil {
scw.unregisterHealthListener()
scw.unregisterHealthListener = nil
if scw.closeHealthProducerFn != nil {
scw.closeHealthProducerFn()
scw.closeHealthProducerFn = nil
}
delete(b.scWrappers, scw.SubConn)
}
Expand Down Expand Up @@ -388,9 +388,9 @@ func (b *outlierDetectionBalancer) Close() {
}

for _, scw := range b.scWrappers {
if scw.unregisterHealthListener != nil {
scw.unregisterHealthListener()
scw.unregisterHealthListener = nil
if scw.closeHealthProducerFn != nil {
scw.closeHealthProducerFn()
scw.closeHealthProducerFn = nil
}
}
}
Expand Down Expand Up @@ -499,6 +499,12 @@ func (b *outlierDetectionBalancer) NewSubConn(addrs []resolver.Address, opts bal
oldListener := opts.StateListener
if !genericHealthProducerEnabled {
opts.StateListener = func(state balancer.SubConnState) { b.updateSubConnState(sc, state) }
} else {
opts.StateListener = func(scs balancer.SubConnState) {
b.childMu.Lock()
oldListener(scs)
b.childMu.Unlock()
}
}
sc, err := b.cc.NewSubConn(addrs, opts)
if err != nil {
Expand All @@ -510,19 +516,23 @@ func (b *outlierDetectionBalancer) NewSubConn(addrs []resolver.Address, opts bal
scUpdateCh: b.scUpdateCh,
}
if genericHealthProducerEnabled {
healthListener, unregisterFn := genericproducer.SwapRootListener(&healthListener{
oldHL, closeHealthProducerFn := genericproducer.SwapRootListener(&healthListener{
b: b,
sc: sc,
}, sc)
scw.listener = func(scs balancer.SubConnState) {
healthListener.OnStateChange(scs)
oldHL.OnStateChange(scs)
}
scw.unregisterHealthListener = unregisterFn
scw.closeHealthProducerFn = closeHealthProducerFn
} else {
scw.listener = oldListener
}
b.mu.Lock()
defer b.mu.Unlock()
b.logger.Infof("Locked mu")
defer func() {
b.logger.Infof("UnLocked mu")
b.mu.Unlock()
}()
b.scWrappers[sc] = scw
if len(addrs) != 1 {
return scw, nil
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/balancer/outlierdetection/subconn_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
// to help track the latest state update from the underlying SubConn, and also
// whether or not this SubConn is ejected.
type subConnWrapper struct {
unregisterHealthListener func()
closeHealthProducerFn func()
balancer.SubConn
listener func(balancer.SubConnState)

Expand Down

0 comments on commit 5d85ae6

Please sign in to comment.