Skip to content

Commit

Permalink
remove start function in the generic producer
Browse files Browse the repository at this point in the history
  • Loading branch information
arjan-bal committed Sep 5, 2024
1 parent 8aa3604 commit 6d26e43
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 34 deletions.
8 changes: 1 addition & 7 deletions balancer/pickfirstleaf/pickfirstleaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,8 @@ func (b *pickfirstBalancer) newSCData(addr resolver.Address) (*scData, error) {
return nil, err
}
// Start the health check service if its configured.
closeHealSvcProd := func() {}
if internal.EnableHealthCheckViaProducer != nil {
closeHealSvcProd = internal.EnableHealthCheckViaProducer.(func(balancer.HealthCheckOptions, balancer.SubConn) func())(b.healthCheckOpts, sc)
}
closeGenProd := producer.StartHealthChecking(sc)
sd.closeHealthProducers = func() {
closeHealSvcProd()
closeGenProd()
sd.closeHealthProducers = internal.EnableHealthCheckViaProducer.(func(balancer.HealthCheckOptions, balancer.SubConn) func())(b.healthCheckOpts, sc)
}
sd.subConn = sc
return sd, nil
Expand Down
25 changes: 19 additions & 6 deletions health/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ func (*producerBuilder) Build(cci any) (balancer.Producer, func()) {
cc: cci.(grpc.ClientConnInterface),
mu: sync.Mutex{},
connectivityState: connectivity.Idle,
healthState: balancer.SubConnState{
ConnectivityState: connectivity.Idle,
},
}
return p, sync.OnceFunc(func() {
p.mu.Lock()
Expand All @@ -79,18 +82,23 @@ type healthServiceProducer struct {
cc grpc.ClientConnInterface
mu sync.Mutex
connectivityState connectivity.State
healthState balancer.SubConnState
subConnStateListener balancer.StateListener
listener balancer.StateListener
oldListener balancer.StateListener
unregisterConnListener func()
opts *balancer.HealthCheckOptions
stopClientFn func()
running bool
}

type noOpListener struct{}
type noOpListener struct {
p *healthServiceProducer
}

func (l *noOpListener) OnStateChange(_ balancer.SubConnState) {
l.p.mu.Lock()
defer l.p.mu.Unlock()
l.p.listener.OnStateChange(l.p.healthState)
}

func EnableHealthCheck(opts balancer.HealthCheckOptions, sc balancer.SubConn) func() {
Expand All @@ -102,7 +110,7 @@ func EnableHealthCheck(opts balancer.HealthCheckOptions, sc balancer.SubConn) fu
return closeFn
}
var closeGenericProducer func()
p.listener, closeGenericProducer = producer.SwapRootListener(&noOpListener{}, sc)
p.listener, closeGenericProducer = producer.SwapRootListener(&noOpListener{p: p}, sc)
ls := &subConnStateListener{
p: p,
}
Expand All @@ -117,15 +125,20 @@ func EnableHealthCheck(opts balancer.HealthCheckOptions, sc balancer.SubConn) fu
}
}

func (p *healthServiceProducer) updateHealthStateLocked(state balancer.SubConnState) {
p.healthState = state
p.listener.OnStateChange(state)
}

func (p *healthServiceProducer) startHealthCheckLocked() {
serviceName := p.opts.ServiceName()
if p.opts.DisableHealthCheckDialOpt || !p.opts.EnableHealthCheck || serviceName == "" {
p.listener.OnStateChange(balancer.SubConnState{ConnectivityState: connectivity.Ready})
p.updateHealthStateLocked(balancer.SubConnState{ConnectivityState: connectivity.Ready})
return
}
if p.opts.HealthCheckFunc == nil {
logger.Error("Health check is requested but health check function is not set.")
p.listener.OnStateChange(balancer.SubConnState{ConnectivityState: connectivity.Ready})
p.updateHealthStateLocked(balancer.SubConnState{ConnectivityState: connectivity.Ready})
return
}
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -140,7 +153,7 @@ func (p *healthServiceProducer) startHealthCheckLocked() {
if !p.running {
return
}
p.listener.OnStateChange(balancer.SubConnState{
p.updateHealthStateLocked(balancer.SubConnState{
ConnectivityState: state,
ConnectionError: err,
})
Expand Down
28 changes: 7 additions & 21 deletions health/producer/generic_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ type producer struct {
serializer *grpcsync.CallbackSerializer
rootListener balancer.StateListener
broadcastingListener *broadcastingListner
started bool
}

func RegisterListener(l balancer.StateListener, sc balancer.SubConn) func() {
Expand All @@ -79,46 +78,33 @@ func RegisterListener(l balancer.StateListener, sc balancer.SubConn) func() {
closeFn()
}
p.serializer.TrySchedule(func(ctx context.Context) {
if !p.started {
return
}
p.broadcastingListener.listeners[l] = true
l.OnStateChange(p.healthState)
})
return unregister
}

func StartHealthChecking(sc balancer.SubConn) func() {
pr, closeFn := sc.GetOrBuildProducer(producerBuilderSingleton)
p := pr.(*producer)
p.serializer.TrySchedule(func(ctx context.Context) {
if p.started {
return
}
p.started = true
p.rootListener.OnStateChange(p.healthState)
})
return closeFn
}

// Adds a Sender to beginning of the chain, gives the next sender in the chain to send
// updates.
func SwapRootListener(newListener balancer.StateListener, sc balancer.SubConn) (balancer.StateListener, func()) {
pr, closeFn := sc.GetOrBuildProducer(producerBuilderSingleton)
p := pr.(*producer)
senderCh := make(chan balancer.StateListener, 1)
p.serializer.ScheduleOr(func(ctx context.Context) {
if p.started {
logger.Error("Registering a health producer listener after the producer has already started.")
close(senderCh)
}
oldSender := p.rootListener
p.rootListener = newListener
senderCh <- oldSender
}, func() {
close(senderCh)
})
oldSender := <-senderCh
// Send an update on the root listener to allow the new producer to set
// update the state present in listener down the chain if required.
p.serializer.TrySchedule(func(ctx context.Context) {
p.rootListener.OnStateChange(balancer.SubConnState{
ConnectivityState: connectivity.Ready,
})
})
return oldSender, closeFn
}

Expand Down

0 comments on commit 6d26e43

Please sign in to comment.