Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SWATCH-3295: Normalize the payg metrics to use the same tags and format #4159

Merged
merged 7 commits into from
Feb 11, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package org.candlepin.subscriptions.event;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.redhat.swatch.configuration.registry.MetricId;
import com.redhat.swatch.configuration.registry.SubscriptionDefinition;
import com.redhat.swatch.configuration.registry.Variant;
import com.redhat.swatch.configuration.util.ProductTagLookupParams;
Expand Down Expand Up @@ -397,8 +398,9 @@ private void updateIngestedUsageCounterFor(Event event, String tag, Measurement
counter.tag("billing_provider", event.getBillingProvider().value());
}
counter
.tag("metric_id", MetricId.tryGetValueFromString(measurement.getMetricId()))
.withRegistry(meterRegistry)
.withTags("product", tag, "metric_id", measurement.getMetricId())
.withTags("product", tag)
.increment(measurement.getValue());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.candlepin.subscriptions.db.model.BillingProvider;
import org.candlepin.subscriptions.db.model.Granularity;
import org.candlepin.subscriptions.db.model.HardwareMeasurementType;
Expand All @@ -33,18 +34,16 @@
import org.candlepin.subscriptions.db.model.Usage;
import org.candlepin.subscriptions.json.TallySummary;
import org.candlepin.subscriptions.resource.ResourceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.stereotype.Service;

