Skip to content

Commit

Permalink
GH-2978: Propagate number of partitions to KRaft broker
Browse files Browse the repository at this point in the history
Fixes: #2978

If we don't create topics manually, that can be done automatically on the broker side
according to its configuration.

For that goal the `EmbeddedKafkaKraftBroker` is missing to populate
`KafkaConfig.NumPartitionsProp(): "" + this.partitionsPerTopic` broker property from
`@EmbeddedKafka` configuration

* Propagate `partitionsPerTopic` option down to the embedded broker(s) in the `EmbeddedKafkaKraftBroker`
* Some other simple refactoring in the `EmbeddedKafkaKraftBroker`
* Verify the option propagated via new unit test in the `KafkaTestUtilsTests.topicAutomaticallyCreatedWithProperNumberOfPartitions()`
  • Loading branch information
artembilan committed Jan 11, 2024
1 parent 769da20 commit 03b3f7c
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -199,7 +199,7 @@ public void setAdminTimeout(int adminTimeout) {
public void afterPropertiesSet() {
if (this.initialized.compareAndSet(false, true)) {
overrideExitMethods();
addDefaultBrokerPropsIfAbsent(this.brokerProperties, this.count);
addDefaultBrokerPropsIfAbsent();
start();
}
}
Expand Down Expand Up @@ -252,10 +252,11 @@ public void destroy() {
this.cluster = null;
}

private void addDefaultBrokerPropsIfAbsent(Properties brokerConfig, int numBrokers) {
brokerConfig.putIfAbsent(KafkaConfig.DeleteTopicEnableProp(), "true");
brokerConfig.putIfAbsent(KafkaConfig.GroupInitialRebalanceDelayMsProp(), "0");
brokerConfig.putIfAbsent(KafkaConfig.OffsetsTopicReplicationFactorProp(), String.valueOf(numBrokers));
private void addDefaultBrokerPropsIfAbsent() {
this.brokerProperties.putIfAbsent(KafkaConfig.DeleteTopicEnableProp(), "true");
this.brokerProperties.putIfAbsent(KafkaConfig.GroupInitialRebalanceDelayMsProp(), "0");
this.brokerProperties.putIfAbsent(KafkaConfig.OffsetsTopicReplicationFactorProp(), "" + this.count);
this.brokerProperties.putIfAbsent(KafkaConfig.NumPartitionsProp(), "" + this.partitionsPerTopic);
}

private void logDir(Properties brokerConfigProperties) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2022 the original author or authors.
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,32 +20,36 @@
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;

import java.time.Duration;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.junit.jupiter.api.Test;

import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;

/**
* @author Gary Russell
* @author Artem Bilan
* @since 2.2.7
*
*/
@EmbeddedKafka(topics = { "singleTopic1", "singleTopic2", "singleTopic3", "singleTopic4", "singleTopic5",
"multiTopic1" })
@EmbeddedKafka(topics = {"singleTopic1", "singleTopic2", "singleTopic3", "singleTopic4", "singleTopic5",
"multiTopic1"})
public class KafkaTestUtilsTests {

@Test
void testGetSingleWithMoreThatOneTopic(EmbeddedKafkaBroker broker) {
void testGetSingleWithMoreThanOneTopic(EmbeddedKafkaBroker broker) {
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProps);
producer.send(new ProducerRecord<>("singleTopic1", 0, 1, "foo"));
Expand All @@ -64,7 +68,7 @@ void testGetSingleWithMoreThatOneTopic(EmbeddedKafkaBroker broker) {
}

@Test
void testGetSingleWithMoreThatOneTopicRecordNotThereYet(EmbeddedKafkaBroker broker) {
void testGetSingleWithMoreThanOneTopicRecordNotThereYet(EmbeddedKafkaBroker broker) {
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProps);
producer.send(new ProducerRecord<>("singleTopic4", 0, 1, "foo"));
Expand All @@ -73,7 +77,7 @@ void testGetSingleWithMoreThatOneTopicRecordNotThereYet(EmbeddedKafkaBroker brok
broker.consumeFromEmbeddedTopics(consumer, "singleTopic4", "singleTopic5");
long t1 = System.currentTimeMillis();
assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() ->
KafkaTestUtils.getSingleRecord(consumer, "singleTopic5", Duration.ofSeconds(2)));
KafkaTestUtils.getSingleRecord(consumer, "singleTopic5", Duration.ofSeconds(2)));
assertThat(System.currentTimeMillis() - t1).isGreaterThanOrEqualTo(2000L);
producer.send(new ProducerRecord<>("singleTopic5", 1, "foo"));
producer.close();
Expand All @@ -97,19 +101,19 @@ public void testGetOneRecord(EmbeddedKafkaBroker broker) throws Exception {
assertThat(oneRecord.value()).isEqualTo("foo");
assertThat(KafkaTestUtils.getCurrentOffset(broker.getBrokersAsString(), "getOne", "singleTopic3", 0))
.isNotNull()
.extracting(omd -> omd.offset())
.extracting(OffsetAndMetadata::offset)
.isEqualTo(1L);
oneRecord = KafkaTestUtils.getOneRecord(broker.getBrokersAsString(), "getOne",
"singleTopic3", 0, true, true, Duration.ofSeconds(10));
assertThat(oneRecord.value()).isEqualTo("foo");
assertThat(KafkaTestUtils.getCurrentOffset(broker.getBrokersAsString(), "getOne", "singleTopic3", 0))
.isNotNull()
.extracting(omd -> omd.offset())
.extracting(OffsetAndMetadata::offset)
.isEqualTo(1L);
}

@Test
public void testMultiMinRecords(EmbeddedKafkaBroker broker) throws Exception {
public void testMultiMinRecords(EmbeddedKafkaBroker broker) {
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProps);
producer.send(new ProducerRecord<>("multiTopic1", 0, 1, "foo"));
Expand All @@ -135,16 +139,36 @@ public void testMultiMinRecords(EmbeddedKafkaBroker broker) throws Exception {
public void testGetCurrentOffsetWithAdminClient(EmbeddedKafkaBroker broker) throws Exception {
Map<String, Object> adminClientProps = Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString());
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
try (AdminClient adminClient = AdminClient.create(adminClientProps); KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProps)) {
try (var adminClient = AdminClient.create(adminClientProps); var producer = new KafkaProducer<>(producerProps)) {
producer.send(new ProducerRecord<>("singleTopic3", 0, 1, "foo"));

KafkaTestUtils.getOneRecord(broker.getBrokersAsString(), "testGetCurrentOffsetWithAdminClient",
"singleTopic3", 0, false, true, Duration.ofSeconds(10));
assertThat(KafkaTestUtils.getCurrentOffset(adminClient, "testGetCurrentOffsetWithAdminClient", "singleTopic3", 0))
.isNotNull()
.extracting(omd -> omd.offset())
.extracting(OffsetAndMetadata::offset)
.isEqualTo(1L);
}
}

@Test
public void topicAutomaticallyCreatedWithProperNumberOfPartitions(EmbeddedKafkaBroker broker) throws Exception {
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);

Map<String, Object> adminClientProps =
Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString());
try (var adminClient = AdminClient.create(adminClientProps); var producer = new KafkaProducer<>(producerProps)) {
producer.send(new ProducerRecord<>("auto-topic", "test data")).get();

List<TopicPartitionInfo> partitions =
adminClient.describeTopics(List.of("auto-topic"))
.allTopicNames()
.get()
.get("auto-topic")
.partitions();

assertThat(partitions).hasSize(2);
}

}

Expand Down

0 comments on commit 03b3f7c

Please sign in to comment.