Commit
feat: support min and max replica counts on spec (#107)
Steve Mason authored Sep 6, 2023
2 parents b858365 + dd924d0 commit ae605ca
Showing 5 changed files with 57 additions and 17 deletions.
5 changes: 4 additions & 1 deletion README.md
@@ -23,7 +23,10 @@ metadata:
 spec:
   dryRun: false # set true to see what the autoscaler _would_ do
   cooloffSeconds: 300
-  maxScaleIncrements: 1
+  maxScaleIncrements: 1 # how many increments to scale by; an increment of 1 allows scaling 1->2, 2->4, 4->8
+                        # 2 will allow 1->4 but not 1->8
+  minReplicas: 1 # optional
+  maxReplicas: 16 # optional: partition count will be used if missing
   scaleTargetRef:
     apiVersion: apps/v1
     kind: Deployment
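
Illustration (not part of this commit): on the 16-partition example above the replica steps are 1 -> 2 -> 4 -> 8 -> 16, maxScaleIncrements bounds how many of those steps a single reconcile may move, and maxReplicas caps the top end. A minimal sketch of that behaviour, assuming the step ladder described in the README comment:

// Sketch only: simulates successive reconciles against the README example above.
import java.util.List;

public class MaxScaleIncrementsSketch {
    public static void main(String[] args) {
        List<Integer> steps = List.of(1, 2, 4, 8, 16); // replica steps for a 16-partition topic
        int maxScaleIncrements = 1;                    // as in the README snippet
        int maxReplicas = 16;
        int current = 1;
        int desired = 16;                              // what the triggers are asking for

        // Each reconcile moves at most maxScaleIncrements steps towards the desired count.
        while (current != Math.min(desired, maxReplicas)) {
            int currentIdx = steps.indexOf(current);
            int targetIdx = steps.indexOf(Math.min(desired, maxReplicas));
            int nextIdx = Math.min(currentIdx + maxScaleIncrements, targetIdx);
            current = steps.get(nextIdx);
            System.out.println("reconcile -> " + current); // prints 2, 4, 8, 16
        }
    }
}
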
4 changes: 4 additions & 0 deletions kustomize/crd/kafkapodautoscaler.brandwatch.com-v1alpha1.yaml
@@ -78,6 +78,10 @@ spec:
                 type: object
               type: object
             type: array
+          minReplicas:
+            type: integer
+          maxReplicas:
+            type: integer
           maxScaleIncrements:
             type: integer
         required:
KafkaPodAutoscalerReconciler.java
@@ -33,6 +33,7 @@
 import com.brandwatch.kafka_pod_autoscaler.scaledresources.GenericScaledResourceFactory;
 import com.brandwatch.kafka_pod_autoscaler.triggers.TriggerProcessor;
 import com.brandwatch.kafka_pod_autoscaler.v1alpha1.KafkaPodAutoscaler;
+import com.brandwatch.kafka_pod_autoscaler.v1alpha1.KafkaPodAutoscalerSpec;
 import com.brandwatch.kafka_pod_autoscaler.v1alpha1.KafkaPodAutoscalerStatus;
 import com.brandwatch.kafka_pod_autoscaler.v1alpha1.kafkapodautoscalerspec.ScaleTargetRef;
 import com.brandwatch.kafka_pod_autoscaler.v1alpha1.kafkapodautoscalerspec.TriggerDefinition;
@@ -106,14 +107,13 @@ public UpdateControl<KafkaPodAutoscaler> reconcile(KafkaPodAutoscaler kafkaPodAu
                     return replicas;
                 })
                 .max().orElse(1);
+        var partitionCount = getPartitionCount(kafkaPodAutoscaler);
 
-        var partitionCount = getPartitionCount(kafkaPodAutoscaler);
         if (partitionCount.isPresent()) {
             statusLogger.recordPartitionCount(partitionCount.getAsInt());
         }
-        var maxScaleIncrements = kafkaPodAutoscaler.getSpec().getMaxScaleIncrements();
-        var finalReplicaCount = fitReplicaCount(currentReplicaCount, calculatedReplicaCount, partitionCount.orElse(calculatedReplicaCount),
-                                                maxScaleIncrements);
+        var finalReplicaCount = fitReplicaCount(currentReplicaCount, calculatedReplicaCount, partitionCount,
+                                                kafkaPodAutoscaler.getSpec());
 
         statusLogger.recordCurrentReplicaCount(currentReplicaCount);
         statusLogger.recordCalculatedReplicaCount(calculatedReplicaCount);
@@ -210,11 +210,24 @@ private com.brandwatch.kafka_pod_autoscaler.triggers.TriggerResult calculateTrig
                        .process(client, scaledResource, autoscaler, trigger, replicaCount);
     }
 
-    static int fitReplicaCount(int currentReplicaCount, int idealReplicaCount, int partitionCount, int maxScaleIncrements) {
+    static int fitReplicaCount(int currentReplicaCount, int idealReplicaCount, OptionalInt partitionCount, KafkaPodAutoscalerSpec spec) {
+        int minReplicas = Optional.ofNullable(spec.getMinReplicas()).orElse(1);
+        int maxReplicas = Optional.ofNullable(spec.getMaxReplicas()).orElse(partitionCount.orElse(Math.max(currentReplicaCount, idealReplicaCount)));
+        if (minReplicas < 1) {
+            throw new IllegalArgumentException("minReplicas cannot be less than 1: " + minReplicas);
+        }
+        if (partitionCount.isPresent() && maxReplicas > partitionCount.getAsInt()) {
+            throw new IllegalArgumentException("maxReplicas cannot be greater than the number of partitions on the topic: "
+                                                   + maxReplicas + " (partitions=" + partitionCount.getAsInt() + ")");
+        }
         if (idealReplicaCount == 0) {
-            return 1;
+            return Math.max(currentReplicaCount, minReplicas);
         }
-        var allowedReplicaCounts = partitionCountCache.get(partitionCount);
+        idealReplicaCount = Math.max(idealReplicaCount, minReplicas);
+        idealReplicaCount = Math.min(idealReplicaCount, maxReplicas);
+
+        var maxScaleIncrements = spec.getMaxScaleIncrements();
+        var allowedReplicaCounts = partitionCountCache.get(partitionCount.orElse(maxReplicas));
         var currentReplicaCountIndex = findReplicaCountIndex(currentReplicaCount, allowedReplicaCounts);
         var newReplicaCountIndex = findReplicaCountIndex(idealReplicaCount, allowedReplicaCounts);
 
@@ -223,7 +236,7 @@ static int fitReplicaCount(int currentReplicaCount, int idealReplicaCount, int p
             if (currentReplicaCountIndex > newReplicaCountIndex) {
                 newReplicaCountIndex = Math.max(currentReplicaCountIndex - maxScaleIncrements, 0);
             } else {
-                newReplicaCountIndex = Math.min(currentReplicaCountIndex + maxScaleIncrements, partitionCount);
+                newReplicaCountIndex = Math.min(currentReplicaCountIndex + maxScaleIncrements, partitionCount.orElse(maxReplicas));
             }
         }

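The helpers partitionCountCache and findReplicaCountIndex are not part of this diff. Judging by the test cases further down, the allowed replica counts appear to be the divisors of the partition count, with the ideal count rounded up to the nearest allowed value before the maxScaleIncrements limit is applied. A sketch of that assumed behaviour (not repo code):

// Assumption, not repo code: a condensed model of what partitionCountCache.get(...)
// and findReplicaCountIndex(...) appear to do, judging by the test expectations below.
import java.util.ArrayList;
import java.util.List;

public class ReplicaLadderSketch {
    // Allowed replica counts: the divisors of the partition count, in ascending order.
    static List<Integer> allowedReplicaCounts(int partitionCount) {
        List<Integer> counts = new ArrayList<>();
        for (int i = 1; i <= partitionCount; i++) {
            if (partitionCount % i == 0) {
                counts.add(i);
            }
        }
        return counts;
    }

    // Index of the smallest allowed count >= wanted, or the last index if wanted exceeds them all.
    static int findIndex(int wanted, List<Integer> allowed) {
        for (int i = 0; i < allowed.size(); i++) {
            if (allowed.get(i) >= wanted) {
                return i;
            }
        }
        return allowed.size() - 1;
    }

    public static void main(String[] args) {
        var allowed = allowedReplicaCounts(16);                   // [1, 2, 4, 8, 16]
        System.out.println(allowed.get(findIndex(3, allowed)));   // 4  -> matches the "ideal 3 becomes 4" test rows
        System.out.println(allowed.get(findIndex(200, allowed))); // 16 -> an ideal above the ladder caps at the partition count
    }
}
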
KafkaPodAutoscalerSpec.java
@@ -57,6 +57,16 @@ public class KafkaPodAutoscalerSpec implements KubernetesResource {
     private List<TriggerDefinition> triggers = Serialization.unmarshal("[]", List.class);
     @Getter
     @Setter
+    @JsonProperty("minReplicas")
+    @JsonSetter(nulls = Nulls.SKIP)
+    private Integer minReplicas = null;
+    @Getter
+    @Setter
+    @JsonProperty("maxReplicas")
+    @JsonSetter(nulls = Nulls.SKIP)
+    private Integer maxReplicas = null;
+    @Getter
+    @Setter
     @JsonProperty("maxScaleIncrements")
     @JsonSetter(nulls = Nulls.SKIP)
     private int maxScaleIncrements = 1;
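
A note on the @JsonSetter(nulls = Nulls.SKIP) annotation carried over to the new fields: an explicit null in the manifest is skipped rather than assigned, so the field defaults survive. A minimal Jackson sketch (SpecLike is a stand-in class, not the real spec class):

// Sketch only: shows the Nulls.SKIP behaviour with a simplified stand-in class.
import com.fasterxml.jackson.annotation.JsonSetter;
import com.fasterxml.jackson.annotation.Nulls;
import com.fasterxml.jackson.databind.ObjectMapper;

public class NullsSkipSketch {
    public static class SpecLike {
        @JsonSetter(nulls = Nulls.SKIP)
        public Integer minReplicas = null;  // optional: stays null unless a value is supplied
        @JsonSetter(nulls = Nulls.SKIP)
        public int maxScaleIncrements = 1;  // an explicit null keeps the default of 1
    }

    public static void main(String[] args) throws Exception {
        var spec = new ObjectMapper()
                .readValue("{\"minReplicas\": null, \"maxScaleIncrements\": null}", SpecLike.class);
        System.out.println(spec.minReplicas);        // null
        System.out.println(spec.maxScaleIncrements); // 1
    }
}
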
@@ -14,6 +14,7 @@
 import java.time.ZoneOffset;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalInt;
 import java.util.stream.Stream;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -409,22 +410,31 @@ public void canScale_withKafkaConfig_multipleStaticTriggers() {
 
     @ParameterizedTest
     @MethodSource
-    public void fitReplicaCount(int currentReplicaCount, int idealReplicaCount, int partitionCount, int maxScaleIncrements,
+    public void fitReplicaCount(int currentReplicaCount, int idealReplicaCount, OptionalInt partitionCount,
+                                Integer minReplicas, Integer maxReplicas, int maxScaleIncrements,
                                 int expectedResult) {
-        assertThat(KafkaPodAutoscalerReconciler.fitReplicaCount(currentReplicaCount, idealReplicaCount, partitionCount, maxScaleIncrements))
+        var spec = mock(KafkaPodAutoscalerSpec.class);
+        when(spec.getMinReplicas()).thenReturn(minReplicas);
+        when(spec.getMaxReplicas()).thenReturn(maxReplicas);
+        when(spec.getMaxScaleIncrements()).thenReturn(maxScaleIncrements);
+        assertThat(KafkaPodAutoscalerReconciler.fitReplicaCount(currentReplicaCount, idealReplicaCount, partitionCount, spec))
                 .isEqualTo(expectedResult);
     }

     public static Stream<Arguments> fitReplicaCount() {
         return Stream.of(
                 // Scaling up
-                Arguments.of(1, 1, 16, 1000, 1),
-                Arguments.of(1, 3, 16, 1000, 4),
-                Arguments.of(4, 200, 16, 1000, 16),
-                Arguments.of(1, 3, 16, 1, 2),
+                Arguments.of(1, 1, OptionalInt.of(16), null, null, 1000, 1),
+                Arguments.of(1, 3, OptionalInt.of(16), null, null, 1000, 4),
+                Arguments.of(4, 200, OptionalInt.of(16), null, null, 1000, 16),
+                Arguments.of(4, 200, OptionalInt.of(16), null, 8, 1000, 8),
+                Arguments.of(1, 3, OptionalInt.of(16), null, null, 1, 2),
+                Arguments.of(1, 3, OptionalInt.empty(), null, null, 1, 3),
                 // Scaling down
-                Arguments.of(16, 3, 16, 1000, 4),
-                Arguments.of(16, 3, 16, 1, 8)
+                Arguments.of(16, 3, OptionalInt.of(16), null, null, 1000, 4),
+                Arguments.of(16, 3, OptionalInt.of(16), 8, null, 1000, 8),
+                Arguments.of(16, 3, OptionalInt.of(16), null, null, 1, 8),
+                Arguments.of(16, 3, OptionalInt.empty(), null, null, 1, 8)
         );
     }

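Two of the new rows in the reconciler test above show the bounds in isolation: with minReplicas 8, scaling down from 16 towards an ideal of 3 stops at 8, and with maxReplicas 8, scaling up from 4 towards an ideal of 200 stops at 8. A tiny sketch of just that clamping step (an assumption-level re-derivation, not the repo's code):

// Not repo code: only the min/max clamp that the new spec fields add, applied to two test rows above.
public class ReplicaBoundsSketch {
    static int clamp(int idealReplicaCount, Integer minReplicas, Integer maxReplicas, int partitionCount) {
        int min = minReplicas != null ? minReplicas : 1;              // spec default when unset
        int max = maxReplicas != null ? maxReplicas : partitionCount; // partition count used if missing
        return Math.min(Math.max(idealReplicaCount, min), max);
    }

    public static void main(String[] args) {
        System.out.println(clamp(3, 8, null, 16));   // 8 -> the (16, 3, ..., minReplicas = 8) row
        System.out.println(clamp(200, null, 8, 16)); // 8 -> the (4, 200, ..., maxReplicas = 8) row
    }
}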
