Skip to content

Commit

Permalink
fix: allow different window size for the topic rates (#96)
Browse files Browse the repository at this point in the history
  • Loading branch information
Steve Mason authored Aug 15, 2023
2 parents dc2db0d + e050df1 commit f5b58f8
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
public class KafkaLagTriggerProcessor implements TriggerProcessor {
private static final LoadingCache<TopicConsumerGroupId, TopicConsumerStats> lagModelCache = Caffeine.newBuilder()
.expireAfterAccess(Duration.ofMinutes(10))
.build(id -> new TopicConsumerStats());
.build(id -> new TopicConsumerStats(id.topic, id.consumerGroupId));

@Override
public String getType() {
Expand All @@ -37,9 +37,10 @@ public TriggerResult process(KubernetesClient client, ScaledResource resource, K
var consumerGroupId = requireNonNull(trigger.getMetadata().get("consumerGroupId"));
var threshold = Integer.parseInt(requireNonNull(trigger.getMetadata().get("threshold")));
var sla = Duration.parse(Optional.ofNullable(trigger.getMetadata().get("sla")).orElse("PT10M"));
var windowSize = Optional.ofNullable(trigger.getMetadata().get("windowSize")).map(Integer::parseInt).orElse(360);
var topicRateWindowSize = Optional.ofNullable(trigger.getMetadata().get("topicRateWindowSize")).map(Integer::parseInt).orElse(90);
var topicRatePercentile = Optional.ofNullable(trigger.getMetadata().get("topicRatePercentile")).map(Double::parseDouble).orElse(99D);
var minimumTopicRateMeasurements = Optional.ofNullable(trigger.getMetadata().get("minimumTopicRateMeasurements")).map(Long::parseLong).orElse(3L);
var consumerWindowSize = Optional.ofNullable(trigger.getMetadata().get("consumerWindowSize")).map(Integer::parseInt).orElse(360);
var consumerRatePercentile = Optional.ofNullable(trigger.getMetadata().get("consumerRatePercentile")).map(Double::parseDouble).orElse(99D);
var minimumConsumerRateMeasurements = Optional.ofNullable(trigger.getMetadata().get("minimumConsumerRateMeasurements")).map(Long::parseLong).orElse(3L);
var consumerCommitTimeout = Optional.ofNullable(trigger.getMetadata().get("consumerCommitTimeout")).map(Duration::parse).orElseGet(() -> Duration.ofMinutes(1L));
Expand All @@ -49,11 +50,14 @@ public TriggerResult process(KubernetesClient client, ScaledResource resource, K
var lagModel = lagModelCache.get(new TopicConsumerGroupId(topic, consumerGroupId));

// Update these values, in case the definition changed
lagModel.setWindowSize(windowSize);
lagModel.setConsumerRateWindowSize(consumerWindowSize);
lagModel.setConsumerRatePercentile(consumerRatePercentile);
lagModel.setMinimumConsumerRateMeasurements(minimumConsumerRateMeasurements);

lagModel.setTopicRateWindowSize(topicRateWindowSize);
lagModel.setTopicRatePercentile(topicRatePercentile);
lagModel.setMinimumTopicRateMeasurements(minimumTopicRateMeasurements);

lagModel.setConsumerCommitTimeout(consumerCommitTimeout);

var kafkaMetadata = KafkaMetadataCache.get(bootstrapServers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@

import com.google.common.annotations.VisibleForTesting;

import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tags;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;

public class TopicConsumerStats {
private final String topic;
private final String consumerGroupId;
private final LongSupplier clock;

@Getter
Expand All @@ -40,23 +44,37 @@ public class TopicConsumerStats {
@Setter
private Duration consumerCommitTimeout = Duration.ofMinutes(1L);

public TopicConsumerStats() {
this(System::currentTimeMillis);
public TopicConsumerStats(String topic, String consumerGroupId) {
this(topic, consumerGroupId, System::currentTimeMillis);
}

@VisibleForTesting
TopicConsumerStats(LongSupplier clock) {
TopicConsumerStats(String topic, String consumerGroupId, LongSupplier clock) {
this.topic = topic;
this.consumerGroupId = consumerGroupId;
this.clock = clock;
this.historicalConsumerRates.setWindowSize(360);
this.historicalTopicRates.setWindowSize(360);
}

public int getWindowSize() {
return this.historicalConsumerRates.getWindowSize();
var tags = Tags.of("topic", topic, "consumerGroupId", consumerGroupId);
Metrics.gauge("kpa_topic_consumer_stats_consumer_rate_n", tags, historicalConsumerRates, SynchronizedDescriptiveStatistics::getN);
Metrics.gauge("kpa_topic_consumer_stats_consumer_rate", tags.and("percentile", "50"), historicalConsumerRates, s -> s.getPercentile(50D));
Metrics.gauge("kpa_topic_consumer_stats_consumer_rate", tags.and("percentile", "90"), historicalConsumerRates, s -> s.getPercentile(90D));
Metrics.gauge("kpa_topic_consumer_stats_consumer_rate", tags.and("percentile", "95"), historicalConsumerRates, s -> s.getPercentile(95D));
Metrics.gauge("kpa_topic_consumer_stats_consumer_rate", tags.and("percentile", "99"), historicalConsumerRates, s -> s.getPercentile(99D));

Metrics.gauge("kpa_topic_consumer_stats_topic_rate_n", tags, historicalTopicRates, SynchronizedDescriptiveStatistics::getN);
Metrics.gauge("kpa_topic_consumer_stats_topic_rate", tags.and("percentile", "50"), historicalTopicRates, s -> s.getPercentile(50D));
Metrics.gauge("kpa_topic_consumer_stats_topic_rate", tags.and("percentile", "90"), historicalTopicRates, s -> s.getPercentile(90D));
Metrics.gauge("kpa_topic_consumer_stats_topic_rate", tags.and("percentile", "95"), historicalTopicRates, s -> s.getPercentile(95D));
Metrics.gauge("kpa_topic_consumer_stats_topic_rate", tags.and("percentile", "99"), historicalTopicRates, s -> s.getPercentile(99D));
}

public void setWindowSize(int windowSize) {
public void setConsumerRateWindowSize(int windowSize) {
this.historicalConsumerRates.setWindowSize(windowSize);
}

public void setTopicRateWindowSize(int windowSize) {
this.historicalTopicRates.setWindowSize(windowSize);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class TopicConsumerStatsTest {
public void variousScenarios(List<UpdateCallParameters> updateCalls,
ExpectedResults expectedResults) {
var clock = new AtomicLong(NOW.toEpochMilli());
var stats = new TopicConsumerStats(clock::get);
var stats = new TopicConsumerStats("topic", "test", clock::get);

stats.setMinimumTopicRateMeasurements(0L);
stats.setMinimumConsumerRateMeasurements(0L);
Expand Down

0 comments on commit f5b58f8

Please sign in to comment.