Skip to content

Commit

Permalink
GH-3662: Support Kafka 3.9.0+ in EmbeddedKafkaKraftBroker
Browse files Browse the repository at this point in the history
Fixes: #3662
Issue: #3662

Add compatibility for both Kafka 3.8.0 and 3.9.0+ by handling different method signatures for setConfigProp:
- 3.9.0+: setConfigProp(String, Object)
- 3.8.0: setConfigProp(String, String)

The change uses reflection to detect Kafka version and call appropriate method.

* Direct call to setConfigProp when using 3.8.0

* Addressing PR review

* Addressing PR review
  • Loading branch information
sobychacko authored Dec 13, 2024
1 parent 0db6b81 commit 831dba1
Showing 1 changed file with 32 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.reflect.Method;
import java.nio.file.Files;
import java.time.Duration;
import java.util.AbstractMap.SimpleEntry;
Expand Down Expand Up @@ -56,6 +57,8 @@

import org.springframework.core.log.LogAccessor;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;

/**
* An embedded Kafka Broker(s) using KRaft.
Expand Down Expand Up @@ -87,6 +90,23 @@ public class EmbeddedKafkaKraftBroker implements EmbeddedKafkaBroker {

public static final int DEFAULT_ADMIN_TIMEOUT = 10;

private static final boolean IS_KAFKA_39_OR_LATER = ClassUtils.isPresent(
"org.apache.kafka.server.config.AbstractKafkaConfig", EmbeddedKafkaKraftBroker.class.getClassLoader());

private static final Method SET_CONFIG_METHOD;

static {
if (IS_KAFKA_39_OR_LATER) {
SET_CONFIG_METHOD = ReflectionUtils.findMethod(
KafkaClusterTestKit.Builder.class,
"setConfigProp",
String.class, Object.class);
}
else {
SET_CONFIG_METHOD = null;
}
}

private final int count;

private final Set<String> topics;
Expand Down Expand Up @@ -212,7 +232,7 @@ private void start() {
.setNumBrokerNodes(this.count)
.setNumControllerNodes(this.count)
.build());
this.brokerProperties.forEach((k, v) -> clusterBuilder.setConfigProp((String) k, (String) v));
this.brokerProperties.forEach((k, v) -> setConfigProperty(clusterBuilder, (String) k, v));
this.cluster = clusterBuilder.build();
}
catch (Exception ex) {
Expand All @@ -238,6 +258,17 @@ private void start() {
System.setProperty(SPRING_EMBEDDED_KAFKA_BROKERS, getBrokersAsString());
}

private static void setConfigProperty(KafkaClusterTestKit.Builder clusterBuilder, String key, Object value) {
if (IS_KAFKA_39_OR_LATER) {
// For Kafka 3.9.0+: use reflection
ReflectionUtils.invokeMethod(SET_CONFIG_METHOD, clusterBuilder, key, value);
}
else {
// For Kafka 3.8.0: direct call
clusterBuilder.setConfigProp(key, (String) value);
}
}

@Override
public void destroy() {
AtomicReference<Throwable> shutdownFailure = new AtomicReference<>();
Expand Down

0 comments on commit 831dba1

Please sign in to comment.