Skip to content

Commit

Permalink
feat: allow consumer rate to be set as a constant on the kafka trigger (
Browse files Browse the repository at this point in the history
  • Loading branch information
Steve Mason authored Aug 21, 2023
2 parents 4b0337a + 94a87fe commit 4890028
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public TriggerResult process(KubernetesClient client, ScaledResource resource, K
var consumerWindowSize = Optional.ofNullable(trigger.getMetadata().get("consumerWindowSize")).map(Integer::parseInt).orElse(360);
var consumerRatePercentile = Optional.ofNullable(trigger.getMetadata().get("consumerRatePercentile")).map(Double::parseDouble).orElse(99D);
var minimumConsumerRateMeasurements = Optional.ofNullable(trigger.getMetadata().get("minimumConsumerRateMeasurements")).map(Long::parseLong).orElse(3L);
var consumerMessagesPerSec = Optional.ofNullable(trigger.getMetadata().get("consumerMessagesPerSec")).map(Double::parseDouble).orElse(null);
var consumerCommitTimeout = Optional.ofNullable(trigger.getMetadata().get("consumerCommitTimeout")).map(Duration::parse).orElseGet(() -> Duration.ofMinutes(1L));

logger.debug("Requesting kafka metrics for topic={} and consumerGroupId={}", topic, consumerGroupId);
Expand All @@ -60,6 +61,7 @@ public TriggerResult process(KubernetesClient client, ScaledResource resource, K
lagModel.setTopicRatePercentile(topicRatePercentile);
lagModel.setMinimumTopicRateMeasurements(minimumTopicRateMeasurements);

lagModel.setConsumerMessagesPerSec(consumerMessagesPerSec);
lagModel.setConsumerCommitTimeout(consumerCommitTimeout);

var kafkaMetadata = KafkaMetadataCache.get(bootstrapServers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TopicConsumerStats {
private final LongSupplier clock;

Expand All @@ -34,6 +36,9 @@ public class TopicConsumerStats {
private long minimumConsumerRateMeasurements = 3;
@Getter
@Setter
private Double consumerMessagesPerSec = null;
@Getter
@Setter
private double topicRatePercentile = 99D;
@Getter
@Setter
Expand Down Expand Up @@ -113,6 +118,9 @@ public OptionalDouble getTopicRate() {
}

public OptionalDouble estimateConsumerRate(int replicaCount) {
if (consumerMessagesPerSec != null) {
return OptionalDouble.of(consumerMessagesPerSec * replicaCount);
}
if (historicalConsumerRates.getN() < minimumConsumerRateMeasurements) {
return OptionalDouble.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ public class TopicConsumerStatsTest {

@ParameterizedTest
@MethodSource
public void variousScenarios(List<UpdateCallParameters> updateCalls,
ExpectedResults expectedResults) {
public void variousScenarios_dynamicConsumerRate(List<UpdateCallParameters> updateCalls,
ExpectedResults expectedResults) {
var clock = new AtomicLong(NOW.toEpochMilli());
var stats = new TopicConsumerStats("namespace", "test", clock::get);

Expand All @@ -45,7 +45,7 @@ public void variousScenarios(List<UpdateCallParameters> updateCalls,
assertThat(stats.getTopicRate(), equalTo(expectedResults.topicRate()));
}

public static Stream<Arguments> variousScenarios() {
public static Stream<Arguments> variousScenarios_dynamicConsumerRate() {
return Stream.of(
Arguments.of(noCalls(), expect(1, 0, OptionalDouble.empty(), OptionalDouble.empty())),
Arguments.of(oneCallSameOffsets(), expect(1, 0, OptionalDouble.empty(), OptionalDouble.empty())),
Expand All @@ -61,6 +61,44 @@ public static Stream<Arguments> variousScenarios() {
);
}

@ParameterizedTest
@MethodSource
public void variousScenarios_constantConsumerRate(List<UpdateCallParameters> updateCalls,
ExpectedResults expectedResults) {
var clock = new AtomicLong(NOW.toEpochMilli());
var stats = new TopicConsumerStats("namespace", "test", clock::get);

stats.setMinimumTopicRateMeasurements(0L);
stats.setMinimumConsumerRateMeasurements(0L);
stats.setConsumerCommitTimeout(Duration.ofSeconds(10L));
stats.setConsumerMessagesPerSec(4D);

for (var call : updateCalls) {
stats.update(call.replicaCount, call.consumerOffsets, call.topicEndOffsets);
clock.addAndGet(call.tickBy);
}

assertThat(stats.getLag(), equalTo(expectedResults.lag()));
assertThat(stats.estimateConsumerRate(expectedResults.replicaCount()), equalTo(expectedResults.consumerRate()));
assertThat(stats.getTopicRate(), equalTo(expectedResults.topicRate()));
}

public static Stream<Arguments> variousScenarios_constantConsumerRate() {
return Stream.of(
Arguments.of(noCalls(), expect(1, 0, OptionalDouble.of(4D), OptionalDouble.empty())),
Arguments.of(oneCallSameOffsets(), expect(1, 0, OptionalDouble.of(4D), OptionalDouble.empty())),
Arguments.of(manyOfTheSameOffset_withinCommitTimeout(), expect(1, 0, OptionalDouble.of(4D), OptionalDouble.of(0D))),
Arguments.of(manyOfTheSameOffset_outsideCommitTimeout(), expect(1, 0, OptionalDouble.of(4D), OptionalDouble.of(0D))),
Arguments.of(stuckConsumer(), expect(1, 8, OptionalDouble.of(4D), OptionalDouble.of(0.4D))),
Arguments.of(slowConsumer(), expect(1, 24, OptionalDouble.of(4D), OptionalDouble.of(16D))),
Arguments.of(keepingUpConsumer(), expect(1, 0L, OptionalDouble.of(4D), OptionalDouble.of(16D))),
Arguments.of(keepingUpConsumer(), expect(2, 0L, OptionalDouble.of(8D), OptionalDouble.of(16D))),
Arguments.of(catchingUpConsumer(), expect(1, 0, OptionalDouble.of(4D), OptionalDouble.of(4D))),
Arguments.of(catchingUpConsumer(), expect(2, 0, OptionalDouble.of(8D), OptionalDouble.of(4D))),
Arguments.of(scalingCatchingUpConsumer(), expect(2, 0, OptionalDouble.of(8D), OptionalDouble.of(8D)))
);
}

private static List<UpdateCallParameters> noCalls() {
return List.of();
}
Expand Down

0 comments on commit 4890028

Please sign in to comment.