diff --git a/probes/external/external.go b/probes/external/external.go index b0ccb15b..8bf969df 100644 --- a/probes/external/external.go +++ b/probes/external/external.go @@ -329,6 +329,13 @@ func (p *Probe) readProbeReplies(done chan struct{}) error { } +func (p *Probe) withAdditionalLabels(em *metrics.EventMetrics, target string) *metrics.EventMetrics { + for _, al := range p.opts.AdditionalLabels { + em.AddLabel(al.KeyValueForTarget(target)) + } + return em +} + func (p *Probe) defaultMetrics(target string, result *result) *metrics.EventMetrics { em := metrics.NewEventMetrics(time.Now()). AddMetric("success", metrics.NewInt(result.success)). @@ -340,15 +347,11 @@ func (p *Probe) defaultMetrics(target string, result *result) *metrics.EventMetr em.LatencyUnit = p.opts.LatencyUnit - for _, al := range p.opts.AdditionalLabels { - em.AddLabel(al.KeyValueForTarget(target)) - } - if p.opts.Validators != nil { em.AddMetric("validation_failure", result.validationFailure) } - return em + return p.withAdditionalLabels(em, target) } func (p *Probe) labels(ep endpoint.Endpoint) map[string]string { @@ -440,11 +443,11 @@ func (p *Probe) processProbeResult(ps *probeStatus, result *result) { if p.c.GetOutputMetricsOptions().GetAggregateInCloudprober() { result.payloadMetrics = p.payloadParser.AggregatedPayloadMetrics(result.payloadMetrics, ps.payload, ps.target) p.opts.LogMetrics(result.payloadMetrics) - p.dataChan <- result.payloadMetrics + p.dataChan <- p.withAdditionalLabels(result.payloadMetrics, ps.target) } else { for _, em := range p.payloadParser.PayloadMetrics(ps.payload, ps.target) { p.opts.LogMetrics(em) - p.dataChan <- em + p.dataChan <- p.withAdditionalLabels(em, ps.target) } } } diff --git a/probes/external/external_test.go b/probes/external/external_test.go index f7f27781..0b536930 100644 --- a/probes/external/external_test.go +++ b/probes/external/external_test.go @@ -36,6 +36,7 @@ import ( serverpb "github.com/google/cloudprober/probes/external/proto" "github.com/google/cloudprober/probes/external/serverutils" "github.com/google/cloudprober/probes/options" + probeconfigpb "github.com/google/cloudprober/probes/proto" "github.com/google/cloudprober/targets" "github.com/google/cloudprober/targets/endpoint" ) @@ -650,9 +651,11 @@ func TestUpdateTargets(t *testing.T) { } } -func verifyProcessedResult(t *testing.T, p *Probe, r *result, success int64, name string, val int64, payloadLabels [][2]string) { +func verifyProcessedResult(t *testing.T, p *Probe, r *result, success int64, name string, val int64, extraLabels map[string]string) { t.Helper() + t.Log(val) + testTarget := "test-target" if r.success != success { t.Errorf("r.success=%d, expected=%d", r.success, success) @@ -676,9 +679,14 @@ func verifyProcessedResult(t *testing.T, p *Probe, r *result, success int64, nam } expectedLabels := map[string]string{"ptype": "external", "probe": "testprobe", "dst": "test-target"} - for _, kv := range payloadLabels { - expectedLabels[kv[0]] = kv[1] + for k, v := range extraLabels { + expectedLabels[k] = v + } + + if len(em.LabelsKeys()) != len(expectedLabels) { + t.Errorf("Labels mismatch: got=%v, expected=%v", em.LabelsKeys(), expectedLabels) } + for key, val := range expectedLabels { if em.Label(key) != val { t.Errorf("r.payloadMetrics.Label(%s)=%s, expected=%s", key, r.payloadMetrics.Label(key), val) @@ -687,18 +695,66 @@ func verifyProcessedResult(t *testing.T, p *Probe, r *result, success int64, nam } func TestProcessProbeResult(t *testing.T) { - for _, agg := range []bool{true, false} { - - t.Run(fmt.Sprintf("With aggregation: %v", agg), func(t *testing.T) { + tests := []struct { + desc string + aggregate bool + payloads []string + additionalLabels map[string]string + wantValues []int64 + wantExtraLabels map[string]string + }{ + { + desc: "with-aggregation-enabled", + aggregate: true, + wantValues: []int64{14, 25}, + payloads: []string{"p-failures 14", "p-failures 11"}, + }, + { + desc: "with-aggregation-disabled", + aggregate: false, + payloads: []string{ + "p-failures{service=serviceA,db=dbA} 14", + "p-failures{service=serviceA,db=dbA} 11", + }, + wantValues: []int64{14, 11}, + wantExtraLabels: map[string]string{ + "service": "serviceA", + "db": "dbA", + }, + }, + { + desc: "with-additional-labels", + aggregate: false, + payloads: []string{ + "p-failures{service=serviceA,db=dbA} 14", + "p-failures{service=serviceA,db=dbA} 11", + }, + additionalLabels: map[string]string{"dc": "xx"}, + wantValues: []int64{14, 11}, + wantExtraLabels: map[string]string{ + "service": "serviceA", + "db": "dbA", + "dc": "xx", + }, + }, + } + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { p := &Probe{} opts := options.DefaultOptions() opts.ProbeConf = &configpb.ProbeConf{ OutputMetricsOptions: &payloadconfigpb.OutputMetricsOptions{ - AggregateInCloudprober: proto.Bool(agg), + AggregateInCloudprober: proto.Bool(test.aggregate), }, Command: proto.String("./testCommand"), } + for k, v := range test.additionalLabels { + opts.AdditionalLabels = append(opts.AdditionalLabels, options.ParseAdditionalLabel(&probeconfigpb.AdditionalLabel{ + Key: proto.String(k), + Value: proto.String(v), + })) + } err := p.Init("testprobe", opts) if err != nil { t.Fatal(err) @@ -710,39 +766,30 @@ func TestProcessProbeResult(t *testing.T) { latency: metrics.NewFloat(0), } - payloadMetricName := map[bool]string{ - false: "p-failures{service=serviceA,db=dbA}", - true: "p-failures", - } - payloadLabels := map[bool][][2]string{ - false: [][2]string{ - [2]string{"service", "serviceA"}, - [2]string{"db", "dbA"}, - }, - } - // First run p.processProbeResult(&probeStatus{ target: "test-target", success: true, latency: time.Millisecond, - payload: fmt.Sprintf("%s 14", payloadMetricName[agg]), + payload: test.payloads[0], }, r) - verifyProcessedResult(t, p, r, 1, "p-failures", 14, payloadLabels[agg]) + wantSuccess := int64(1) + verifyProcessedResult(t, p, r, wantSuccess, "p-failures", test.wantValues[0], test.wantExtraLabels) // Second run p.processProbeResult(&probeStatus{ target: "test-target", success: true, latency: time.Millisecond, - payload: fmt.Sprintf("%s 11", payloadMetricName[agg]), + payload: test.payloads[1], }, r) + wantSuccess++ - if agg { - verifyProcessedResult(t, p, r, 2, "p-failures", 25, payloadLabels[agg]) + if test.aggregate { + verifyProcessedResult(t, p, r, wantSuccess, "p-failures", test.wantValues[1], test.wantExtraLabels) } else { - verifyProcessedResult(t, p, r, 2, "p-failures", 11, payloadLabels[agg]) + verifyProcessedResult(t, p, r, wantSuccess, "p-failures", test.wantValues[1], test.wantExtraLabels) } }) } diff --git a/probes/options/labels.go b/probes/options/labels.go index 2e378dfd..ee6efa78 100644 --- a/probes/options/labels.go +++ b/probes/options/labels.go @@ -103,7 +103,9 @@ func (al *AdditionalLabel) KeyValueForTarget(targetName string) (key, val string return al.Key, al.valueForTarget[targetName] } -func parseAdditionalLabel(alpb *configpb.AdditionalLabel) *AdditionalLabel { +// ParseAdditionalLabel parses an additional label proto message into an +// AdditionalLabel struct. +func ParseAdditionalLabel(alpb *configpb.AdditionalLabel) *AdditionalLabel { al := &AdditionalLabel{ Key: alpb.GetKey(), } @@ -158,7 +160,7 @@ func parseAdditionalLabels(p *configpb.ProbeDef) []*AdditionalLabel { var aLabels []*AdditionalLabel for _, pb := range p.GetAdditionalLabel() { - aLabels = append(aLabels, parseAdditionalLabel(pb)) + aLabels = append(aLabels, ParseAdditionalLabel(pb)) } return aLabels diff --git a/probes/options/labels_test.go b/probes/options/labels_test.go index 1e5c8437..d3e89d60 100644 --- a/probes/options/labels_test.go +++ b/probes/options/labels_test.go @@ -97,7 +97,7 @@ func TestParseAdditionalLabel(t *testing.T) { for i, alpb := range configWithAdditionalLabels.GetAdditionalLabel() { t.Run(alpb.GetKey(), func(t *testing.T) { - al := parseAdditionalLabel(alpb) + al := ParseAdditionalLabel(alpb) if !reflect.DeepEqual(al, expected[i]) { t.Errorf("Additional labels not parsed correctly. Got=\n%#v\nWanted=\n%#v", al, expected[i]) }