Skip to content

Commit

Permalink
SWATCH-3295: Normalize the payg metrics to use the same tags and form…
Browse files Browse the repository at this point in the history
…at (#4159)

Jira issue: SWATCH-3295

Depends on
#4157

## Description
All the payg metrics should have the same tags, units and use the same
format to be used in the grafana dashboard.

This pull request addresses the following changes:
- Change format of metric ID to use the metric ID code instead of the
uppercase format
Some metrics were using one format or the another. All the metrics
should use the same one.

- Wrong billing_provider_id in "swatch_tally_tallied_usage_total", it
should be "billing_provider"
- The metric "swatch_tally_tallied_usage_total" was double counting.
Fixed by reusing the logic to exclude duplicate snaps.
- The metrics "swatch_contract_usage_total",
"swatch_billable_usage_total" and "swatch_producer_metered_total" were
expressed in billing units instead of metric units. We need to use the
metric units, so the numbers in the grafana dashboard are consistent.
- Reverts commit d831fdc. We're not
using the json format anylonger in tests.

## Testing

IQE Test MR:
https://gitlab.cee.redhat.com/insights-qe/iqe-rhsm-subscriptions-plugin/-/merge_requests/1040
  • Loading branch information
Sgitario authored Feb 11, 2025
2 parents 694734d + 2f400ac commit d300d49
Show file tree
Hide file tree
Showing 22 changed files with 249 additions and 165 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package org.candlepin.subscriptions.event;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.redhat.swatch.configuration.registry.MetricId;
import com.redhat.swatch.configuration.registry.SubscriptionDefinition;
import com.redhat.swatch.configuration.registry.Variant;
import com.redhat.swatch.configuration.util.ProductTagLookupParams;
Expand Down Expand Up @@ -397,8 +398,9 @@ private void updateIngestedUsageCounterFor(Event event, String tag, Measurement
counter.tag("billing_provider", event.getBillingProvider().value());
}
counter
.tag("metric_id", MetricId.tryGetValueFromString(measurement.getMetricId()))
.withRegistry(meterRegistry)
.withTags("product", tag, "metric_id", measurement.getMetricId())
.withTags("product", tag)
.increment(measurement.getValue());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.candlepin.subscriptions.db.model.BillingProvider;
import org.candlepin.subscriptions.db.model.Granularity;
import org.candlepin.subscriptions.db.model.HardwareMeasurementType;
Expand All @@ -33,18 +34,16 @@
import org.candlepin.subscriptions.db.model.Usage;
import org.candlepin.subscriptions.json.TallySummary;
import org.candlepin.subscriptions.resource.ResourceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.stereotype.Service;

/** Component that produces tally snapshot summary messages given a list of tally snapshots. */
@Slf4j
@Service
public class SnapshotSummaryProducer {
private static final Logger log = LoggerFactory.getLogger(SnapshotSummaryProducer.class);

private final String tallySummaryTopic;
private final KafkaTemplate<String, TallySummary> tallySummaryKafkaTemplate;
Expand Down Expand Up @@ -72,7 +71,7 @@ public void produceTallySummaryMessages(Map<String, List<TallySnapshot>> newAndU
and measurement types other than Total
when we transmit the tally summary message to the BillableUsage component. */
snapshots.stream()
.filter(this::filterByHourlyAndNotAnySnapshots)
.filter(SnapshotSummaryProducer::filterByHourlyAndNotAnySnapshots)
.map(
snapshot -> {
removeTotalMeasurements(snapshot);
Expand All @@ -90,16 +89,7 @@ public void produceTallySummaryMessages(Map<String, List<TallySnapshot>> newAndU
log.info("Produced {} TallySummary messages", totalTallies);
}

private boolean filterByHourlyAndNotAnySnapshots(TallySnapshot snapshot) {
return snapshot.getGranularity().equals(Granularity.HOURLY)
&& !snapshot.getServiceLevel().equals(ServiceLevel._ANY)
&& !snapshot.getUsage().equals(Usage._ANY)
&& !snapshot.getBillingProvider().equals(BillingProvider._ANY)
&& !snapshot.getBillingAccountId().equals(ResourceUtils.ANY)
&& hasMeasurements(snapshot);
}

private void removeTotalMeasurements(TallySnapshot snapshot) {
public static void removeTotalMeasurements(TallySnapshot snapshot) {
if (Objects.nonNull(snapshot.getTallyMeasurements())) {
snapshot
.getTallyMeasurements()
Expand All @@ -109,14 +99,29 @@ private void removeTotalMeasurements(TallySnapshot snapshot) {
}
}

public static boolean filterByHourlyAndNotAnySnapshots(TallySnapshot snapshot) {
return filterByGranularityAndNotAnySnapshots(snapshot, Granularity.HOURLY.getValue());
}

public static boolean filterByGranularityAndNotAnySnapshots(
TallySnapshot snapshot, String granularity) {
return snapshot.getGranularity() != null
&& snapshot.getGranularity().toString().equalsIgnoreCase(granularity)
&& !ServiceLevel._ANY.equals(snapshot.getServiceLevel())
&& !Usage._ANY.equals(snapshot.getUsage())
&& !BillingProvider._ANY.equals(snapshot.getBillingProvider())
&& !ResourceUtils.ANY.equals(snapshot.getBillingAccountId())
&& hasMeasurements(snapshot);
}

/**
* Validates TallySnapshot measurements to make sure that it has all the information required by
* the RH marketplace API. Any issues will be logged.
*
* @param snapshot the TallySnapshot to validate.
* @return true if the TallySnapshot is valid, false otherwise.
*/
private boolean hasMeasurements(TallySnapshot snapshot) {
public static boolean hasMeasurements(TallySnapshot snapshot) {
if (!Objects.nonNull(snapshot.getTallyMeasurements())
|| snapshot.getTallyMeasurements().isEmpty()) {
log.warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
*/
package org.candlepin.subscriptions.tally;

import static org.candlepin.subscriptions.tally.SnapshotSummaryProducer.removeTotalMeasurements;

import com.redhat.swatch.configuration.registry.MetricId;
import com.redhat.swatch.configuration.registry.SubscriptionDefinition;
import com.redhat.swatch.configuration.registry.Variant;
import io.micrometer.core.annotation.Timed;
Expand All @@ -32,19 +35,15 @@
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.candlepin.clock.ApplicationClock;
import org.candlepin.subscriptions.ApplicationProperties;
import org.candlepin.subscriptions.db.TallyStateRepository;
import org.candlepin.subscriptions.db.model.Granularity;
import org.candlepin.subscriptions.db.model.HardwareMeasurementType;
import org.candlepin.subscriptions.db.model.ServiceLevel;
import org.candlepin.subscriptions.db.model.TallySnapshot;
import org.candlepin.subscriptions.db.model.TallyState;
import org.candlepin.subscriptions.db.model.TallyStateKey;
import org.candlepin.subscriptions.db.model.Usage;
import org.candlepin.subscriptions.event.EventController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.retry.support.RetryTemplate;
Expand All @@ -53,10 +52,11 @@
import org.springframework.transaction.annotation.Transactional;

/** Provides the logic for updating Tally snapshots. */
@Slf4j
@Component
public class TallySnapshotController {

private static final Logger log = LoggerFactory.getLogger(TallySnapshotController.class);
protected static final String TALLIED_USAGE_TOTAL_METRIC = "swatch_tally_tallied_usage_total";

private final ApplicationProperties appProps;
private final InventoryAccountUsageCollector usageCollector;
Expand Down Expand Up @@ -118,43 +118,6 @@ public void produceSnapshotsForOrg(String orgId) {
recordTallyCount(snapshots);
}

protected void recordTallyCount(List<TallySnapshot> savedSnapshots) {
var count = Counter.builder("swatch_tally_tallied_usage_total").withRegistry(meterRegistry);
// Only increment the counter at the finest granularity level to prevent over-counting
savedSnapshots.parallelStream()
.filter(this::isDuplicateSnap)
.forEach(
snap ->
snap.getTallyMeasurements().entrySet().stream()
// Filter out TOTAL measurement types since those are aggregates and we don't
// want to double count
.filter(
entry ->
!entry
.getKey()
.getMeasurementType()
.equals(HardwareMeasurementType.TOTAL))
.forEach(
entry -> {
var c =
count.withTags(
"product",
snap.getProductId(),
"metric_id",
entry.getKey().getMetricId(),
"billing_provider_id",
snap.getBillingProvider().getValue());
c.increment(entry.getValue());
}));
}

protected boolean isDuplicateSnap(TallySnapshot snap) {
return SubscriptionDefinition.isFinestGranularity(
snap.getProductId(), snap.getGranularity().toString())
&& snap.getServiceLevel() != ServiceLevel._ANY
&& snap.getUsage() != Usage._ANY;
}

// Because we want to ensure that our DB operations have been completed before
// any messages are sent, message sending must be done outside a DB transaction
// boundary. We use Propagation.NEVER here so that if this method should ever be called
Expand Down Expand Up @@ -239,6 +202,39 @@ public void produceHourlySnapshotsForOrg(String orgId) {
}
}

private void recordTallyCount(List<TallySnapshot> snapshots) {
var count = Counter.builder(TALLIED_USAGE_TOTAL_METRIC).withRegistry(meterRegistry);
// Only increment the counter at the finest granularity level to prevent over-counting
snapshots.stream()
.filter(this::filterByFinestGranularityAndNotAnySnapshots)
.map(
snapshot -> {
removeTotalMeasurements(snapshot);
return snapshot;
})
.forEach(
snap ->
snap.getTallyMeasurements()
.entrySet()
.forEach(
entry -> {
var c =
count.withTags(
"product",
snap.getProductId(),
"metric_id",
MetricId.tryGetValueFromString(entry.getKey().getMetricId()),
"billing_provider",
snap.getBillingProvider().getValue());
c.increment(entry.getValue());
}));
}

private boolean filterByFinestGranularityAndNotAnySnapshots(TallySnapshot snapshot) {
return SnapshotSummaryProducer.filterByGranularityAndNotAnySnapshots(
snapshot, SubscriptionDefinition.getFinestGranularity(snapshot.getProductId()));
}

private boolean isCombiningRollupStrategySupported(
Map.Entry<OffsetDateTime, AccountUsageCalculation> usageCalculations) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.redhat.swatch.configuration.registry.MetricId;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import jakarta.persistence.EntityManager;
Expand Down Expand Up @@ -622,7 +623,9 @@ private Optional<Meter> getIngestedUsageMetric(
m ->
INGESTED_USAGE_METRIC.equals(m.getId().getName())
&& productTag.equals(m.getId().getTag("product"))
&& metricId.equals(m.getId().getTag("metric_id"))
&& MetricId.fromString(metricId)
.getValue()
.equals(m.getId().getTag("metric_id"))
&& billingProvider.equals(m.getId().getTag("billing_provider")))
.findFirst();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/
package org.candlepin.subscriptions.tally;

import static org.candlepin.subscriptions.tally.TallySnapshotController.TALLIED_USAGE_TOTAL_METRIC;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
Expand Down Expand Up @@ -77,15 +78,11 @@ void produceSnapshotsForOrg() {
controller.produceSnapshotsForOrg("123");

var counter =
Counter.builder("swatch_tally_tallied_usage_total")
.tags(
"product",
"RHEL for x86",
"billing_provider_id",
BillingProvider.RED_HAT.getValue())
Counter.builder(TALLIED_USAGE_TOTAL_METRIC)
.tags("product", "RHEL for x86", "billing_provider", BillingProvider.RED_HAT.getValue())
.withRegistry(registry);

for (var s : Set.of("CORES", "SOCKETS")) {
for (var s : Set.of("Cores", "Sockets")) {
var c = counter.withTag("metric_id", s);
assertEquals(10.0, c.count());
}
Expand Down Expand Up @@ -128,11 +125,11 @@ void produceHourlySnapshotsForOrg() {
controller.produceHourlySnapshotsForOrg("123");

var counter =
Counter.builder("swatch_tally_tallied_usage_total")
.tags("product", "rosa", "billing_provider_id", BillingProvider.RED_HAT.getValue())
Counter.builder(TALLIED_USAGE_TOTAL_METRIC)
.tags("product", "rosa", "billing_provider", BillingProvider.RED_HAT.getValue())
.withRegistry(registry);

for (var s : Set.of("CORES", "SOCKETS")) {
for (var s : Set.of("Cores", "Sockets")) {
var c = counter.withTag("metric_id", s);
assertEquals(10.0, c.count());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package com.redhat.swatch.billable.usage.kafka.streams;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.redhat.swatch.configuration.registry.MetricId;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.quarkus.arc.profile.UnlessBuildProfile;
Expand Down Expand Up @@ -116,7 +117,11 @@ private void traceAggregate(

counter
.withRegistry(meterRegistry)
.withTags("product", key.key().getProductId(), "metric_id", key.key().getMetricId())
.withTags(
"product",
key.key().getProductId(),
"metric_id",
MetricId.tryGetValueFromString(key.key().getMetricId()))
.increment(aggregate.getTotalValue().doubleValue());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,10 @@ public BillableUsage produceMonthlyBillable(BillableUsage usage)

if (usageCalc.getRemittedValue() > 0) {
createRemittance(usage, usageCalc, contractCoverage);
updateUsageMeter(usage, usageCalc, contractAmount);
} else {
log.debug("Nothing to remit. Remittance record will not be created.");
}
updateUsageMeter(usage, contractCoverage.getTotal(), usageCalc.getBillableValue());

// There were issues with transmitting usage to AWS since the cost event timestamps were in the
// past. This modification allows us to send usage to AWS if we get it during the current hour
Expand Down Expand Up @@ -250,7 +250,10 @@ private void createRemittance(
usage.setUuid(newRemittance.getUuid());
}

private void updateUsageMeter(BillableUsage usage, double contractCoverage, double billable) {
private void updateUsageMeter(
BillableUsage usage,
BillableUsageCalculation usageCalc,
Quantity<BillingUnit> contractAmount) {
if (usage.getProductId() == null
|| usage.getMetricId() == null
|| usage.getBillingProvider() == null
Expand All @@ -261,24 +264,17 @@ private void updateUsageMeter(BillableUsage usage, double contractCoverage, doub
new ArrayList<>(
List.of(
"product", usage.getProductId(),
"metric_id", usage.getMetricId(),
"metric_id", MetricId.tryGetValueFromString(usage.getMetricId()),
"billing_provider", usage.getBillingProvider().value(),
"status", usage.getStatus().value()));
double coverage =
usage.getCurrentTotal() * usage.getBillingFactor() > contractCoverage
? contractCoverage
: usage.getCurrentTotal() * usage.getBillingFactor();
if (coverage > 0) {
Counter.builder(COVERED_USAGE_METRIC)
.withRegistry(meterRegistry)
.withTags(tags.toArray(new String[0]))
.increment(coverage);
}
if (billable > 0) {
Counter.builder(BILLABLE_USAGE_METRIC)
.withRegistry(meterRegistry)
.withTags(tags.toArray(new String[0]))
.increment(billable);
}
Counter.builder(COVERED_USAGE_METRIC)
.withRegistry(meterRegistry)
.withTags(tags.toArray(new String[0]))
.increment(contractAmount.toMetricUnits());

Counter.builder(BILLABLE_USAGE_METRIC)
.withRegistry(meterRegistry)
.withTags(tags.toArray(new String[0]))
.increment(usageCalc.getRemittedValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,12 @@ public Quantity<U> positiveOrZero() {
return this;
}

public double toMetricUnits() {
return value / unit.getBillingFactor();
}

public <T extends Unit> Quantity<T> to(T targetUnit) {
var valueInMetricUnits = value / unit.getBillingFactor();
var valueInMetricUnits = toMetricUnits();
var valueInTargetUnits = valueInMetricUnits * targetUnit.getBillingFactor();
return new Quantity<>(valueInTargetUnits, targetUnit);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ quarkus.http.access-log.pattern=combined
quarkus.management.enabled=true
quarkus.management.port=9000
quarkus.management.root-path=/
quarkus.micrometer.export.json.enabled=true

# database configuration
quarkus.datasource.db-kind=postgresql
Expand Down
Loading

0 comments on commit d300d49

Please sign in to comment.