Skip to content

Commit

Permalink
Share logic to exclude snapshots that will increment the tallied counter
Browse files Browse the repository at this point in the history
  • Loading branch information
Sgitario committed Feb 6, 2025
1 parent cf4b340 commit 6c44867
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.candlepin.subscriptions.db.model.BillingProvider;
import org.candlepin.subscriptions.db.model.Granularity;
import org.candlepin.subscriptions.db.model.HardwareMeasurementType;
Expand All @@ -33,18 +34,16 @@
import org.candlepin.subscriptions.db.model.Usage;
import org.candlepin.subscriptions.json.TallySummary;
import org.candlepin.subscriptions.resource.ResourceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.stereotype.Service;

/** Component that produces tally snapshot summary messages given a list of tally snapshots. */
@Slf4j
@Service
public class SnapshotSummaryProducer {
private static final Logger log = LoggerFactory.getLogger(SnapshotSummaryProducer.class);

private final String tallySummaryTopic;
private final KafkaTemplate<String, TallySummary> tallySummaryKafkaTemplate;
Expand Down Expand Up @@ -72,12 +71,9 @@ public void produceTallySummaryMessages(Map<String, List<TallySnapshot>> newAndU
and measurement types other than Total
when we transmit the tally summary message to the BillableUsage component. */
snapshots.stream()
.filter(this::filterByHourlyAndNotAnySnapshots)
.map(
snapshot -> {
removeTotalMeasurements(snapshot);
return snapshot;
})
.filter(
s -> filterByGranularityAndNotAnySnapshots(s, Granularity.HOURLY.getValue()))
.peek(SnapshotSummaryProducer::removeTotalMeasurements)
.sorted(Comparator.comparing(TallySnapshot::getSnapshotDate))
.map(snapshot -> summaryMapper.mapSnapshots(orgId, List.of(snapshot)))
.forEach(
Expand All @@ -90,16 +86,7 @@ public void produceTallySummaryMessages(Map<String, List<TallySnapshot>> newAndU
log.info("Produced {} TallySummary messages", totalTallies);
}

private boolean filterByHourlyAndNotAnySnapshots(TallySnapshot snapshot) {
return snapshot.getGranularity().equals(Granularity.HOURLY)
&& !snapshot.getServiceLevel().equals(ServiceLevel._ANY)
&& !snapshot.getUsage().equals(Usage._ANY)
&& !snapshot.getBillingProvider().equals(BillingProvider._ANY)
&& !snapshot.getBillingAccountId().equals(ResourceUtils.ANY)
&& hasMeasurements(snapshot);
}

private void removeTotalMeasurements(TallySnapshot snapshot) {
public static void removeTotalMeasurements(TallySnapshot snapshot) {
if (Objects.nonNull(snapshot.getTallyMeasurements())) {
snapshot
.getTallyMeasurements()
Expand All @@ -109,14 +96,25 @@ private void removeTotalMeasurements(TallySnapshot snapshot) {
}
}

public static boolean filterByGranularityAndNotAnySnapshots(
TallySnapshot snapshot, String granularity) {
return snapshot.getGranularity() != null
&& snapshot.getGranularity().toString().equalsIgnoreCase(granularity)
&& !ServiceLevel._ANY.equals(snapshot.getServiceLevel())
&& !Usage._ANY.equals(snapshot.getUsage())
&& !BillingProvider._ANY.equals(snapshot.getBillingProvider())
&& !ResourceUtils.ANY.equals(snapshot.getBillingAccountId())
&& hasMeasurements(snapshot);
}

/**
* Validates TallySnapshot measurements to make sure that it has all the information required by
* the RH marketplace API. Any issues will be logged.
*
* @param snapshot the TallySnapshot to validate.
* @return true if the TallySnapshot is valid, false otherwise.
*/
private boolean hasMeasurements(TallySnapshot snapshot) {
public static boolean hasMeasurements(TallySnapshot snapshot) {
if (!Objects.nonNull(snapshot.getTallyMeasurements())
|| snapshot.getTallyMeasurements().isEmpty()) {
log.warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,15 @@
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.candlepin.clock.ApplicationClock;
import org.candlepin.subscriptions.ApplicationProperties;
import org.candlepin.subscriptions.db.TallyStateRepository;
import org.candlepin.subscriptions.db.model.Granularity;
import org.candlepin.subscriptions.db.model.HardwareMeasurementType;
import org.candlepin.subscriptions.db.model.ServiceLevel;
import org.candlepin.subscriptions.db.model.TallySnapshot;
import org.candlepin.subscriptions.db.model.TallyState;
import org.candlepin.subscriptions.db.model.TallyStateKey;
import org.candlepin.subscriptions.db.model.Usage;
import org.candlepin.subscriptions.event.EventController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.retry.support.RetryTemplate;
Expand All @@ -54,11 +50,11 @@
import org.springframework.transaction.annotation.Transactional;

/** Provides the logic for updating Tally snapshots. */
@Slf4j
@Component
public class TallySnapshotController {

protected static final String TALLIED_USAGE_TOTAL_METRIC = "swatch_tally_tallied_usage_total";
private static final Logger log = LoggerFactory.getLogger(TallySnapshotController.class);

private final ApplicationProperties appProps;
private final InventoryAccountUsageCollector usageCollector;
Expand Down Expand Up @@ -120,44 +116,6 @@ public void produceSnapshotsForOrg(String orgId) {
recordTallyCount(snapshots);
}

protected void recordTallyCount(List<TallySnapshot> savedSnapshots) {
var count = Counter.builder(TALLIED_USAGE_TOTAL_METRIC).withRegistry(meterRegistry);
// Only increment the counter at the finest granularity level to prevent over-counting
savedSnapshots.parallelStream()
.filter(this::isDuplicateSnap)
.forEach(
snap ->
snap.getTallyMeasurements().entrySet().stream()
// Filter out TOTAL measurement types since those are aggregates and we don't
// want to double count
.filter(
entry ->
!entry
.getKey()
.getMeasurementType()
.equals(HardwareMeasurementType.TOTAL))
.forEach(
entry -> {
log.info("tally COUNTING!! snap '{}', entry was {}", snap, entry);
var c =
count.withTags(
"product",
snap.getProductId(),
"metric_id",
MetricId.tryGetValueFromString(entry.getKey().getMetricId()),
"billing_provider",
snap.getBillingProvider().getValue());
c.increment(entry.getValue());
}));
}

protected boolean isDuplicateSnap(TallySnapshot snap) {
return SubscriptionDefinition.isFinestGranularity(
snap.getProductId(), snap.getGranularity().toString())
&& snap.getServiceLevel() != ServiceLevel._ANY
&& snap.getUsage() != Usage._ANY;
}

// Because we want to ensure that our DB operations have been completed before
// any messages are sent, message sending must be done outside a DB transaction
// boundary. We use Propagation.NEVER here so that if this method should ever be called
Expand Down Expand Up @@ -242,6 +200,36 @@ public void produceHourlySnapshotsForOrg(String orgId) {
}
}

private void recordTallyCount(List<TallySnapshot> snapshots) {
var count = Counter.builder(TALLIED_USAGE_TOTAL_METRIC).withRegistry(meterRegistry);
// Only increment the counter at the finest granularity level to prevent over-counting
snapshots.stream()
.filter(this::filterByFinestGranularityAndNotAnySnapshots)
.peek(SnapshotSummaryProducer::removeTotalMeasurements)
.forEach(
snap ->
snap.getTallyMeasurements()
.entrySet()
.forEach(
entry -> {
log.info("tally COUNTING!! snap '{}', entry was {}", snap, entry);
var c =
count.withTags(
"product",
snap.getProductId(),
"metric_id",
MetricId.tryGetValueFromString(entry.getKey().getMetricId()),
"billing_provider",
snap.getBillingProvider().getValue());
c.increment(entry.getValue());
}));
}

private boolean filterByFinestGranularityAndNotAnySnapshots(TallySnapshot snapshot) {
return SnapshotSummaryProducer.filterByGranularityAndNotAnySnapshots(
snapshot, SubscriptionDefinition.getFinestGranularity(snapshot.getProductId()));
}

private boolean isCombiningRollupStrategySupported(
Map.Entry<OffsetDateTime, AccountUsageCalculation> usageCalculations) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,19 +148,23 @@ public boolean isPrometheusEnabled() {
}

public static boolean isFinestGranularity(String tag, String granularity) {
var subscription =
SubscriptionDefinition.lookupSubscriptionByTag(tag)
.orElseThrow(
() -> new IllegalStateException(tag + " missing in subscription configuration"));

var finestTagGranularity = subscription.getFinestGranularity().toString();
String finestTagGranularity = getFinestGranularity(tag);

// Compare as strings because granularity is represented by two different enumerations:
// org.candlepin.subscriptions.db.model.Granularity and
// com.redhat.swatch.configuration.registry.SubscriptionDefinitionGranularity
return finestTagGranularity.equalsIgnoreCase(granularity);
}

public static String getFinestGranularity(String tag) {
var subscription =
SubscriptionDefinition.lookupSubscriptionByTag(tag)
.orElseThrow(
() -> new IllegalStateException(tag + " missing in subscription configuration"));

return subscription.getFinestGranularity().toString();
}

public SubscriptionDefinitionGranularity getFinestGranularity() {
return this.isPaygEligible()
? SubscriptionDefinitionGranularity.HOURLY
Expand Down

0 comments on commit 6c44867

Please sign in to comment.