/** Component that produces tally snapshot summary messages given a list of tally snapshots. */
@Slf4j
@Service
public class SnapshotSummaryProducer {
private static final Logger log = LoggerFactory.getLogger(SnapshotSummaryProducer.class);

private final String tallySummaryTopic;
private final KafkaTemplate<String, TallySummary> tallySummaryKafkaTemplate;
Expand Down Expand Up @@ -72,7 +71,7 @@ public void produceTallySummaryMessages(Map<String, List<TallySnapshot>> newAndU
and measurement types other than Total
when we transmit the tally summary message to the BillableUsage component. */
snapshots.stream()
.filter(this::filterByHourlyAndNotAnySnapshots)
.filter(SnapshotSummaryProducer::filterByHourlyAndNotAnySnapshots)
.map(
snapshot -> {
removeTotalMeasurements(snapshot);
Expand All @@ -90,16 +89,7 @@ public void produceTallySummaryMessages(Map<String, List<TallySnapshot>> newAndU
log.info("Produced {} TallySummary messages", totalTallies);
}

private boolean filterByHourlyAndNotAnySnapshots(TallySnapshot snapshot) {
return snapshot.getGranularity().equals(Granularity.HOURLY)
&& !snapshot.getServiceLevel().equals(ServiceLevel._ANY)
&& !snapshot.getUsage().equals(Usage._ANY)
&& !snapshot.getBillingProvider().equals(BillingProvider._ANY)
&& !snapshot.getBillingAccountId().equals(ResourceUtils.ANY)
&& hasMeasurements(snapshot);
}

private void removeTotalMeasurements(TallySnapshot snapshot) {
public static void removeTotalMeasurements(TallySnapshot snapshot) {
if (Objects.nonNull(snapshot.getTallyMeasurements())) {
snapshot
.getTallyMeasurements()
Expand All @@ -109,14 +99,29 @@ private void removeTotalMeasurements(TallySnapshot snapshot) {
}
}

public static boolean filterByHourlyAndNotAnySnapshots(TallySnapshot snapshot) {
return filterByGranularityAndNotAnySnapshots(snapshot, Granularity.HOURLY.getValue());
}

public static boolean filterByGranularityAndNotAnySnapshots(
TallySnapshot snapshot, String granularity) {
return snapshot.getGranularity() != null
&& snapshot.getGranularity().toString().equalsIgnoreCase(granularity)
&& !ServiceLevel._ANY.equals(snapshot.getServiceLevel())
&& !Usage._ANY.equals(snapshot.getUsage())
&& !BillingProvider._ANY.equals(snapshot.getBillingProvider())
&& !ResourceUtils.ANY.equals(snapshot.getBillingAccountId())
&& hasMeasurements(snapshot);
}

/**
* Validates TallySnapshot measurements to make sure that it has all the information required by
* the RH marketplace API. Any issues will be logged.
*
* @param snapshot the TallySnapshot to validate.
* @return true if the TallySnapshot is valid, false otherwise.
*/
private boolean hasMeasurements(TallySnapshot snapshot) {
public static boolean hasMeasurements(TallySnapshot snapshot) {
if (!Objects.nonNull(snapshot.getTallyMeasurements())
|| snapshot.getTallyMeasurements().isEmpty()) {
log.warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
*/
package org.candlepin.subscriptions.tally;

import static org.candlepin.subscriptions.tally.SnapshotSummaryProducer.removeTotalMeasurements;

import com.redhat.swatch.configuration.registry.MetricId;
import com.redhat.swatch.configuration.registry.SubscriptionDefinition;
import com.redhat.swatch.configuration.registry.Variant;
import io.micrometer.core.annotation.Timed;
Expand All @@ -32,19 +35,15 @@
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.candlepin.clock.ApplicationClock;
import org.candlepin.subscriptions.ApplicationProperties;
import org.candlepin.subscriptions.db.TallyStateRepository;
import org.candlepin.subscriptions.db.model.Granularity;
import org.candlepin.subscriptions.db.model.HardwareMeasurementType;
import org.candlepin.subscriptions.db.model.ServiceLevel;
import org.candlepin.subscriptions.db.model.TallySnapshot;
import org.candlepin.subscriptions.db.model.TallyState;
import org.candlepin.subscriptions.db.model.TallyStateKey;
import org.candlepin.subscriptions.db.model.Usage;
import org.candlepin.subscriptions.event.EventController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.retry.support.RetryTemplate;
Expand All @@ -53,10 +52,11 @@
import org.springframework.transaction.annotation.Transactional;

/** Provides the logic for updating Tally snapshots. */
@Slf4j
@Component
public class TallySnapshotController {

private static final Logger log = LoggerFactory.getLogger(TallySnapshotController.class);
protected static final String TALLIED_USAGE_TOTAL_METRIC = "swatch_tally_tallied_usage_total";

private final ApplicationProperties appProps;
private final InventoryAccountUsageCollector usageCollector;
Expand Down Expand Up @@ -118,43 +118,6 @@ public void produceSnapshotsForOrg(String orgId) {
recordTallyCount(snapshots);
}

protected void recordTallyCount(List<TallySnapshot> savedSnapshots) {
var count = Counter.builder("swatch_tally_tallied_usage_total").withRegistry(meterRegistry);
// Only increment the counter at the finest granularity level to prevent over-counting
savedSnapshots.parallelStream()
.filter(this::isDuplicateSnap)
.forEach(
snap ->
snap.getTallyMeasurements().entrySet().stream()
// Filter out TOTAL measurement types since those are aggregates and we don't
// want to double count
.filter(
entry ->
!entry
.getKey()
.getMeasurementType()
.equals(HardwareMeasurementType.TOTAL))
.forEach(
entry -> {
var c =
count.withTags(
"product",
snap.getProductId(),
"metric_id",
entry.getKey().getMetricId(),
"billing_provider_id",
snap.getBillingProvider().getValue());
c.increment(entry.getValue());
}));
}

protected boolean isDuplicateSnap(TallySnapshot snap) {
return SubscriptionDefinition.isFinestGranularity(
snap.getProductId(), snap.getGranularity().toString())
&& snap.getServiceLevel() != ServiceLevel._ANY
&& snap.getUsage() != Usage._ANY;
}

// Because we want to ensure that our DB operations have been completed before
// any messages are sent, message sending must be done outside a DB transaction
// boundary. We use Propagation.NEVER here so that if this method should ever be called
Expand Down Expand Up @@ -239,6 +202,39 @@ public void produceHourlySnapshotsForOrg(String orgId) {
}
}

private void recordTallyCount(List<TallySnapshot> snapshots) {
var count = Counter.builder(TALLIED_USAGE_TOTAL_METRIC).withRegistry(meterRegistry);
// Only increment the counter at the finest granularity level to prevent over-counting
snapshots.stream()
.filter(this::filterByFinestGranularityAndNotAnySnapshots)
.map(
snapshot -> {
removeTotalMeasurements(snapshot);
return snapshot;
})
.forEach(
snap ->
snap.getTallyMeasurements()
.entrySet()
.forEach(
entry -> {
var c =
count.withTags(
"product",
snap.getProductId(),
"metric_id",
MetricId.tryGetValueFromString(entry.getKey().getMetricId()),
"billing_provider",
snap.getBillingProvider().getValue());
c.increment(entry.getValue());
}));
}

private boolean filterByFinestGranularityAndNotAnySnapshots(TallySnapshot snapshot) {
return SnapshotSummaryProducer.filterByGranularityAndNotAnySnapshots(
snapshot, SubscriptionDefinition.getFinestGranularity(snapshot.getProductId()));
}

private boolean isCombiningRollupStrategySupported(
Map.Entry<OffsetDateTime, AccountUsageCalculation> usageCalculations) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.redhat.swatch.configuration.registry.MetricId;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import jakarta.persistence.EntityManager;
Expand Down Expand Up @@ -622,7 +623,9 @@ private Optional<Meter> getIngestedUsageMetric(
m ->
INGESTED_USAGE_METRIC.equals(m.getId().getName())
&& productTag.equals(m.getId().getTag("product"))
&& metricId.equals(m.getId().getTag("metric_id"))
&& MetricId.fromString(metricId)
.getValue()
.equals(m.getId().getTag("metric_id"))
&& billingProvider.equals(m.getId().getTag("billing_provider")))
.findFirst();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/
package org.candlepin.subscriptions.tally;

import static org.candlepin.subscriptions.tally.TallySnapshotController.TALLIED_USAGE_TOTAL_METRIC;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
Expand Down Expand Up @@ -77,15 +78,11 @@ void produceSnapshotsForOrg() {
controller.produceSnapshotsForOrg("123");

var counter =
Counter.builder("swatch_tally_tallied_usage_total")
.tags(
"product",
"RHEL for x86",
"billing_provider_id",
BillingProvider.RED_HAT.getValue())
Counter.builder(TALLIED_USAGE_TOTAL_METRIC)
.tags("product", "RHEL for x86", "billing_provider", BillingProvider.RED_HAT.getValue())
.withRegistry(registry);

for (var s : Set.of("CORES", "SOCKETS")) {
for (var s : Set.of("Cores", "Sockets")) {
var c = counter.withTag("metric_id", s);
assertEquals(10.0, c.count());
}
Expand Down Expand Up @@ -128,11 +125,11 @@ void produceHourlySnapshotsForOrg() {
controller.produceHourlySnapshotsForOrg("123");

var counter =
Counter.builder("swatch_tally_tallied_usage_total")
.tags("product", "rosa", "billing_provider_id", BillingProvider.RED_HAT.getValue())
Counter.builder(TALLIED_USAGE_TOTAL_METRIC)
.tags("product", "rosa", "billing_provider", BillingProvider.RED_HAT.getValue())
.withRegistry(registry);

for (var s : Set.of("CORES", "SOCKETS")) {
for (var s : Set.of("Cores", "Sockets")) {
var c = counter.withTag("metric_id", s);
assertEquals(10.0, c.count());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package com.redhat.swatch.billable.usage.kafka.streams;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.redhat.swatch.configuration.registry.MetricId;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.quarkus.arc.profile.UnlessBuildProfile;
Expand Down Expand Up @@ -116,7 +117,11 @@ private void traceAggregate(

counter
.withRegistry(meterRegistry)
.withTags("product", key.key().getProductId(), "metric_id", key.key().getMetricId())
.withTags(
"product",
key.key().getProductId(),
"metric_id",
MetricId.tryGetValueFromString(key.key().getMetricId()))
.increment(aggregate.getTotalValue().doubleValue());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,10 @@ public BillableUsage produceMonthlyBillable(BillableUsage usage)

if (usageCalc.getRemittedValue() > 0) {
createRemittance(usage, usageCalc, contractCoverage);
updateUsageMeter(usage, usageCalc, contractAmount);
} else {
log.debug("Nothing to remit. Remittance record will not be created.");
}
updateUsageMeter(usage, contractCoverage.getTotal(), usageCalc.getBillableValue());

// There were issues with transmitting usage to AWS since the cost event timestamps were in the
// past. This modification allows us to send usage to AWS if we get it during the current hour
Expand Down Expand Up @@ -250,7 +250,10 @@ private void createRemittance(
usage.setUuid(newRemittance.getUuid());
}

private void updateUsageMeter(BillableUsage usage, double contractCoverage, double billable) {
private void updateUsageMeter(
BillableUsage usage,
BillableUsageCalculation usageCalc,
Quantity<BillingUnit> contractAmount) {
if (usage.getProductId() == null
|| usage.getMetricId() == null
|| usage.getBillingProvider() == null
Expand All @@ -261,24 +264,17 @@ private void updateUsageMeter(BillableUsage usage, double contractCoverage, doub
new ArrayList<>(
List.of(
"product", usage.getProductId(),
"metric_id", usage.getMetricId(),
"metric_id", MetricId.tryGetValueFromString(usage.getMetricId()),
"billing_provider", usage.getBillingProvider().value(),
"status", usage.getStatus().value()));
double coverage =
usage.getCurrentTotal() * usage.getBillingFactor() > contractCoverage
? contractCoverage
: usage.getCurrentTotal() * usage.getBillingFactor();
if (coverage > 0) {
Counter.builder(COVERED_USAGE_METRIC)
.withRegistry(meterRegistry)
.withTags(tags.toArray(new String[0]))
.increment(coverage);
}
if (billable > 0) {
Counter.builder(BILLABLE_USAGE_METRIC)
.withRegistry(meterRegistry)
.withTags(tags.toArray(new String[0]))
.increment(billable);
}
Counter.builder(COVERED_USAGE_METRIC)
.withRegistry(meterRegistry)
.withTags(tags.toArray(new String[0]))
.increment(contractAmount.toMetricUnits());

Counter.builder(BILLABLE_USAGE_METRIC)
.withRegistry(meterRegistry)
.withTags(tags.toArray(new String[0]))
.increment(usageCalc.getRemittedValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,12 @@ public Quantity<U> positiveOrZero() {
return this;
}

public double toMetricUnits() {
return value / unit.getBillingFactor();
}

public <T extends Unit> Quantity<T> to(T targetUnit) {
var valueInMetricUnits = value / unit.getBillingFactor();
var valueInMetricUnits = toMetricUnits();
var valueInTargetUnits = valueInMetricUnits * targetUnit.getBillingFactor();
return new Quantity<>(valueInTargetUnits, targetUnit);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ quarkus.http.access-log.pattern=combined
quarkus.management.enabled=true
quarkus.management.port=9000
quarkus.management.root-path=/
quarkus.micrometer.export.json.enabled=true

# database configuration
quarkus.datasource.db-kind=postgresql
Expand Down
Loading