Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

关于 mainLoop 方法的一些疑问 #9

Open
moonheart opened this issue Oct 31, 2022 · 0 comments
Open

关于 mainLoop 方法的一些疑问 #9

moonheart opened this issue Oct 31, 2022 · 0 comments

Comments

@moonheart
Copy link

你好,我在看源码的时候对 mainLoop 方法的实现有一些疑惑,还望解答。

按照我的理解,mainLoop 方法的作用是从 pushChannel 接收 consul 服务的变化事件,对事件进行去抖动(debounce),然后触发 pushConsulService2APIServer 方法。

第一个是在这段代码对 pushChannel 的处理中

case e := <-s.pushChannel:
log.Debugf("Receive event from push chanel : %v", e)
lastResourceUpdateTime = time.Now()
if debouncedEvents == 0 {
log.Debugf("This is the first debounced event")
startDebounce = lastResourceUpdateTime
}
timeChan = time.After(debounceAfter)
debouncedEvents++

如果持续有小于 debounceAfter 间隔的事件触发,会出现最终调用 pushConsulService2APIServer 方法的时间超过 debounceMax 的情况。这里是否有其他的考虑呢?

第二个是在这段代码中的条件判断

if eventDelay >= debounceMax || quietTime >= debounceAfter {
if debouncedEvents > 0 {
pushCounter++
log.Infof("Push debounce stable[%d] %d: %v since last change, %v since last push",
pushCounter, debouncedEvents, quietTime, eventDelay)
err := s.pushConsulService2APIServer()
if err != nil {
log.Errorf("Failed to synchronize consul services to Istio: %v", err)
// Retry if failed
s.pushChannel <- &ChangeEvent{}
}
debouncedEvents = 0
}
} else {
timeChan = time.After(debounceAfter - quietTime)
}

什么情况下会走到 else 中的分支呢?这里如果进入了 case <-timeChan,那必然是满足 eventDelay >= debounceMax || quietTime >= debounceAfter 的条件的。

基于以上,修改的代码如下

func (s *Controller) mainLoop(stop <-chan struct{}) {
	var timeChan <-chan time.Time
	var startDebounce time.Time
	var lastResourceUpdateTime time.Time
	pushCounter := 0
	debouncedEvents := 0

	for {
		select {
		case <-stop:
			break
		case e := <-s.pushChannel:
			log.Debugf("Receive event from push chanel : %v", e)
			lastResourceUpdateTime = time.Now()
			if debouncedEvents == 0 {
				log.Debugf("This is the first debounced event")
				startDebounce = lastResourceUpdateTime
			}
			eventDelay := time.Since(startDebounce)
			debouncedEvents++
			// it has been too long since the first debounced event or quiet enough since the last debounced event
			if eventDelay >= debounceMax {
				timeChan = time.After(0)
			} else {
				timeChan = time.After(debounceAfter)
			}
		case <-timeChan:
			log.Debugf("Receive event from time chanel")
			eventDelay := time.Since(startDebounce)
			quietTime := time.Since(lastResourceUpdateTime)
			pushCounter++
			log.Infof("Push debounce stable[%d] %d: %v since last change, %v since last push",
				pushCounter, debouncedEvents, quietTime, eventDelay)
			err := s.pushConsulService2APIServer()
			if err != nil {
				log.Errorf("Failed to synchronize consul services to Istio: %v", err)
				// Retry if failed
				s.pushChannel <- &ChangeEvent{}
			}
			debouncedEvents = 0
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant