Skip to content

Commit

Permalink
refactor(sink): organize code
Browse files Browse the repository at this point in the history
  • Loading branch information
hekike committed Oct 17, 2023
1 parent 9f4ca15 commit 18512b2
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 28 deletions.
52 changes: 25 additions & 27 deletions internal/sink/namespaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,49 +67,47 @@ func (a *NamespaceStore) validateEvent(ctx context.Context, event serializer.Clo

// validateEventWithMeter validates a single event against a single meter
func validateEventWithMeter(meter *models.Meter, ev serializer.CloudEventsKafkaPayload) *ProcessingError {
// We can skip count events with no group bys
if meter.Aggregation != models.MeterAggregationCount && len(meter.GroupBy) == 0 {
return nil
}

// Parse CloudEvents data as JSON, currently we only support JSON encoding
var data interface{}
err := json.Unmarshal([]byte(ev.Data), &data)
if err != nil {
return NewProcessingError("cannot unmarshal event data as json", DEADLETTER)
}

// Parse value
if meter.Aggregation != models.MeterAggregationCount {
valueRaw, err := jsonpath.JsonPathLookup(data, meter.ValueProperty)
// Parse and validate group bys
for _, groupByJsonPath := range meter.GroupBy {
groupByValue, err := jsonpath.JsonPathLookup(data, groupByJsonPath)
if err != nil {
return NewProcessingError(fmt.Sprintf("event data is missing value property at %s", meter.ValueProperty), DEADLETTER)
return NewProcessingError(fmt.Sprintf("event data is missing the group by property at %s", groupByJsonPath), DEADLETTER)
}
if valueRaw == nil {
return NewProcessingError("event data value cannot be null", DEADLETTER)
if groupByValue == nil {
return NewProcessingError(fmt.Sprintf("event data group by property is nil at %s", groupByJsonPath), DEADLETTER)
}
}

if valueStr, ok := valueRaw.(string); ok {
_, err = strconv.ParseFloat(valueStr, 64)
if err != nil {
return NewProcessingError(fmt.Sprintf("event data value cannot be parsed as float64: %s", valueStr), DEADLETTER)
}
} else if _, ok := valueRaw.(float64); ok {
// We can skip count events as they don't have value property
if meter.Aggregation == models.MeterAggregationCount {
return nil
}

} else {
return NewProcessingError("event data value property cannot be parsed", DEADLETTER)
}
// Parse and validate value
valueRaw, err := jsonpath.JsonPathLookup(data, meter.ValueProperty)
if err != nil {
return NewProcessingError(fmt.Sprintf("event data is missing value property at %s", meter.ValueProperty), DEADLETTER)
}
if valueRaw == nil {
return NewProcessingError("event data value cannot be null", DEADLETTER)
}

// Parse group bys
for _, groupByJsonPath := range meter.GroupBy {
groupByValue, err := jsonpath.JsonPathLookup(data, groupByJsonPath)
if valueStr, ok := valueRaw.(string); ok {
_, err = strconv.ParseFloat(valueStr, 64)
if err != nil {
return NewProcessingError(fmt.Sprintf("event data is missing the group by property at %s", groupByJsonPath), DEADLETTER)
}
if groupByValue == nil {
return NewProcessingError(fmt.Sprintf("event data group by property is nil at %s", groupByJsonPath), DEADLETTER)
return NewProcessingError(fmt.Sprintf("event data value cannot be parsed as float64: %s", valueStr), DEADLETTER)
}
} else if _, ok := valueRaw.(float64); ok {

} else {
return NewProcessingError("event data value property cannot be parsed", DEADLETTER)
}

return nil
Expand Down
1 change: 0 additions & 1 deletion internal/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ func (s *Sink) flush() error {

err := s.config.Deduplicator.Set(ctx, dedupeItems...)
if err != nil {
logger.Error("failed to sink to redis", "err", err)
return fmt.Errorf("failed to sink to redis: %s", err)
}
logger.Debug("succeeded to sink to redis", "buffer size", len(messages))
Expand Down

0 comments on commit 18512b2

Please sign in to comment.