diff --git a/internal/sink/namespaces.go b/internal/sink/namespaces.go index 420bf8474..b2f065a9e 100644 --- a/internal/sink/namespaces.go +++ b/internal/sink/namespaces.go @@ -67,11 +67,6 @@ func (a *NamespaceStore) validateEvent(ctx context.Context, event serializer.Clo // validateEventWithMeter validates a single event against a single meter func validateEventWithMeter(meter *models.Meter, ev serializer.CloudEventsKafkaPayload) *ProcessingError { - // We can skip count events with no group bys - if meter.Aggregation != models.MeterAggregationCount && len(meter.GroupBy) == 0 { - return nil - } - // Parse CloudEvents data as JSON, currently we only support JSON encoding var data interface{} err := json.Unmarshal([]byte(ev.Data), &data) @@ -79,37 +74,40 @@ func validateEventWithMeter(meter *models.Meter, ev serializer.CloudEventsKafkaP return NewProcessingError("cannot unmarshal event data as json", DEADLETTER) } - // Parse value - if meter.Aggregation != models.MeterAggregationCount { - valueRaw, err := jsonpath.JsonPathLookup(data, meter.ValueProperty) + // Parse and validate group bys + for _, groupByJsonPath := range meter.GroupBy { + groupByValue, err := jsonpath.JsonPathLookup(data, groupByJsonPath) if err != nil { - return NewProcessingError(fmt.Sprintf("event data is missing value property at %s", meter.ValueProperty), DEADLETTER) + return NewProcessingError(fmt.Sprintf("event data is missing the group by property at %s", groupByJsonPath), DEADLETTER) } - if valueRaw == nil { - return NewProcessingError("event data value cannot be null", DEADLETTER) + if groupByValue == nil { + return NewProcessingError(fmt.Sprintf("event data group by property is nil at %s", groupByJsonPath), DEADLETTER) } + } - if valueStr, ok := valueRaw.(string); ok { - _, err = strconv.ParseFloat(valueStr, 64) - if err != nil { - return NewProcessingError(fmt.Sprintf("event data value cannot be parsed as float64: %s", valueStr), DEADLETTER) - } - } else if _, ok := valueRaw.(float64); ok { + // We can skip count events as they don't have value property + if meter.Aggregation == models.MeterAggregationCount { + return nil + } - } else { - return NewProcessingError("event data value property cannot be parsed", DEADLETTER) - } + // Parse and validate value + valueRaw, err := jsonpath.JsonPathLookup(data, meter.ValueProperty) + if err != nil { + return NewProcessingError(fmt.Sprintf("event data is missing value property at %s", meter.ValueProperty), DEADLETTER) + } + if valueRaw == nil { + return NewProcessingError("event data value cannot be null", DEADLETTER) } - // Parse group bys - for _, groupByJsonPath := range meter.GroupBy { - groupByValue, err := jsonpath.JsonPathLookup(data, groupByJsonPath) + if valueStr, ok := valueRaw.(string); ok { + _, err = strconv.ParseFloat(valueStr, 64) if err != nil { - return NewProcessingError(fmt.Sprintf("event data is missing the group by property at %s", groupByJsonPath), DEADLETTER) - } - if groupByValue == nil { - return NewProcessingError(fmt.Sprintf("event data group by property is nil at %s", groupByJsonPath), DEADLETTER) + return NewProcessingError(fmt.Sprintf("event data value cannot be parsed as float64: %s", valueStr), DEADLETTER) } + } else if _, ok := valueRaw.(float64); ok { + + } else { + return NewProcessingError("event data value property cannot be parsed", DEADLETTER) } return nil diff --git a/internal/sink/sink.go b/internal/sink/sink.go index 9dd141dc0..21356ed1c 100644 --- a/internal/sink/sink.go +++ b/internal/sink/sink.go @@ -211,7 +211,6 @@ func (s *Sink) flush() error { err := s.config.Deduplicator.Set(ctx, dedupeItems...) if err != nil { - logger.Error("failed to sink to redis", "err", err) return fmt.Errorf("failed to sink to redis: %s", err) } logger.Debug("succeeded to sink to redis", "buffer size", len(messages))