Skip to content

Commit

Permalink
Lower the replication factor for transactions topic
Browse files Browse the repository at this point in the history
  • Loading branch information
Dltmd202 committed Oct 27, 2024
1 parent 4a45905 commit df61740
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2023 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -47,12 +47,13 @@
* @author Oleg Artyomov
* @author Sergio Lourenco
* @author Pawel Lozinski
*
* @author Dltmd202
* @since 1.3
*/
class EmbeddedKafkaContextCustomizer implements ContextCustomizer {

private final EmbeddedKafka embeddedKafka;
private final String TRANSACTION_STATE_LOG_REPLICATION_FACTOR = "transaction.state.log.replication.factor";

EmbeddedKafkaContextCustomizer(EmbeddedKafka embeddedKafka) {
this.embeddedKafka = embeddedKafka;
Expand Down Expand Up @@ -121,6 +122,8 @@ public void customizeContext(ConfigurableApplicationContext context, MergedConte
}
}

properties.putIfAbsent(TRANSACTION_STATE_LOG_REPLICATION_FACTOR, String.valueOf(Math.min(3, embeddedKafka.value())));

embeddedKafkaBroker.brokerProperties((Map<String, String>) (Map<?, ?>) properties);
if (StringUtils.hasText(this.embeddedKafka.bootstrapServersProperty())) {
embeddedKafkaBroker.brokerListProperty(this.embeddedKafka.bootstrapServersProperty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.utils.KafkaTestUtils;

import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

/**
* @author Oleg Artyomov
* @author Sergio Lourenco
* @author Artem Bilan
* @author Gary Russell
*
* @since 1.3
*/
public class EmbeddedKafkaContextCustomizerTests {
Expand Down Expand Up @@ -91,6 +92,22 @@ void testMulti() {
.matches("127.0.0.1:[0-9]+,127.0.0.1:[0-9]+");
}

@Test
void testTransactionReplicationFactor() {
EmbeddedKafka annotationWithPorts =
AnnotationUtils.findAnnotation(TestWithEmbeddedKafkaTransactionFactor.class, EmbeddedKafka.class);
EmbeddedKafkaContextCustomizer customizer = new EmbeddedKafkaContextCustomizer(annotationWithPorts);
ConfigurableApplicationContext context = new GenericApplicationContext();
customizer.customizeContext(context, null);
context.refresh();

EmbeddedKafkaBroker embeddedKafkaBroker = context.getBean(EmbeddedKafkaBroker.class);
Map<String, Object> properties = (Map<String, Object>) KafkaTestUtils.getPropertyValue(embeddedKafkaBroker, "brokerProperties");

assertThat(properties.get("transaction.state.log.replication.factor")).isEqualTo("2");
}


@EmbeddedKafka(kraft = false)
private static final class TestWithEmbeddedKafka {

Expand All @@ -111,4 +128,9 @@ private static final class TestWithEmbeddedKafkaMulti {

}

@EmbeddedKafka(kraft = false, count = 2)
private static final class TestWithEmbeddedKafkaTransactionFactor {

}

}

0 comments on commit df61740

Please sign in to comment.