Skip to content
This repository has been archived by the owner on Nov 5, 2021. It is now read-only.

Commit

Permalink
External probe: Send metrics even if requests time out.
Browse files Browse the repository at this point in the history
Also, add tests that would have caught this bug.

Ref: #653
PiperOrigin-RevId: 393020350
  • Loading branch information
manugarg committed Aug 26, 2021
1 parent 30fb6ab commit bd997e0
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 7 deletions.
11 changes: 11 additions & 0 deletions probes/external/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,17 @@ func (p *Probe) runServerProbe(ctx, startCtx context.Context) {

// Wait for receiver goroutine to exit.
wg.Wait()

// Handle requests that we have not yet received replies for: "requests" will
// contain only outstanding requests by this point.
requestsMu.Lock()
defer requestsMu.Unlock()
for _, req := range requests {
p.processProbeResult(&probeStatus{
target: req.target,
success: false,
}, p.results[req.target])
}
}

// runCommand encapsulates command executor in a variable so that we can
Expand Down
54 changes: 47 additions & 7 deletions probes/external/external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,37 @@ func setProbeOptions(p *Probe, name, value string) {

// runAndVerifyServerProbe executes a server probe and verifies the replies
// received.
func runAndVerifyServerProbe(t *testing.T, p *Probe, action string, tgts []string, total, success map[string]int64) {
// runAndVerifyServerProbe sets the probe's "action" option, calls
// runAndVerifyProbe for the given targets, and then checks the EventMetrics
// read from p.dataChan: for every target in tgts, the last "total" and
// "success" values in the collected series must equal total[tgt] and
// success[tgt]. numEventMetrics is the number of EventMetrics requested from
// the channel.
func runAndVerifyServerProbe(t *testing.T, p *Probe, action string, tgts []string, total, success map[string]int64, numEventMetrics int) {
	setProbeOptions(p, "action", action)
	runAndVerifyProbe(t, p, tgts, total, success)

	// Read the EventMetrics produced by the run. An error is reported through
	// t.Error, and verification still continues with whatever was returned.
	ems, err := testutils.MetricsFromChannel(p.dataChan, numEventMetrics, 1*time.Second)
	if err != nil {
		t.Error(err)
	}
	byMetric := testutils.MetricsMap(ems)

	// latest returns the final value in the series recorded for the named
	// metric and target. When the series is empty it records a test error and
	// yields zero, so the comparison below still runs for that target.
	latest := func(name, tgt string) int64 {
		series := byMetric[name][tgt]
		if len(series) == 0 {
			t.Errorf("No %s metric for target: %s", name, tgt)
			return 0
		}
		newest := series[len(series)-1]
		return newest.Metric(name).(metrics.NumValue).Int64()
	}

	for _, tgt := range tgts {
		// Look up "total" before "success" so missing-metric errors are
		// reported in a fixed order.
		gotTotal := latest("total", tgt)
		gotSuccess := latest("success", tgt)

		if gotSuccess == success[tgt] && gotTotal == total[tgt] {
			continue
		}
		t.Errorf("Target(%s) total=%d, success=%d, wanted: total=%d, success=%d, all_metrics=%s", tgt, gotTotal, gotSuccess, total[tgt], success[tgt], ems)
	}
}

func runAndVerifyProbe(t *testing.T, p *Probe, tgts []string, total, success map[string]int64) {
Expand Down Expand Up @@ -218,31 +246,43 @@ func TestProbeServerMode(t *testing.T) {
total[tgt]++
success[tgt]++
}
runAndVerifyServerProbe(t, p, "nopayload", tgts, total, success)
t.Run("nopayload", func(t *testing.T) {
runAndVerifyServerProbe(t, p, "nopayload", tgts, total, success, 2)
})

// Payload
tgts = []string{"target3"}
for _, tgt := range tgts {
total[tgt]++
success[tgt]++
}
runAndVerifyServerProbe(t, p, "payload", tgts, total, success)
t.Run("payload", func(t *testing.T) {
// 1 target, 2 EMs per target
runAndVerifyServerProbe(t, p, "payload", tgts, total, success, 1*2)
})

// Payload with error
tgts = []string{"target2", "target3"}
for _, tgt := range tgts {
total[tgt]++
}
runAndVerifyServerProbe(t, p, "payload_with_error", tgts, total, success)
t.Run("payload_with_error", func(t *testing.T) {
// 2 targets, 2 EMs per target
runAndVerifyServerProbe(t, p, "payload_with_error", tgts, total, success, 2*2)
})

// Timeout
tgts = []string{"target1", "target2", "target3"}
for _, tgt := range tgts {
total[tgt]++
}

// Reduce probe timeout to make this test pass quicker.
p.opts.Timeout = time.Second
runAndVerifyServerProbe(t, p, "timeout", tgts, total, success)
t.Run("timeout", func(t *testing.T) {
// 3 targets, 1 EM per target
runAndVerifyServerProbe(t, p, "timeout", tgts, total, success, 3*1)
})
}

func TestProbeServerRemotePipeClose(t *testing.T) {
Expand All @@ -258,7 +298,7 @@ func TestProbeServerRemotePipeClose(t *testing.T) {
}
// Reduce probe timeout to make this test pass quicker.
p.opts.Timeout = time.Second
runAndVerifyServerProbe(t, p, "pipe_server_close", tgts, total, success)
runAndVerifyServerProbe(t, p, "pipe_server_close", tgts, total, success, 1)
readError := <-readErrorCh
if readError == nil {
t.Error("Didn't get error in reading pipe")
Expand All @@ -282,7 +322,7 @@ func TestProbeServerLocalPipeClose(t *testing.T) {
// Reduce probe timeout to make this test pass quicker.
p.opts.Timeout = time.Second
p.cmdStdout.(*os.File).Close()
runAndVerifyServerProbe(t, p, "pipe_local_close", tgts, total, success)
runAndVerifyServerProbe(t, p, "pipe_local_close", tgts, total, success, 1)
readError := <-readErrorCh
if readError == nil {
t.Error("Didn't get error in reading pipe")
Expand Down

0 comments on commit bd997e0

Please sign in to comment.