Skip to content

Commit

Permalink
fix: change tags on ConsumerGroupStats metrics (#100)
Browse files Browse the repository at this point in the history
  • Loading branch information
Steve Mason authored Aug 15, 2023
2 parents f5b58f8 + 1ea3aa7 commit 4b0337a
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
@Slf4j
@AutoService(TriggerProcessor.class)
public class KafkaLagTriggerProcessor implements TriggerProcessor {
private static final LoadingCache<TopicConsumerGroupId, TopicConsumerStats> lagModelCache = Caffeine.newBuilder()
private static final LoadingCache<KpaId, TopicConsumerStats> lagModelCache = Caffeine.newBuilder()
.expireAfterAccess(Duration.ofMinutes(10))
.build(id -> new TopicConsumerStats(id.topic, id.consumerGroupId));
.build(id -> new TopicConsumerStats(id.namespace, id.name));

@Override
public String getType() {
Expand All @@ -32,6 +32,8 @@ public String getType() {

@Override
public TriggerResult process(KubernetesClient client, ScaledResource resource, KafkaPodAutoscaler autoscaler, TriggerDefinition trigger, int replicaCount) {
var kpaName = autoscaler.getMetadata().getName();
var kpaNamespace = autoscaler.getMetadata().getNamespace();
var topic = autoscaler.getSpec().getTopicName();
var bootstrapServers = autoscaler.getSpec().getBootstrapServers();
var consumerGroupId = requireNonNull(trigger.getMetadata().get("consumerGroupId"));
Expand All @@ -47,7 +49,7 @@ public TriggerResult process(KubernetesClient client, ScaledResource resource, K

logger.debug("Requesting kafka metrics for topic={} and consumerGroupId={}", topic, consumerGroupId);

var lagModel = lagModelCache.get(new TopicConsumerGroupId(topic, consumerGroupId));
var lagModel = lagModelCache.get(new KpaId(kpaNamespace, kpaName));

// Update these values, in case the definition changed
lagModel.setConsumerRateWindowSize(consumerWindowSize);
Expand Down Expand Up @@ -86,6 +88,6 @@ public TriggerResult process(KubernetesClient client, ScaledResource resource, K
}
}

private record TopicConsumerGroupId(String topic, String consumerGroupId) {
private record KpaId(String namespace, String name) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
import lombok.Setter;

public class TopicConsumerStats {
private final String topic;
private final String consumerGroupId;
private final LongSupplier clock;

@Getter
Expand Down Expand Up @@ -49,14 +47,12 @@ public TopicConsumerStats(String topic, String consumerGroupId) {
}

@VisibleForTesting
TopicConsumerStats(String topic, String consumerGroupId, LongSupplier clock) {
this.topic = topic;
this.consumerGroupId = consumerGroupId;
TopicConsumerStats(String namespace, String name, LongSupplier clock) {
this.clock = clock;
this.historicalConsumerRates.setWindowSize(360);
this.historicalTopicRates.setWindowSize(360);

var tags = Tags.of("topic", topic, "consumerGroupId", consumerGroupId);
var tags = Tags.of("kpa-namespace", namespace, "kpa-name", name);
Metrics.gauge("kpa_topic_consumer_stats_consumer_rate_n", tags, historicalConsumerRates, SynchronizedDescriptiveStatistics::getN);
Metrics.gauge("kpa_topic_consumer_stats_consumer_rate", tags.and("percentile", "50"), historicalConsumerRates, s -> s.getPercentile(50D));
Metrics.gauge("kpa_topic_consumer_stats_consumer_rate", tags.and("percentile", "90"), historicalConsumerRates, s -> s.getPercentile(90D));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class TopicConsumerStatsTest {
public void variousScenarios(List<UpdateCallParameters> updateCalls,
ExpectedResults expectedResults) {
var clock = new AtomicLong(NOW.toEpochMilli());
var stats = new TopicConsumerStats("topic", "test", clock::get);
var stats = new TopicConsumerStats("namespace", "test", clock::get);

stats.setMinimumTopicRateMeasurements(0L);
stats.setMinimumConsumerRateMeasurements(0L);
Expand Down

0 comments on commit 4b0337a

Please sign in to comment.