Skip to content

Commit

Permalink
backport of commit a9e7166 (#24527)
Browse files Browse the repository at this point in the history
Co-authored-by: Juana De La Cuesta <[email protected]>
  • Loading branch information
1 parent e97b625 commit b36261d
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 29 deletions.
56 changes: 27 additions & 29 deletions drivers/docker/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,43 +93,41 @@ func (h *taskHandle) Stats(ctx context.Context, interval time.Duration, compute
func (h *taskHandle) collectStats(ctx context.Context, destCh *usageSender, interval time.Duration, compute cpustats.Compute) {
defer destCh.close()

timer, cancel := helper.NewSafeTimer(interval)
ticker, cancel := helper.NewSafeTicker(interval)
defer cancel()

// we need to use the streaming stats API here because our calculation for
// CPU usage depends on having the values from the previous read, which are
// not available in one-shot
statsReader, err := h.dockerClient.ContainerStats(ctx, h.containerID, true)
if err != nil && err != io.EOF {
h.logger.Debug("error collecting stats from container", "error", err)
return
}
defer statsReader.Body.Close()

collectOnce := func() {
defer timer.Reset(interval)
var stats *containerapi.Stats
err := json.NewDecoder(statsReader.Body).Decode(&stats)
if err != nil && err != io.EOF {
h.logger.Debug("error decoding stats data from container", "error", err)
return
}
if stats == nil {
h.logger.Debug("error decoding stats data: stats were nil")
return
}
resourceUsage := util.DockerStatsToTaskResourceUsage(stats, compute)
destCh.send(resourceUsage)
}
var stats *containerapi.Stats

for {
select {
case <-ctx.Done():
return
case <-h.doneCh:
return
case <-timer.C:
collectOnce()
case <-ticker.C:
// we need to use the streaming stats API here because our calculation for
// CPU usage depends on having the values from the previous read, which are
// not available in one-shot. This streaming stats can be reused over time,
// but require synchronization, which restricts the interval for the metrics.
statsReader, err := h.dockerClient.ContainerStats(ctx, h.containerID, true)
if err != nil && err != io.EOF {
h.logger.Debug("error collecting stats from container", "error", err)
return
}

err = json.NewDecoder(statsReader.Body).Decode(&stats)
statsReader.Body.Close()
if err != nil && err != io.EOF {
h.logger.Error("error decoding stats data from container", "error", err)
return
}

if stats == nil {
h.logger.Error("error decoding stats data: stats were nil")
return
}

resourceUsage := util.DockerStatsToTaskResourceUsage(stats, compute)
destCh.send(resourceUsage)
}
}
}
22 changes: 22 additions & 0 deletions helper/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,28 @@ func NewSafeTimer(duration time.Duration) (*time.Timer, StopFunc) {
return t, cancel
}

// NewSafeTicker creates a time.Ticker but does not panic if duration is <= 0.
//
// Returns the time.Ticker and also a StopFunc, forcing the caller to deal
// with stopping the time.Ticker to avoid leaking a goroutine.

func NewSafeTicker(duration time.Duration) (*time.Ticker, StopFunc) {
if duration <= 0 {
// Avoid panic by using the smallest positive value. This is close enough
// to the behavior of time.After(0), which this helper is intended to
// replace.
// https://go.dev/play/p/EIkm9MsPbHY
duration = 1
}

t := time.NewTicker(duration)
cancel := func() {
t.Stop()
}

return t, cancel
}

// NewStoppedTimer creates a time.Timer in a stopped state. This is useful when
// the actual wait time will computed and set later via Reset.
func NewStoppedTimer() (*time.Timer, StopFunc) {
Expand Down

0 comments on commit b36261d

Please sign in to comment.