diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/testing.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/testing.adoc index 7063d513cd..25ce59d764 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/testing.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/testing.adoc @@ -3,6 +3,16 @@ The `spring-kafka-test` jar contains some useful utilities to assist with testing your applications. +[[ekb]] +== EmbeddedKafkaBroker + +Two implementations are provided: + +* `EmbeddedKafkaZKBroker` - legacy implementation which starts an embedded `Zookeeper` instance. +* `EmbeddedKafkaKraftBroker` - (default) uses Kraft instead of `Zookeeper` in combined controller and broker modes (since 3.1) + +There are several techniques to configure the broker as discussed in the following sections. + [[ktu]] == KafkaTestUtils @@ -47,7 +57,7 @@ If this is not possible for some reason, note that the `consumeFromEmbeddedTopic Since it does not have access to the consumer properties, you must use the overloaded method that takes a `seekToEnd` boolean parameter to seek to the end instead of the beginning. ==== -A JUnit 4 `@Rule` wrapper for the `EmbeddedKafkaBroker` is provided to create an embedded Kafka and an embedded Zookeeper server. +A JUnit 4 `@Rule` wrapper for the `EmbeddedKafkaZKBroker` is provided to create an embedded Kafka and an embedded Zookeeper server. (See xref:testing.adoc#embedded-kafka-annotation[@EmbeddedKafka Annotation] for information about using `@EmbeddedKafka` with JUnit 5). The following listing shows the signatures of those methods: @@ -72,6 +82,8 @@ public EmbeddedKafkaRule(int count, boolean controlledShutdown, String... topics public EmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics) { ... } ---- +NOTE: The `EmbeddedKafkaKraftBroker` is not supported with JUnit4. + The `EmbeddedKafkaBroker` class has a utility method that lets you consume for all the topics it created. 
The following example shows how to use it: @@ -162,7 +174,7 @@ You can use the same broker for multiple test classes with something similar to ---- public final class EmbeddedKafkaHolder { - private static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaBroker(1, false) + private static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaZKBroker(1, false) .brokerListProperty("spring.kafka.bootstrap-servers"); private static boolean started; @@ -216,7 +228,8 @@ In addition, these properties can be provided: - `spring.kafka.embedded.ports` - ports (comma-separated value) for every Kafka broker to start, `0` if random port is a preferred; the number of values must be equal to the `count` mentioned above; - `spring.kafka.embedded.topics` - topics (comma-separated value) to create in the started Kafka cluster; - `spring.kafka.embedded.partitions` - number of partitions to provision for the created topics; -- `spring.kafka.embedded.broker.properties.location` - the location of the file for additional Kafka broker configuration properties; the value of this property must follow the Spring resource abstraction pattern. +- `spring.kafka.embedded.broker.properties.location` - the location of the file for additional Kafka broker configuration properties; the value of this property must follow the Spring resource abstraction pattern; +- `spring.kafka.embedded.kraft` - when false, use an `EmbeddedKafkaZKBroker` instead of an `EmbeddedKafkaKraftBroker`. Essentially these properties mimic some of the `@EmbeddedKafka` attributes. @@ -283,6 +296,8 @@ public class KafkaStreamsTests { Starting with version 2.2.4, you can also use the `@EmbeddedKafka` annotation to specify the Kafka ports property. +Starting with version 3.1, set the `kraft` property to `false` to use an `EmbeddedKafkaZKBroker` instead of an `EmbeddedKafkaKraftBroker`. 
+ The following example sets the `topics`, `brokerProperties`, and `brokerPropertiesLocation` attributes of `@EmbeddedKafka` support property placeholder resolutions: [source, java] diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index 0c66801352..6f7fd5daf5 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -7,8 +7,13 @@ This section covers the changes made from version 3.0 to version 3.1. For changes in earlier version, see xref:appendix/change-history.adoc[Change History]. -[[x30-kafka-client]] +[[x31-kafka-client]] === Kafka Client Version This version requires the 3.5.1 `kafka-clients`. +[[x31-ekb]] +=== EmbeddedKafkaBroker + +An additional implementation is now provided to use `Kraft` instead of Zookeeper. +See <<ekb>> for more information. diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java index 16061761dd..9c5b74d868 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -16,169 +16,60 @@ package org.springframework.kafka.test; -import java.io.File; -import java.io.IOException; -import java.io.UncheckedIOException; -import java.net.InetSocketAddress; -import java.nio.file.Files; -import java.time.Duration; -import java.util.AbstractMap.SimpleEntry; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; -import java.util.stream.Collectors; -import org.apache.commons.logging.LogFactory; -import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.admin.AdminClientConfig; -import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.security.auth.SecurityProtocol; -import org.apache.kafka.common.utils.Exit; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.metadata.BrokerState; -import org.apache.zookeeper.client.ZKClientConfig; -import org.apache.zookeeper.server.NIOServerCnxnFactory; -import org.apache.zookeeper.server.ZooKeeperServer; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; -import org.springframework.core.log.LogAccessor; -import org.springframework.kafka.test.core.BrokerAddress; -import 
org.springframework.retry.backoff.ExponentialBackOffPolicy; -import org.springframework.retry.policy.SimpleRetryPolicy; -import org.springframework.retry.support.RetryTemplate; -import org.springframework.util.Assert; -import kafka.cluster.EndPoint; import kafka.server.KafkaConfig; -import kafka.server.KafkaServer; -import kafka.utils.CoreUtils; -import kafka.utils.TestUtils; -import kafka.zk.ZkFourLetterWords; -import kafka.zookeeper.ZooKeeperClient; /** - * An embedded Kafka Broker(s) and Zookeeper manager. - * This class is intended to be used in the unit tests. - * - * @author Marius Bogoevici - * @author Artem Bilan * @author Gary Russell - * @author Kamill Sokol - * @author Elliot Kennedy - * @author Nakul Mishra - * @author Pawel Lozinski - * @author Adrian Chlebosz + * @since 3.1 * - * @since 2.2 */ -public class EmbeddedKafkaBroker implements InitializingBean, DisposableBean { - - private static final String BROKER_NEEDED = "Broker must be started before this method can be called"; - - private static final String LOOPBACK = "127.0.0.1"; - - private static final LogAccessor logger = new LogAccessor(LogFactory.getLog(EmbeddedKafkaBroker.class)); // NOSONAR +public interface EmbeddedKafkaBroker extends InitializingBean, DisposableBean { - public static final String BEAN_NAME = "embeddedKafka"; + int DEFAULT_ADMIN_TIMEOUT = 10; - public static final String SPRING_EMBEDDED_KAFKA_BROKERS = "spring.embedded.kafka.brokers"; + /** + * Set explicit ports on which the kafka brokers will listen. Useful when running an + * embedded broker that you want to access from other processes. + * @param ports the ports. + * @return the {@link EmbeddedKafkaBroker}. + */ + EmbeddedKafkaBroker kafkaPorts(int... 
ports); - public static final String SPRING_EMBEDDED_ZOOKEEPER_CONNECT = "spring.embedded.zookeeper.connect"; + String BEAN_NAME = "embeddedKafka"; /** * Set the value of this property to a property name that should be set to the list of * embedded broker addresses instead of {@value #SPRING_EMBEDDED_KAFKA_BROKERS}. */ - public static final String BROKER_LIST_PROPERTY = "spring.embedded.kafka.brokers.property"; - - public static final int DEFAULT_ADMIN_TIMEOUT = 10; - - public static final int DEFAULT_ZK_SESSION_TIMEOUT = 18000; - - public static final int DEFAULT_ZK_CONNECTION_TIMEOUT = DEFAULT_ZK_SESSION_TIMEOUT; + String BROKER_LIST_PROPERTY = "spring.embedded.kafka.brokers.property"; - private final int count; + String SPRING_EMBEDDED_KAFKA_BROKERS = "spring.embedded.kafka.brokers"; - private final boolean controlledShutdown; + String BROKER_NEEDED = "Broker must be started before this method can be called"; - private final Set topics; - - private final int partitionsPerTopic; - - private final List kafkaServers = new ArrayList<>(); - - private final Map brokerProperties = new HashMap<>(); - - private EmbeddedZookeeper zookeeper; - - private String zkConnect; - - private int zkPort; - - private int[] kafkaPorts; - - private Duration adminTimeout = Duration.ofSeconds(DEFAULT_ADMIN_TIMEOUT); - - private int zkConnectionTimeout = DEFAULT_ZK_CONNECTION_TIMEOUT; - - private int zkSessionTimeout = DEFAULT_ZK_SESSION_TIMEOUT; - - private String brokerListProperty = "spring.kafka.bootstrap-servers"; - - private volatile ZooKeeperClient zooKeeperClient; - - public EmbeddedKafkaBroker(int count) { - this(count, false); - } + String LOOPBACK = "127.0.0.1"; /** - * Create embedded Kafka brokers. - * @param count the number of brokers. - * @param controlledShutdown passed into TestUtils.createBrokerConfig. - * @param topics the topics to create (2 partitions per). + * Get the topics. + * @return the topics. 
*/ - public EmbeddedKafkaBroker(int count, boolean controlledShutdown, String... topics) { - this(count, controlledShutdown, 2, topics); + Set getTopics(); + + @Override + default void destroy() { } - /** - * Create embedded Kafka brokers listening on random ports. - * @param count the number of brokers. - * @param controlledShutdown passed into TestUtils.createBrokerConfig. - * @param partitions partitions per topic. - * @param topics the topics to create. - */ - public EmbeddedKafkaBroker(int count, boolean controlledShutdown, int partitions, String... topics) { - this.count = count; - this.kafkaPorts = new int[this.count]; // random ports by default. - this.controlledShutdown = controlledShutdown; - if (topics != null) { - this.topics = new HashSet<>(Arrays.asList(topics)); - } - else { - this.topics = new HashSet<>(); - } - this.partitionsPerTopic = partitions; + @Override + default void afterPropertiesSet() { } /** @@ -188,671 +79,104 @@ public EmbeddedKafkaBroker(int count, boolean controlledShutdown, int partitions * @return this for chaining configuration. * @see KafkaConfig */ - public EmbeddedKafkaBroker brokerProperties(Map properties) { - this.brokerProperties.putAll(properties); - return this; - } - - /** - * Specify a broker property. - * @param property the property name. - * @param value the value. - * @return the {@link EmbeddedKafkaBroker}. - */ - public EmbeddedKafkaBroker brokerProperty(String property, Object value) { - this.brokerProperties.put(property, value); - return this; - } - - /** - * Set explicit ports on which the kafka brokers will listen. Useful when running an - * embedded broker that you want to access from other processes. - * @param ports the ports. - * @return the {@link EmbeddedKafkaBroker}. - */ - public EmbeddedKafkaBroker kafkaPorts(int... 
ports) { - Assert.isTrue(ports.length == this.count, "A port must be provided for each instance [" - + this.count + "], provided: " + Arrays.toString(ports) + ", use 0 for a random port"); - this.kafkaPorts = Arrays.copyOf(ports, ports.length); - return this; - } + EmbeddedKafkaBroker brokerProperties(Map properties); /** * Set the system property with this name to the list of broker addresses. * Defaults to {@code spring.kafka.bootstrap-servers} for Spring Boot - * compatibility, since 3.0.10. + * compatibility. * @param brokerListProperty the brokerListProperty to set * @return this broker. - * @since 2.3 - */ - public EmbeddedKafkaBroker brokerListProperty(String brokerListProperty) { - this.brokerListProperty = brokerListProperty; - return this; - } - - /** - * Set an explicit port for the embedded Zookeeper. - * @param port the port. - * @return the {@link EmbeddedKafkaBroker}. - * @since 2.3 */ - public EmbeddedKafkaBroker zkPort(int port) { - this.zkPort = port; - return this; - } - - /** - * Get the port that the embedded Zookeeper is running on or will run on. - * @return the port. - * @since 2.3 - */ - public int getZkPort() { - return this.zookeeper != null ? this.zookeeper.getPort() : this.zkPort; - } - - /** - * Set the port to run the embedded Zookeeper on (default random). - * @param zkPort the port. - * @since 2.3 - */ - public void setZkPort(int zkPort) { - this.zkPort = zkPort; - } - - /** - * Set the timeout in seconds for admin operations (e.g. topic creation, close). - * @param adminTimeout the timeout. - * @return the {@link EmbeddedKafkaBroker} - * @since 2.8.5 - */ - public EmbeddedKafkaBroker adminTimeout(int adminTimeout) { - this.adminTimeout = Duration.ofSeconds(adminTimeout); - return this; - } + EmbeddedKafkaBroker brokerListProperty(String brokerListProperty); /** - * Set the timeout in seconds for admin operations (e.g. topic creation, close). - * Default 10 seconds. - * @param adminTimeout the timeout. 
- * @since 2.2 + * Get the bootstrap server addresses as a String. + * @return the bootstrap servers. */ - public void setAdminTimeout(int adminTimeout) { - this.adminTimeout = Duration.ofSeconds(adminTimeout); - } - - /** - * Set connection timeout for the client to the embedded Zookeeper. - * @param zkConnectionTimeout the connection timeout, - * @return the {@link EmbeddedKafkaBroker}. - * @since 2.4 - */ - public synchronized EmbeddedKafkaBroker zkConnectionTimeout(int zkConnectionTimeout) { - this.zkConnectionTimeout = zkConnectionTimeout; - return this; - } - - /** - * Set session timeout for the client to the embedded Zookeeper. - * @param zkSessionTimeout the session timeout. - * @return the {@link EmbeddedKafkaBroker}. - * @since 2.4 - */ - public synchronized EmbeddedKafkaBroker zkSessionTimeout(int zkSessionTimeout) { - this.zkSessionTimeout = zkSessionTimeout; - return this; - } - - @Override - public void afterPropertiesSet() { - overrideExitMethods(); - try { - this.zookeeper = new EmbeddedZookeeper(this.zkPort); - } - catch (IOException | InterruptedException e) { - throw new IllegalStateException("Failed to create embedded Zookeeper", e); - } - this.zkConnect = LOOPBACK + ":" + this.zookeeper.getPort(); - this.kafkaServers.clear(); - boolean userLogDir = this.brokerProperties.get(KafkaConfig.LogDirProp()) != null && this.count == 1; - for (int i = 0; i < this.count; i++) { - Properties brokerConfigProperties = createBrokerProperties(i); - brokerConfigProperties.setProperty(KafkaConfig.ReplicaSocketTimeoutMsProp(), "1000"); - brokerConfigProperties.setProperty(KafkaConfig.ControllerSocketTimeoutMsProp(), "1000"); - brokerConfigProperties.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1"); - brokerConfigProperties.setProperty(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp(), - String.valueOf(Long.MAX_VALUE)); - this.brokerProperties.forEach(brokerConfigProperties::put); - if 
(!this.brokerProperties.containsKey(KafkaConfig.NumPartitionsProp())) { - brokerConfigProperties.setProperty(KafkaConfig.NumPartitionsProp(), "" + this.partitionsPerTopic); - } - if (!userLogDir) { - logDir(brokerConfigProperties); - } - KafkaServer server = TestUtils.createServer(new KafkaConfig(brokerConfigProperties), Time.SYSTEM); - this.kafkaServers.add(server); - if (this.kafkaPorts[i] == 0) { - this.kafkaPorts[i] = TestUtils.boundPort(server, SecurityProtocol.PLAINTEXT); - } - } - createKafkaTopics(this.topics); - if (this.brokerListProperty == null) { - this.brokerListProperty = System.getProperty(BROKER_LIST_PROPERTY); - } - if (this.brokerListProperty != null) { - System.setProperty(this.brokerListProperty, getBrokersAsString()); - } - System.setProperty(SPRING_EMBEDDED_KAFKA_BROKERS, getBrokersAsString()); - System.setProperty(SPRING_EMBEDDED_ZOOKEEPER_CONNECT, getZookeeperConnectionString()); - } - - private void logDir(Properties brokerConfigProperties) { - try { - brokerConfigProperties.put(KafkaConfig.LogDirProp(), - Files.createTempDirectory("spring.kafka." 
+ UUID.randomUUID()).toString()); - } - catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - private void overrideExitMethods() { - String exitMsg = "Exit.%s(%d, %s) called"; - Exit.setExitProcedure((statusCode, message) -> { - if (logger.isDebugEnabled()) { - logger.debug(new RuntimeException(), String.format(exitMsg, "exit", statusCode, message)); - } - else { - logger.warn(String.format(exitMsg, "exit", statusCode, message)); - } - }); - Exit.setHaltProcedure((statusCode, message) -> { - if (logger.isDebugEnabled()) { - logger.debug(new RuntimeException(), String.format(exitMsg, "halt", statusCode, message)); - } - else { - logger.warn(String.format(exitMsg, "halt", statusCode, message)); - } - }); - } - - private Properties createBrokerProperties(int i) { - return TestUtils.createBrokerConfig(i, this.zkConnect, this.controlledShutdown, - true, this.kafkaPorts[i], - scala.Option.apply(null), - scala.Option.apply(null), - scala.Option.apply(null), - true, false, 0, false, 0, false, 0, scala.Option.apply(null), 1, false, - this.partitionsPerTopic, (short) this.count, false); - } + String getBrokersAsString(); /** * Add topics to the existing broker(s) using the configured number of partitions. * The broker(s) must be running. * @param topicsToAdd the topics. */ - public void addTopics(String... topicsToAdd) { - Assert.notNull(this.zookeeper, BROKER_NEEDED); - HashSet set = new HashSet<>(Arrays.asList(topicsToAdd)); - createKafkaTopics(set); - this.topics.addAll(set); - } + void addTopics(String... topicsToAdd); /** * Add topics to the existing broker(s). * The broker(s) must be running. * @param topicsToAdd the topics. - * @since 2.2 */ - public void addTopics(NewTopic... 
topicsToAdd) { - Assert.notNull(this.zookeeper, BROKER_NEEDED); - for (NewTopic topic : topicsToAdd) { - Assert.isTrue(this.topics.add(topic.name()), () -> "topic already exists: " + topic); - Assert.isTrue(topic.replicationFactor() <= this.count - && (topic.replicasAssignments() == null - || topic.replicasAssignments().size() <= this.count), - () -> "Embedded kafka does not support the requested replication factor: " + topic); - } - - doWithAdmin(admin -> createTopics(admin, Arrays.asList(topicsToAdd))); - } + void addTopics(NewTopic... topicsToAdd); /** - * Create topics in the existing broker(s) using the configured number of partitions. - * @param topicsToCreate the topics. - */ - private void createKafkaTopics(Set topicsToCreate) { - doWithAdmin(admin -> { - createTopics(admin, - topicsToCreate.stream() - .map(t -> new NewTopic(t, this.partitionsPerTopic, (short) this.count)) - .collect(Collectors.toList())); - }); - } - - private void createTopics(AdminClient admin, List newTopics) { - CreateTopicsResult createTopics = admin.createTopics(newTopics); - try { - createTopics.all().get(this.adminTimeout.getSeconds(), TimeUnit.SECONDS); - } - catch (Exception e) { - throw new KafkaException(e); - } - } - - /** - * Add topics to the existing broker(s) using the configured number of partitions. + * Add topics to the existing broker(s) and returning a map of results. * The broker(s) must be running. * @param topicsToAdd the topics. * @return the results; null values indicate success. - * @since 2.5.4 */ - public Map addTopicsWithResults(String... topicsToAdd) { - Assert.notNull(this.zookeeper, BROKER_NEEDED); - HashSet set = new HashSet<>(Arrays.asList(topicsToAdd)); - this.topics.addAll(set); - return createKafkaTopicsWithResults(set); - } + Map addTopicsWithResults(NewTopic... topicsToAdd); /** - * Add topics to the existing broker(s) and returning a map of results. + * Add topics to the existing broker(s) using the configured number of partitions. 
* The broker(s) must be running. * @param topicsToAdd the topics. * @return the results; null values indicate success. - * @since 2.5.4 - */ - public Map addTopicsWithResults(NewTopic... topicsToAdd) { - Assert.notNull(this.zookeeper, BROKER_NEEDED); - for (NewTopic topic : topicsToAdd) { - Assert.isTrue(this.topics.add(topic.name()), () -> "topic already exists: " + topic); - Assert.isTrue(topic.replicationFactor() <= this.count - && (topic.replicasAssignments() == null - || topic.replicasAssignments().size() <= this.count), - () -> "Embedded kafka does not support the requested replication factor: " + topic); - } - - return doWithAdminFunction(admin -> createTopicsWithResults(admin, Arrays.asList(topicsToAdd))); - } - - /** - * Create topics in the existing broker(s) using the configured number of partitions - * and returning a map of results. - * @param topicsToCreate the topics. - * @return the results; null values indicate success. - * @since 2.5.4 */ - private Map createKafkaTopicsWithResults(Set topicsToCreate) { - return doWithAdminFunction(admin -> { - return createTopicsWithResults(admin, - topicsToCreate.stream() - .map(t -> new NewTopic(t, this.partitionsPerTopic, (short) this.count)) - .collect(Collectors.toList())); - }); - } - - private Map createTopicsWithResults(AdminClient admin, List newTopics) { - CreateTopicsResult createTopics = admin.createTopics(newTopics); - Map results = new HashMap<>(); - createTopics.values() - .entrySet() - .stream() - .map(entry -> { - Exception result; - try { - entry.getValue().get(this.adminTimeout.getSeconds(), TimeUnit.SECONDS); - result = null; - } - catch (InterruptedException | ExecutionException | TimeoutException e) { - result = e; - } - return new SimpleEntry<>(entry.getKey(), result); - }) - .forEach(entry -> results.put(entry.getKey(), entry.getValue())); - return results; - } - - /** - * Create an {@link AdminClient}; invoke the callback and reliably close the admin. - * @param callback the callback. 
- */ - public void doWithAdmin(java.util.function.Consumer callback) { - Map adminConfigs = new HashMap<>(); - adminConfigs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokersAsString()); - try (AdminClient admin = AdminClient.create(adminConfigs)) { - callback.accept(admin); - } - } - - /** - * Create an {@link AdminClient}; invoke the callback and reliably close the admin. - * @param callback the callback. - * @param the function return type. - * @return a map of results. - * @since 2.5.4 - */ - public T doWithAdminFunction(Function callback) { - Map adminConfigs = new HashMap<>(); - adminConfigs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokersAsString()); - try (AdminClient admin = AdminClient.create(adminConfigs)) { - return callback.apply(admin); - } - } - - @Override - public void destroy() { - System.getProperties().remove(this.brokerListProperty); - System.getProperties().remove(SPRING_EMBEDDED_KAFKA_BROKERS); - System.getProperties().remove(SPRING_EMBEDDED_ZOOKEEPER_CONNECT); - for (KafkaServer kafkaServer : this.kafkaServers) { - try { - if (brokerRunning(kafkaServer)) { - kafkaServer.shutdown(); - kafkaServer.awaitShutdown(); - } - } - catch (Exception e) { - // do nothing - } - try { - CoreUtils.delete(kafkaServer.config().logDirs()); - } - catch (Exception e) { - // do nothing - } - } - synchronized (this) { - if (this.zooKeeperClient != null) { - this.zooKeeperClient.close(); - } - } - try { - this.zookeeper.shutdown(); - this.zkConnect = null; - } - catch (Exception e) { - // do nothing - } - } - - private boolean brokerRunning(KafkaServer kafkaServer) { - return !kafkaServer.brokerState().equals(BrokerState.NOT_RUNNING); - } - - public Set getTopics() { - return new HashSet<>(this.topics); - } - - public List getKafkaServers() { - return this.kafkaServers; - } - - public KafkaServer getKafkaServer(int id) { - return this.kafkaServers.get(id); - } - - public EmbeddedZookeeper getZookeeper() { - return this.zookeeper; - } + Map 
addTopicsWithResults(String... topicsToAdd); /** - * Return the ZooKeeperClient. - * @return the client. - * @since 2.3.2 - */ - public synchronized ZooKeeperClient getZooKeeperClient() { - if (this.zooKeeperClient == null) { - this.zooKeeperClient = new ZooKeeperClient(this.zkConnect, zkSessionTimeout, zkConnectionTimeout, - 1, Time.SYSTEM, "embeddedKafkaZK", "embeddedKafkaZK", new ZKClientConfig(), "embeddedKafkaZK"); - } - return this.zooKeeperClient; - } - - public String getZookeeperConnectionString() { - return this.zkConnect; - } - - public BrokerAddress getBrokerAddress(int i) { - KafkaServer kafkaServer = this.kafkaServers.get(i); - return new BrokerAddress(LOOPBACK, kafkaServer.config().listeners().apply(0).port()); - } - - public BrokerAddress[] getBrokerAddresses() { - List addresses = new ArrayList(); - for (int kafkaPort : this.kafkaPorts) { - addresses.add(new BrokerAddress(LOOPBACK, kafkaPort)); - } - return addresses.toArray(new BrokerAddress[0]); - } - - public int getPartitionsPerTopic() { - return this.partitionsPerTopic; - } - - public void bounce(BrokerAddress brokerAddress) { - for (KafkaServer kafkaServer : getKafkaServers()) { - EndPoint endpoint = kafkaServer.config().listeners().apply(0); - if (brokerAddress.equals(new BrokerAddress(endpoint.host(), endpoint.port()))) { - kafkaServer.shutdown(); - kafkaServer.awaitShutdown(); - } - } - } - - public void restart(final int index) throws Exception { //NOSONAR - - // retry restarting repeatedly, first attempts may fail - - SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(10, // NOSONAR magic # - Collections.singletonMap(Exception.class, true)); - - ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); - backOffPolicy.setInitialInterval(100); // NOSONAR magic # - backOffPolicy.setMaxInterval(1000); // NOSONAR magic # - backOffPolicy.setMultiplier(2); // NOSONAR magic # - - RetryTemplate retryTemplate = new RetryTemplate(); - retryTemplate.setRetryPolicy(retryPolicy); - 
retryTemplate.setBackOffPolicy(backOffPolicy); - - - retryTemplate.execute(context -> { - this.kafkaServers.get(index).startup(); - return null; - }); - } - - public String getBrokersAsString() { - StringBuilder builder = new StringBuilder(); - for (BrokerAddress brokerAddress : getBrokerAddresses()) { - builder.append(brokerAddress.toString()).append(','); - } - return builder.substring(0, builder.length() - 1); - } - - /** - * Subscribe a consumer to all the embedded topics. + * Subscribe a consumer to one or more of the embedded topics. * @param consumer the consumer. + * @param topicsToConsume the topics. + * @param seekToEnd true to seek to the end instead of the beginning. + * @throws IllegalStateException if you attempt to consume from a topic that is not in + * the list of embedded topics. */ - public void consumeFromAllEmbeddedTopics(Consumer consumer) { - consumeFromEmbeddedTopics(consumer, this.topics.toArray(new String[0])); - } + void consumeFromEmbeddedTopics(Consumer consumer, boolean seekToEnd, String... topicsToConsume); /** - * Subscribe a consumer to all the embedded topics. - * @param seekToEnd true to seek to the end instead of the beginning. + * Subscribe a consumer to one or more of the embedded topics. * @param consumer the consumer. - * @since 2.8.2 + * @param topicsToConsume the topics. + * @throws IllegalStateException if you attempt to consume from a topic that is not in + * the list of embedded topics. */ - public void consumeFromAllEmbeddedTopics(Consumer consumer, boolean seekToEnd) { - consumeFromEmbeddedTopics(consumer, seekToEnd, this.topics.toArray(new String[0])); - } + void consumeFromEmbeddedTopics(Consumer consumer, String... topicsToConsume); /** * Subscribe a consumer to one of the embedded topics. * @param consumer the consumer. + * @param seekToEnd true to seek to the end instead of the beginning. * @param topic the topic. 
*/ - public void consumeFromAnEmbeddedTopic(Consumer consumer, String topic) { - consumeFromEmbeddedTopics(consumer, topic); - } + void consumeFromAnEmbeddedTopic(Consumer consumer, boolean seekToEnd, String topic); /** * Subscribe a consumer to one of the embedded topics. * @param consumer the consumer. - * @param seekToEnd true to seek to the end instead of the beginning. * @param topic the topic. - * @since 2.8.2 */ - public void consumeFromAnEmbeddedTopic(Consumer consumer, boolean seekToEnd, String topic) { - consumeFromEmbeddedTopics(consumer, seekToEnd, topic); - } + void consumeFromAnEmbeddedTopic(Consumer consumer, String topic); /** - * Subscribe a consumer to one or more of the embedded topics. + * Subscribe a consumer to all the embedded topics. + * @param seekToEnd true to seek to the end instead of the beginning. * @param consumer the consumer. - * @param topicsToConsume the topics. - * @throws IllegalStateException if you attempt to consume from a topic that is not in - * the list of embedded topics (since 2.3.4). */ - public void consumeFromEmbeddedTopics(Consumer consumer, String... topicsToConsume) { - consumeFromEmbeddedTopics(consumer, false, topicsToConsume); - } + void consumeFromAllEmbeddedTopics(Consumer consumer, boolean seekToEnd); /** - * Subscribe a consumer to one or more of the embedded topics. + * Subscribe a consumer to all the embedded topics. * @param consumer the consumer. - * @param topicsToConsume the topics. - * @param seekToEnd true to seek to the end instead of the beginning. - * @throws IllegalStateException if you attempt to consume from a topic that is not in - * the list of embedded topics. - * @since 2.8.2 */ - public void consumeFromEmbeddedTopics(Consumer consumer, boolean seekToEnd, String... 
topicsToConsume) { - List notEmbedded = Arrays.stream(topicsToConsume) - .filter(topic -> !this.topics.contains(topic)) - .collect(Collectors.toList()); - if (notEmbedded.size() > 0) { - throw new IllegalStateException("topic(s):'" + notEmbedded + "' are not in embedded topic list"); - } - final AtomicReference> assigned = new AtomicReference<>(); - consumer.subscribe(Arrays.asList(topicsToConsume), new ConsumerRebalanceListener() { - - @Override - public void onPartitionsRevoked(Collection partitions) { - } - - @Override - public void onPartitionsAssigned(Collection partitions) { - assigned.set(partitions); - logger.debug(() -> "partitions assigned: " + partitions); - } - - }); - int n = 0; - while (assigned.get() == null && n++ < 600) { // NOSONAR magic # - consumer.poll(Duration.ofMillis(100)); // force assignment NOSONAR magic # - } - if (assigned.get() != null) { - logger.debug(() -> "Partitions assigned " - + assigned.get() - + "; re-seeking to " - + (seekToEnd ? "end; " : "beginning")); - if (seekToEnd) { - consumer.seekToEnd(assigned.get()); - } - else { - consumer.seekToBeginning(assigned.get()); - } - } - else { - throw new IllegalStateException("Failed to be assigned partitions from the embedded topics"); - } - logger.debug("Subscription Initiated"); - } + void consumeFromAllEmbeddedTopics(Consumer consumer); /** - * Ported from scala to allow setting the port. - * - * @author Gary Russell - * @since 2.3 + * Get the configured number of partitions per topic. + * @return the partition count. 
*/ - public static final class EmbeddedZookeeper { - - private static final int THREE_K = 3000; - - private static final int HUNDRED = 100; - - private static final int TICK_TIME = 800; // allow a maxSessionTimeout of 20 * 800ms = 16 secs - - private final NIOServerCnxnFactory factory; - - private final ZooKeeperServer zookeeper; - - private final int port; - - private final File snapshotDir; - - private final File logDir; - - public EmbeddedZookeeper(int zkPort) throws IOException, InterruptedException { - this.snapshotDir = TestUtils.tempDir(); - this.logDir = TestUtils.tempDir(); - System.setProperty("zookeeper.forceSync", "no"); // disable fsync to ZK txn - // log in tests to avoid - // timeout - this.zookeeper = new ZooKeeperServer(this.snapshotDir, this.logDir, TICK_TIME); - this.factory = new NIOServerCnxnFactory(); - InetSocketAddress addr = new InetSocketAddress(LOOPBACK, zkPort == 0 ? TestUtils.RandomPort() : zkPort); - this.factory.configure(addr, 0); - this.factory.startup(zookeeper); - this.port = zookeeper.getClientPort(); - } - - public int getPort() { - return this.port; - } - - public File getSnapshotDir() { - return this.snapshotDir; - } - - public File getLogDir() { - return this.logDir; - } - - public void shutdown() throws IOException { - // Also shuts down ZooKeeperServer - try { - this.factory.shutdown(); - } - catch (Exception e) { - logger.error(e, "ZK shutdown failed"); - } - - int n = 0; - while (n++ < HUNDRED) { - try { - ZkFourLetterWords.sendStat(LOOPBACK, this.port, THREE_K); - Thread.sleep(HUNDRED); - } - catch (@SuppressWarnings("unused") Exception e) { - break; - } - } - if (n == HUNDRED) { - logger.debug("Zookeeper failed to stop"); - } - - try { - this.zookeeper.getZKDatabase().close(); - } - catch (Exception e) { - logger.error(e, "ZK db close failed"); - } - - Utils.delete(this.logDir); - Utils.delete(this.snapshotDir); - } - - } + int getPartitionsPerTopic(); } diff --git 
a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java new file mode 100644 index 0000000000..16980f62f5 --- /dev/null +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java @@ -0,0 +1,566 @@ +/* + * Copyright 2018-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.test; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.time.Duration; +import java.util.AbstractMap.SimpleEntry; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.commons.logging.LogFactory; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import 
org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; + +import org.springframework.core.log.LogAccessor; +import org.springframework.util.Assert; + +import kafka.server.KafkaConfig; +import kafka.testkit.KafkaClusterTestKit; +import kafka.testkit.TestKitNodes; + +/** + * An embedded Kafka Broker(s) using KRaft. + * This class is intended to be used in the unit tests. + * + * @author Marius Bogoevici + * @author Artem Bilan + * @author Gary Russell + * @author Kamill Sokol + * @author Elliot Kennedy + * @author Nakul Mishra + * @author Pawel Lozinski + * @author Adrian Chlebosz + * + * @since 3.1 + */ +public class EmbeddedKafkaKraftBroker implements EmbeddedKafkaBroker { + + private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(EmbeddedKafkaKraftBroker.class)); + + /** + * Set the value of this property to a property name that should be set to the list of + * embedded broker addresses instead of {@value #SPRING_EMBEDDED_KAFKA_BROKERS}. + */ + public static final String BROKER_LIST_PROPERTY = "spring.embedded.kafka.brokers.property"; + + public static final int DEFAULT_ADMIN_TIMEOUT = 10; + + private final int count; + + private final Set topics; + + private final int partitionsPerTopic; + + private final Properties brokerProperties = new Properties(); + + private KafkaClusterTestKit cluster; + + private int[] kafkaPorts; + + private Duration adminTimeout = Duration.ofSeconds(DEFAULT_ADMIN_TIMEOUT); + + private String brokerListProperty = "spring.kafka.bootstrap-servers"; + + /** + * Create embedded Kafka brokers listening on random ports. + * @param count the number of brokers. + * @param partitions partitions per topic. 
+ * @param topics the topics to create. + */ + public EmbeddedKafkaKraftBroker(int count, int partitions, String... topics) { + this.count = count; + this.kafkaPorts = new int[this.count]; // random ports by default. + if (topics != null) { + this.topics = new HashSet<>(Arrays.asList(topics)); + } + else { + this.topics = new HashSet<>(); + } + this.partitionsPerTopic = partitions; + } + + /** + * Specify the properties to configure Kafka Broker before start, e.g. + * {@code auto.create.topics.enable}, {@code transaction.state.log.replication.factor} etc. + * @param properties the properties to use for configuring Kafka Broker(s). + * @return this for chaining configuration. + * @see KafkaConfig + */ + @Override + public EmbeddedKafkaBroker brokerProperties(Map properties) { + this.brokerProperties.putAll(properties); + return this; + } + + /** + * Specify a broker property. + * @param property the property name. + * @param value the value. + * @return the {@link EmbeddedKafkaKraftBroker}. + */ + public EmbeddedKafkaBroker brokerProperty(String property, Object value) { + this.brokerProperties.put(property, value); + return this; + } + + /** + * Set explicit ports on which the kafka brokers will listen. Useful when running an + * embedded broker that you want to access from other processes. + * @param ports the ports. + * @return the {@link EmbeddedKafkaKraftBroker}. + */ + @Override + public EmbeddedKafkaKraftBroker kafkaPorts(int... ports) { + Assert.isTrue(ports.length == this.count, "A port must be provided for each instance [" + + this.count + "], provided: " + Arrays.toString(ports) + ", use 0 for a random port"); + this.kafkaPorts = Arrays.copyOf(ports, ports.length); + return this; + } + + /** + * Set the system property with this name to the list of broker addresses. + * @param brokerListProperty the brokerListProperty to set + * @return this broker. 
+ * @since 2.3 + */ + @Override + public EmbeddedKafkaBroker brokerListProperty(String brokerListProperty) { + this.brokerListProperty = brokerListProperty; + return this; + } + + /** + * Set the timeout in seconds for admin operations (e.g. topic creation, close). + * @param adminTimeout the timeout. + * @return the {@link EmbeddedKafkaKraftBroker} + * @since 2.8.5 + */ + public EmbeddedKafkaBroker adminTimeout(int adminTimeout) { + this.adminTimeout = Duration.ofSeconds(adminTimeout); + return this; + } + + /** + * Set the timeout in seconds for admin operations (e.g. topic creation, close). + * Default 10 seconds. + * @param adminTimeout the timeout. + * @since 2.2 + */ + public void setAdminTimeout(int adminTimeout) { + this.adminTimeout = Duration.ofSeconds(adminTimeout); + } + + @Override + public void afterPropertiesSet() { + overrideExitMethods(); + addDefaultBrokerPropsIfAbsent(this.brokerProperties, this.count); + start(); + } + + + private void start() { + if (this.cluster != null) { + return; + } + try { + KafkaClusterTestKit.Builder clusterBuilder = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder() + .setCombined(true) + .setNumBrokerNodes(this.count) + .setNumControllerNodes(this.count) + .build()); + this.brokerProperties.forEach((k, v) -> clusterBuilder.setConfigProp((String) k, (String) v)); + this.cluster = clusterBuilder.build(); + } + catch (Exception ex) { + throw new IllegalStateException("Failed to create embedded cluster", ex); + } + + try { + this.cluster.format(); + this.cluster.startup(); + this.cluster.waitForReadyBrokers(); + } + catch (Exception ex) { + throw new IllegalStateException("Failed to start test Kafka cluster", ex); + } + + createKafkaTopics(this.topics); + if (this.brokerListProperty == null) { + this.brokerListProperty = System.getProperty(BROKER_LIST_PROPERTY); + } + if (this.brokerListProperty != null) { + System.setProperty(this.brokerListProperty, getBrokersAsString()); + } + 
System.setProperty(SPRING_EMBEDDED_KAFKA_BROKERS, getBrokersAsString()); + System.setProperty(this.brokerListProperty, getBrokersAsString()); + } + + @Override + public void destroy() { + AtomicReference shutdownFailure = new AtomicReference<>(); + Utils.closeQuietly(cluster, "embedded Kafka cluster", shutdownFailure); + if (shutdownFailure.get() != null) { + throw new IllegalStateException("Failed to shut down embedded Kafka cluster", shutdownFailure.get()); + } + this.cluster = null; + } + + private void addDefaultBrokerPropsIfAbsent(Properties brokerConfig, int numBrokers) { + brokerConfig.putIfAbsent(KafkaConfig.DeleteTopicEnableProp(), "true"); + brokerConfig.putIfAbsent(KafkaConfig.GroupInitialRebalanceDelayMsProp(), "0"); + brokerConfig.putIfAbsent(KafkaConfig.OffsetsTopicReplicationFactorProp(), String.valueOf(numBrokers)); + } + + private void logDir(Properties brokerConfigProperties) { + try { + brokerConfigProperties.put(KafkaConfig.LogDirProp(), + Files.createTempDirectory("spring.kafka." + UUID.randomUUID()).toString()); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private void overrideExitMethods() { + String exitMsg = "Exit.%s(%d, %s) called"; + Exit.setExitProcedure((statusCode, message) -> { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug(new RuntimeException(), String.format(exitMsg, "exit", statusCode, message)); + } + else { + LOGGER.warn(String.format(exitMsg, "exit", statusCode, message)); + } + }); + Exit.setHaltProcedure((statusCode, message) -> { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug(new RuntimeException(), String.format(exitMsg, "halt", statusCode, message)); + } + else { + LOGGER.warn(String.format(exitMsg, "halt", statusCode, message)); + } + }); + } + + /** + * Add topics to the existing broker(s) using the configured number of partitions. + * The broker(s) must be running. + * @param topicsToAdd the topics. + */ + @Override + public void addTopics(String... 
topicsToAdd) { + Assert.notNull(this.cluster, BROKER_NEEDED); + HashSet set = new HashSet<>(Arrays.asList(topicsToAdd)); + createKafkaTopics(set); + this.topics.addAll(set); + } + + /** + * Add topics to the existing broker(s). + * The broker(s) must be running. + * @param topicsToAdd the topics. + * @since 2.2 + */ + @Override + public void addTopics(NewTopic... topicsToAdd) { + Assert.notNull(this.cluster, BROKER_NEEDED); + for (NewTopic topic : topicsToAdd) { + Assert.isTrue(this.topics.add(topic.name()), () -> "topic already exists: " + topic); + Assert.isTrue(topic.replicationFactor() <= this.count + && (topic.replicasAssignments() == null + || topic.replicasAssignments().size() <= this.count), + () -> "Embedded kafka does not support the requested replication factor: " + topic); + } + + doWithAdmin(admin -> createTopics(admin, Arrays.asList(topicsToAdd))); + } + + /** + * Create topics in the existing broker(s) using the configured number of partitions. + * @param topicsToCreate the topics. + */ + private void createKafkaTopics(Set topicsToCreate) { + doWithAdmin(admin -> { + createTopics(admin, + topicsToCreate.stream() + .map(t -> new NewTopic(t, this.partitionsPerTopic, (short) this.count)) + .collect(Collectors.toList())); + }); + } + + private void createTopics(AdminClient admin, List newTopics) { + CreateTopicsResult createTopics = admin.createTopics(newTopics); + try { + createTopics.all().get(this.adminTimeout.getSeconds(), TimeUnit.SECONDS); + } + catch (Exception e) { + throw new KafkaException(e); + } + } + + /** + * Add topics to the existing broker(s) using the configured number of partitions. + * The broker(s) must be running. + * @param topicsToAdd the topics. + * @return the results; null values indicate success. + * @since 2.5.4 + */ + @Override + public Map addTopicsWithResults(String... 
topicsToAdd) { + Assert.notNull(this.cluster, BROKER_NEEDED); + HashSet set = new HashSet<>(Arrays.asList(topicsToAdd)); + this.topics.addAll(set); + return createKafkaTopicsWithResults(set); + } + + /** + * Add topics to the existing broker(s) and returning a map of results. + * The broker(s) must be running. + * @param topicsToAdd the topics. + * @return the results; null values indicate success. + * @since 2.5.4 + */ + @Override + public Map addTopicsWithResults(NewTopic... topicsToAdd) { + Assert.notNull(this.cluster, BROKER_NEEDED); + for (NewTopic topic : topicsToAdd) { + Assert.isTrue(this.topics.add(topic.name()), () -> "topic already exists: " + topic); + Assert.isTrue(topic.replicationFactor() <= this.count + && (topic.replicasAssignments() == null + || topic.replicasAssignments().size() <= this.count), + () -> "Embedded kafka does not support the requested replication factor: " + topic); + } + + return doWithAdminFunction(admin -> createTopicsWithResults(admin, Arrays.asList(topicsToAdd))); + } + + /** + * Create topics in the existing broker(s) using the configured number of partitions + * and returning a map of results. + * @param topicsToCreate the topics. + * @return the results; null values indicate success. 
+ * @since 2.5.4 + */ + private Map createKafkaTopicsWithResults(Set topicsToCreate) { + return doWithAdminFunction(admin -> { + return createTopicsWithResults(admin, + topicsToCreate.stream() + .map(t -> new NewTopic(t, this.partitionsPerTopic, (short) this.count)) + .collect(Collectors.toList())); + }); + } + + private Map createTopicsWithResults(AdminClient admin, List newTopics) { + CreateTopicsResult createTopics = admin.createTopics(newTopics); + Map results = new HashMap<>(); + createTopics.values() + .entrySet() + .stream() + .map(entry -> { + Exception result; + try { + entry.getValue().get(this.adminTimeout.getSeconds(), TimeUnit.SECONDS); + result = null; + } + catch (InterruptedException | ExecutionException | TimeoutException e) { + result = e; + } + return new SimpleEntry<>(entry.getKey(), result); + }) + .forEach(entry -> results.put(entry.getKey(), entry.getValue())); + return results; + } + + /** + * Create an {@link AdminClient}; invoke the callback and reliably close the admin. + * @param callback the callback. + */ + public void doWithAdmin(java.util.function.Consumer callback) { + Map adminConfigs = new HashMap<>(); + adminConfigs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokersAsString()); + try (AdminClient admin = AdminClient.create(adminConfigs)) { + callback.accept(admin); + } + } + + /** + * Create an {@link AdminClient}; invoke the callback and reliably close the admin. + * @param callback the callback. + * @param the function return type. + * @return a map of results. 
+ * @since 2.5.4 + */ + public T doWithAdminFunction(Function callback) { + Map adminConfigs = new HashMap<>(); + adminConfigs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokersAsString()); + try (AdminClient admin = AdminClient.create(adminConfigs)) { + return callback.apply(admin); + } + } + + @Override + public Set getTopics() { + return new HashSet<>(this.topics); + } + + @Override + public int getPartitionsPerTopic() { + return this.partitionsPerTopic; + } + + @Override + public String getBrokersAsString() { + return (String) this.cluster.clientProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); + } + + public KafkaClusterTestKit getCluster() { + return this.cluster; + } + + /** + * Subscribe a consumer to all the embedded topics. + * @param consumer the consumer. + */ + @Override + public void consumeFromAllEmbeddedTopics(Consumer consumer) { + consumeFromEmbeddedTopics(consumer, this.topics.toArray(new String[0])); + } + + /** + * Subscribe a consumer to all the embedded topics. + * @param seekToEnd true to seek to the end instead of the beginning. + * @param consumer the consumer. + * @since 2.8.2 + */ + @Override + public void consumeFromAllEmbeddedTopics(Consumer consumer, boolean seekToEnd) { + consumeFromEmbeddedTopics(consumer, seekToEnd, this.topics.toArray(new String[0])); + } + + /** + * Subscribe a consumer to one of the embedded topics. + * @param consumer the consumer. + * @param topic the topic. + */ + @Override + public void consumeFromAnEmbeddedTopic(Consumer consumer, String topic) { + consumeFromEmbeddedTopics(consumer, topic); + } + + /** + * Subscribe a consumer to one of the embedded topics. + * @param consumer the consumer. + * @param seekToEnd true to seek to the end instead of the beginning. + * @param topic the topic. 
+ * @since 2.8.2 + */ + @Override + public void consumeFromAnEmbeddedTopic(Consumer consumer, boolean seekToEnd, String topic) { + consumeFromEmbeddedTopics(consumer, seekToEnd, topic); + } + + /** + * Subscribe a consumer to one or more of the embedded topics. + * @param consumer the consumer. + * @param topicsToConsume the topics. + * @throws IllegalStateException if you attempt to consume from a topic that is not in + * the list of embedded topics (since 2.3.4). + */ + @Override + public void consumeFromEmbeddedTopics(Consumer consumer, String... topicsToConsume) { + consumeFromEmbeddedTopics(consumer, false, topicsToConsume); + } + + /** + * Subscribe a consumer to one or more of the embedded topics. + * @param consumer the consumer. + * @param topicsToConsume the topics. + * @param seekToEnd true to seek to the end instead of the beginning. + * @throws IllegalStateException if you attempt to consume from a topic that is not in + * the list of embedded topics. + * @since 2.8.2 + */ + @Override + public void consumeFromEmbeddedTopics(Consumer consumer, boolean seekToEnd, String... 
topicsToConsume) { + List notEmbedded = Arrays.stream(topicsToConsume) + .filter(topic -> !this.topics.contains(topic)) + .collect(Collectors.toList()); + if (notEmbedded.size() > 0) { + throw new IllegalStateException("topic(s):'" + notEmbedded + "' are not in embedded topic list"); + } + final AtomicReference> assigned = new AtomicReference<>(); + consumer.subscribe(Arrays.asList(topicsToConsume), new ConsumerRebalanceListener() { + + @Override + public void onPartitionsRevoked(Collection partitions) { + } + + @Override + public void onPartitionsAssigned(Collection partitions) { + assigned.set(partitions); + LOGGER.debug(() -> "partitions assigned: " + partitions); + } + + }); + int n = 0; + while (assigned.get() == null && n++ < 600) { // NOSONAR magic # + consumer.poll(Duration.ofMillis(100)); // force assignment NOSONAR magic # + } + if (assigned.get() != null) { + LOGGER.debug(() -> "Partitions assigned " + + assigned.get() + + "; re-seeking to " + + (seekToEnd ? "end; " : "beginning")); + if (seekToEnd) { + consumer.seekToEnd(assigned.get()); + } + else { + consumer.seekToBeginning(assigned.get()); + } + } + else { + throw new IllegalStateException("Failed to be assigned partitions from the embedded topics"); + } + LOGGER.debug("Subscription Initiated"); + } + +} diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java new file mode 100644 index 0000000000..d5192254dd --- /dev/null +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java @@ -0,0 +1,856 @@ +/* + * Copyright 2018-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.test; + +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.InetSocketAddress; +import java.nio.file.Files; +import java.time.Duration; +import java.util.AbstractMap.SimpleEntry; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.commons.logging.LogFactory; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.metadata.BrokerState; +import 
org.apache.zookeeper.client.ZKClientConfig; +import org.apache.zookeeper.server.NIOServerCnxnFactory; +import org.apache.zookeeper.server.ZooKeeperServer; + +import org.springframework.core.log.LogAccessor; +import org.springframework.kafka.test.core.BrokerAddress; +import org.springframework.retry.backoff.ExponentialBackOffPolicy; +import org.springframework.retry.policy.SimpleRetryPolicy; +import org.springframework.retry.support.RetryTemplate; +import org.springframework.util.Assert; + +import kafka.cluster.EndPoint; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import kafka.utils.CoreUtils; +import kafka.utils.TestUtils; +import kafka.zk.ZkFourLetterWords; +import kafka.zookeeper.ZooKeeperClient; + +/** + * An embedded Kafka Broker(s) and Zookeeper manager. + * This class is intended to be used in the unit tests. + * + * @author Marius Bogoevici + * @author Artem Bilan + * @author Gary Russell + * @author Kamill Sokol + * @author Elliot Kennedy + * @author Nakul Mishra + * @author Pawel Lozinski + * @author Adrian Chlebosz + * + * @since 2.2 + */ +public class EmbeddedKafkaZKBroker implements EmbeddedKafkaBroker { + + private static final LogAccessor logger = new LogAccessor(LogFactory.getLog(EmbeddedKafkaBroker.class)); // NOSONAR + + public static final String SPRING_EMBEDDED_ZOOKEEPER_CONNECT = "spring.embedded.zookeeper.connect"; + + public static final int DEFAULT_ZK_SESSION_TIMEOUT = 18000; + + public static final int DEFAULT_ZK_CONNECTION_TIMEOUT = DEFAULT_ZK_SESSION_TIMEOUT; + + private final int count; + + private final boolean controlledShutdown; + + private final Set topics; + + private final int partitionsPerTopic; + + private final List kafkaServers = new ArrayList<>(); + + private final Map brokerProperties = new HashMap<>(); + + private EmbeddedZookeeper zookeeper; + + private String zkConnect; + + private int zkPort; + + private int[] kafkaPorts; + + private Duration adminTimeout = 
Duration.ofSeconds(DEFAULT_ADMIN_TIMEOUT); + + private int zkConnectionTimeout = DEFAULT_ZK_CONNECTION_TIMEOUT; + + private int zkSessionTimeout = DEFAULT_ZK_SESSION_TIMEOUT; + + private String brokerListProperty = "spring.kafka.bootstrap-servers"; + + private volatile ZooKeeperClient zooKeeperClient; + + public EmbeddedKafkaZKBroker(int count) { + this(count, false); + } + + /** + * Create embedded Kafka brokers. + * @param count the number of brokers. + * @param controlledShutdown passed into TestUtils.createBrokerConfig. + * @param topics the topics to create (2 partitions per). + */ + public EmbeddedKafkaZKBroker(int count, boolean controlledShutdown, String... topics) { + this(count, controlledShutdown, 2, topics); + } + + /** + * Create embedded Kafka brokers listening on random ports. + * @param count the number of brokers. + * @param controlledShutdown passed into TestUtils.createBrokerConfig. + * @param partitions partitions per topic. + * @param topics the topics to create. + */ + public EmbeddedKafkaZKBroker(int count, boolean controlledShutdown, int partitions, String... topics) { + this.count = count; + this.kafkaPorts = new int[this.count]; // random ports by default. + this.controlledShutdown = controlledShutdown; + if (topics != null) { + this.topics = new HashSet<>(Arrays.asList(topics)); + } + else { + this.topics = new HashSet<>(); + } + this.partitionsPerTopic = partitions; + } + + /** + * Specify the properties to configure Kafka Broker before start, e.g. + * {@code auto.create.topics.enable}, {@code transaction.state.log.replication.factor} etc. + * @param properties the properties to use for configuring Kafka Broker(s). + * @return this for chaining configuration. + * @see KafkaConfig + */ + @Override + public EmbeddedKafkaBroker brokerProperties(Map properties) { + this.brokerProperties.putAll(properties); + return this; + } + + /** + * Specify a broker property. + * @param property the property name. + * @param value the value. 
+ * @return the {@link EmbeddedKafkaBroker}. + */ + public EmbeddedKafkaBroker brokerProperty(String property, Object value) { + this.brokerProperties.put(property, value); + return this; + } + + /** + * Set explicit ports on which the kafka brokers will listen. Useful when running an + * embedded broker that you want to access from other processes. + * @param ports the ports. + * @return the {@link EmbeddedKafkaBroker}. + */ + @Override + public EmbeddedKafkaZKBroker kafkaPorts(int... ports) { + Assert.isTrue(ports.length == this.count, "A port must be provided for each instance [" + + this.count + "], provided: " + Arrays.toString(ports) + ", use 0 for a random port"); + this.kafkaPorts = Arrays.copyOf(ports, ports.length); + return this; + } + + /** + * Set the system property with this name to the list of broker addresses. + * Defaults to {@code spring.kafka.bootstrap-servers} for Spring Boot + * compatibility, since 3.0.10. + * @param brokerListProperty the brokerListProperty to set + * @return this broker. + * @since 2.3 + */ + @Override + public EmbeddedKafkaBroker brokerListProperty(String brokerListProperty) { + this.brokerListProperty = brokerListProperty; + return this; + } + + /** + * Set an explicit port for the embedded Zookeeper. + * @param port the port. + * @return the {@link EmbeddedKafkaBroker}. + * @since 2.3 + */ + public EmbeddedKafkaZKBroker zkPort(int port) { + this.zkPort = port; + return this; + } + + /** + * Get the port that the embedded Zookeeper is running on or will run on. + * @return the port. + * @since 2.3 + */ + public int getZkPort() { + return this.zookeeper != null ? this.zookeeper.getPort() : this.zkPort; + } + + /** + * Set the port to run the embedded Zookeeper on (default random). + * @param zkPort the port. + * @since 2.3 + */ + public void setZkPort(int zkPort) { + this.zkPort = zkPort; + } + + /** + * Set the timeout in seconds for admin operations (e.g. topic creation, close). + * @param adminTimeout the timeout. 
+ * @return the {@link EmbeddedKafkaBroker} + * @since 2.8.5 + */ + public EmbeddedKafkaBroker adminTimeout(int adminTimeout) { + this.adminTimeout = Duration.ofSeconds(adminTimeout); + return this; + } + + /** + * Set the timeout in seconds for admin operations (e.g. topic creation, close). + * Default 10 seconds. + * @param adminTimeout the timeout. + * @since 2.2 + */ + public void setAdminTimeout(int adminTimeout) { + this.adminTimeout = Duration.ofSeconds(adminTimeout); + } + + /** + * Set connection timeout for the client to the embedded Zookeeper. + * @param zkConnectionTimeout the connection timeout, + * @return the {@link EmbeddedKafkaBroker}. + * @since 2.4 + */ + public synchronized EmbeddedKafkaZKBroker zkConnectionTimeout(int zkConnectionTimeout) { + this.zkConnectionTimeout = zkConnectionTimeout; + return this; + } + + /** + * Set session timeout for the client to the embedded Zookeeper. + * @param zkSessionTimeout the session timeout. + * @return the {@link EmbeddedKafkaBroker}. 
+ * @since 2.4 + */ + public synchronized EmbeddedKafkaZKBroker zkSessionTimeout(int zkSessionTimeout) { + this.zkSessionTimeout = zkSessionTimeout; + return this; + } + + @Override + public void afterPropertiesSet() { + overrideExitMethods(); + try { + this.zookeeper = new EmbeddedZookeeper(this.zkPort); + } + catch (IOException | InterruptedException e) { + throw new IllegalStateException("Failed to create embedded Zookeeper", e); + } + this.zkConnect = LOOPBACK + ":" + this.zookeeper.getPort(); + this.kafkaServers.clear(); + boolean userLogDir = this.brokerProperties.get(KafkaConfig.LogDirProp()) != null && this.count == 1; + for (int i = 0; i < this.count; i++) { + Properties brokerConfigProperties = createBrokerProperties(i); + brokerConfigProperties.setProperty(KafkaConfig.ReplicaSocketTimeoutMsProp(), "1000"); + brokerConfigProperties.setProperty(KafkaConfig.ControllerSocketTimeoutMsProp(), "1000"); + brokerConfigProperties.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1"); + brokerConfigProperties.setProperty(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp(), + String.valueOf(Long.MAX_VALUE)); + this.brokerProperties.forEach(brokerConfigProperties::put); + if (!this.brokerProperties.containsKey(KafkaConfig.NumPartitionsProp())) { + brokerConfigProperties.setProperty(KafkaConfig.NumPartitionsProp(), "" + this.partitionsPerTopic); + } + if (!userLogDir) { + logDir(brokerConfigProperties); + } + KafkaServer server = TestUtils.createServer(new KafkaConfig(brokerConfigProperties), Time.SYSTEM); + this.kafkaServers.add(server); + if (this.kafkaPorts[i] == 0) { + this.kafkaPorts[i] = TestUtils.boundPort(server, SecurityProtocol.PLAINTEXT); + } + } + createKafkaTopics(this.topics); + if (this.brokerListProperty == null) { + this.brokerListProperty = System.getProperty(BROKER_LIST_PROPERTY); + } + if (this.brokerListProperty != null) { + System.setProperty(this.brokerListProperty, getBrokersAsString()); + } + 
System.setProperty(SPRING_EMBEDDED_KAFKA_BROKERS, getBrokersAsString()); + System.setProperty(SPRING_EMBEDDED_ZOOKEEPER_CONNECT, getZookeeperConnectionString()); + } + + private void logDir(Properties brokerConfigProperties) { + try { + brokerConfigProperties.put(KafkaConfig.LogDirProp(), + Files.createTempDirectory("spring.kafka." + UUID.randomUUID()).toString()); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private void overrideExitMethods() { + String exitMsg = "Exit.%s(%d, %s) called"; + Exit.setExitProcedure((statusCode, message) -> { + if (logger.isDebugEnabled()) { + logger.debug(new RuntimeException(), String.format(exitMsg, "exit", statusCode, message)); + } + else { + logger.warn(String.format(exitMsg, "exit", statusCode, message)); + } + }); + Exit.setHaltProcedure((statusCode, message) -> { + if (logger.isDebugEnabled()) { + logger.debug(new RuntimeException(), String.format(exitMsg, "halt", statusCode, message)); + } + else { + logger.warn(String.format(exitMsg, "halt", statusCode, message)); + } + }); + } + + private Properties createBrokerProperties(int i) { + return TestUtils.createBrokerConfig(i, this.zkConnect, this.controlledShutdown, + true, this.kafkaPorts[i], + scala.Option.apply(null), + scala.Option.apply(null), + scala.Option.apply(null), + true, false, 0, false, 0, false, 0, scala.Option.apply(null), 1, false, + this.partitionsPerTopic, (short) this.count, false); + } + + /** + * Add topics to the existing broker(s) using the configured number of partitions. + * The broker(s) must be running. + * @param topicsToAdd the topics. + */ + @Override + public void addTopics(String... topicsToAdd) { + Assert.notNull(this.zookeeper, BROKER_NEEDED); + HashSet set = new HashSet<>(Arrays.asList(topicsToAdd)); + createKafkaTopics(set); + this.topics.addAll(set); + } + + /** + * Add topics to the existing broker(s). + * The broker(s) must be running. + * @param topicsToAdd the topics. 
+ * @since 2.2 + */ + @Override + public void addTopics(NewTopic... topicsToAdd) { + Assert.notNull(this.zookeeper, BROKER_NEEDED); + for (NewTopic topic : topicsToAdd) { + Assert.isTrue(this.topics.add(topic.name()), () -> "topic already exists: " + topic); + Assert.isTrue(topic.replicationFactor() <= this.count + && (topic.replicasAssignments() == null + || topic.replicasAssignments().size() <= this.count), + () -> "Embedded kafka does not support the requested replication factor: " + topic); + } + + doWithAdmin(admin -> createTopics(admin, Arrays.asList(topicsToAdd))); + } + + /** + * Create topics in the existing broker(s) using the configured number of partitions. + * @param topicsToCreate the topics. + */ + private void createKafkaTopics(Set topicsToCreate) { + doWithAdmin(admin -> { + createTopics(admin, + topicsToCreate.stream() + .map(t -> new NewTopic(t, this.partitionsPerTopic, (short) this.count)) + .collect(Collectors.toList())); + }); + } + + private void createTopics(AdminClient admin, List newTopics) { + CreateTopicsResult createTopics = admin.createTopics(newTopics); + try { + createTopics.all().get(this.adminTimeout.getSeconds(), TimeUnit.SECONDS); + } + catch (Exception e) { + throw new KafkaException(e); + } + } + + /** + * Add topics to the existing broker(s) using the configured number of partitions. + * The broker(s) must be running. + * @param topicsToAdd the topics. + * @return the results; null values indicate success. + * @since 2.5.4 + */ + @Override + public Map addTopicsWithResults(String... topicsToAdd) { + Assert.notNull(this.zookeeper, BROKER_NEEDED); + HashSet set = new HashSet<>(Arrays.asList(topicsToAdd)); + this.topics.addAll(set); + return createKafkaTopicsWithResults(set); + } + + /** + * Add topics to the existing broker(s) and returning a map of results. + * The broker(s) must be running. + * @param topicsToAdd the topics. + * @return the results; null values indicate success. 
+ * @since 2.5.4 + */ + @Override + public Map addTopicsWithResults(NewTopic... topicsToAdd) { + Assert.notNull(this.zookeeper, BROKER_NEEDED); + for (NewTopic topic : topicsToAdd) { + Assert.isTrue(this.topics.add(topic.name()), () -> "topic already exists: " + topic); + Assert.isTrue(topic.replicationFactor() <= this.count + && (topic.replicasAssignments() == null + || topic.replicasAssignments().size() <= this.count), + () -> "Embedded kafka does not support the requested replication factor: " + topic); + } + + return doWithAdminFunction(admin -> createTopicsWithResults(admin, Arrays.asList(topicsToAdd))); + } + + /** + * Create topics in the existing broker(s) using the configured number of partitions + * and returning a map of results. + * @param topicsToCreate the topics. + * @return the results; null values indicate success. + * @since 2.5.4 + */ + private Map createKafkaTopicsWithResults(Set topicsToCreate) { + return doWithAdminFunction(admin -> { + return createTopicsWithResults(admin, + topicsToCreate.stream() + .map(t -> new NewTopic(t, this.partitionsPerTopic, (short) this.count)) + .collect(Collectors.toList())); + }); + } + + private Map createTopicsWithResults(AdminClient admin, List newTopics) { + CreateTopicsResult createTopics = admin.createTopics(newTopics); + Map results = new HashMap<>(); + createTopics.values() + .entrySet() + .stream() + .map(entry -> { + Exception result; + try { + entry.getValue().get(this.adminTimeout.getSeconds(), TimeUnit.SECONDS); + result = null; + } + catch (InterruptedException | ExecutionException | TimeoutException e) { + result = e; + } + return new SimpleEntry<>(entry.getKey(), result); + }) + .forEach(entry -> results.put(entry.getKey(), entry.getValue())); + return results; + } + + /** + * Create an {@link AdminClient}; invoke the callback and reliably close the admin. + * @param callback the callback. 
+ */ + public void doWithAdmin(java.util.function.Consumer callback) { + Map adminConfigs = new HashMap<>(); + adminConfigs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokersAsString()); + try (AdminClient admin = AdminClient.create(adminConfigs)) { + callback.accept(admin); + } + } + + /** + * Create an {@link AdminClient}; invoke the callback and reliably close the admin. + * @param callback the callback. + * @param the function return type. + * @return a map of results. + * @since 2.5.4 + */ + public T doWithAdminFunction(Function callback) { + Map adminConfigs = new HashMap<>(); + adminConfigs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokersAsString()); + try (AdminClient admin = AdminClient.create(adminConfigs)) { + return callback.apply(admin); + } + } + + @Override + public void destroy() { + System.getProperties().remove(this.brokerListProperty); + System.getProperties().remove(SPRING_EMBEDDED_KAFKA_BROKERS); + System.getProperties().remove(SPRING_EMBEDDED_ZOOKEEPER_CONNECT); + for (KafkaServer kafkaServer : this.kafkaServers) { + try { + if (brokerRunning(kafkaServer)) { + kafkaServer.shutdown(); + kafkaServer.awaitShutdown(); + } + } + catch (Exception e) { + // do nothing + } + try { + CoreUtils.delete(kafkaServer.config().logDirs()); + } + catch (Exception e) { + // do nothing + } + } + synchronized (this) { + if (this.zooKeeperClient != null) { + this.zooKeeperClient.close(); + } + } + try { + this.zookeeper.shutdown(); + this.zkConnect = null; + } + catch (Exception e) { + // do nothing + } + } + + private boolean brokerRunning(KafkaServer kafkaServer) { + return !kafkaServer.brokerState().equals(BrokerState.NOT_RUNNING); + } + + @Override + public Set getTopics() { + return new HashSet<>(this.topics); + } + + public List getKafkaServers() { + return this.kafkaServers; + } + + public KafkaServer getKafkaServer(int id) { + return this.kafkaServers.get(id); + } + + public EmbeddedZookeeper getZookeeper() { + return this.zookeeper; + } 
+ + /** + * Return the ZooKeeperClient. + * @return the client. + * @since 2.3.2 + */ + public synchronized ZooKeeperClient getZooKeeperClient() { + if (this.zooKeeperClient == null) { + this.zooKeeperClient = new ZooKeeperClient(this.zkConnect, zkSessionTimeout, zkConnectionTimeout, + 1, Time.SYSTEM, "embeddedKafkaZK", "embeddedKafkaZK", new ZKClientConfig(), "embeddedKafkaZK"); + } + return this.zooKeeperClient; + } + + public String getZookeeperConnectionString() { + return this.zkConnect; + } + + public BrokerAddress getBrokerAddress(int i) { + KafkaServer kafkaServer = this.kafkaServers.get(i); + return new BrokerAddress(LOOPBACK, kafkaServer.config().listeners().apply(0).port()); + } + + public BrokerAddress[] getBrokerAddresses() { + List addresses = new ArrayList(); + for (int kafkaPort : this.kafkaPorts) { + addresses.add(new BrokerAddress(LOOPBACK, kafkaPort)); + } + return addresses.toArray(new BrokerAddress[0]); + } + + @Override + public int getPartitionsPerTopic() { + return this.partitionsPerTopic; + } + + public void bounce(BrokerAddress brokerAddress) { + for (KafkaServer kafkaServer : getKafkaServers()) { + EndPoint endpoint = kafkaServer.config().listeners().apply(0); + if (brokerAddress.equals(new BrokerAddress(endpoint.host(), endpoint.port()))) { + kafkaServer.shutdown(); + kafkaServer.awaitShutdown(); + } + } + } + + public void restart(final int index) throws Exception { //NOSONAR + + // retry restarting repeatedly, first attempts may fail + + SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(10, // NOSONAR magic # + Collections.singletonMap(Exception.class, true)); + + ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); + backOffPolicy.setInitialInterval(100); // NOSONAR magic # + backOffPolicy.setMaxInterval(1000); // NOSONAR magic # + backOffPolicy.setMultiplier(2); // NOSONAR magic # + + RetryTemplate retryTemplate = new RetryTemplate(); + retryTemplate.setRetryPolicy(retryPolicy); + 
retryTemplate.setBackOffPolicy(backOffPolicy); + + + retryTemplate.execute(context -> { + this.kafkaServers.get(index).startup(); + return null; + }); + } + + @Override + public String getBrokersAsString() { + StringBuilder builder = new StringBuilder(); + for (BrokerAddress brokerAddress : getBrokerAddresses()) { + builder.append(brokerAddress.toString()).append(','); + } + return builder.substring(0, builder.length() - 1); + } + + /** + * Subscribe a consumer to all the embedded topics. + * @param consumer the consumer. + */ + @Override + public void consumeFromAllEmbeddedTopics(Consumer consumer) { + consumeFromEmbeddedTopics(consumer, this.topics.toArray(new String[0])); + } + + /** + * Subscribe a consumer to all the embedded topics. + * @param seekToEnd true to seek to the end instead of the beginning. + * @param consumer the consumer. + * @since 2.8.2 + */ + @Override + public void consumeFromAllEmbeddedTopics(Consumer consumer, boolean seekToEnd) { + consumeFromEmbeddedTopics(consumer, seekToEnd, this.topics.toArray(new String[0])); + } + + /** + * Subscribe a consumer to one of the embedded topics. + * @param consumer the consumer. + * @param topic the topic. + */ + @Override + public void consumeFromAnEmbeddedTopic(Consumer consumer, String topic) { + consumeFromEmbeddedTopics(consumer, topic); + } + + /** + * Subscribe a consumer to one of the embedded topics. + * @param consumer the consumer. + * @param seekToEnd true to seek to the end instead of the beginning. + * @param topic the topic. + * @since 2.8.2 + */ + @Override + public void consumeFromAnEmbeddedTopic(Consumer consumer, boolean seekToEnd, String topic) { + consumeFromEmbeddedTopics(consumer, seekToEnd, topic); + } + + /** + * Subscribe a consumer to one or more of the embedded topics. + * @param consumer the consumer. + * @param topicsToConsume the topics. + * @throws IllegalStateException if you attempt to consume from a topic that is not in + * the list of embedded topics (since 2.3.4). 
+ */ + @Override + public void consumeFromEmbeddedTopics(Consumer consumer, String... topicsToConsume) { + consumeFromEmbeddedTopics(consumer, false, topicsToConsume); + } + + /** + * Subscribe a consumer to one or more of the embedded topics. + * @param consumer the consumer. + * @param topicsToConsume the topics. + * @param seekToEnd true to seek to the end instead of the beginning. + * @throws IllegalStateException if you attempt to consume from a topic that is not in + * the list of embedded topics. + * @since 2.8.2 + */ + @Override + public void consumeFromEmbeddedTopics(Consumer consumer, boolean seekToEnd, String... topicsToConsume) { + List notEmbedded = Arrays.stream(topicsToConsume) + .filter(topic -> !this.topics.contains(topic)) + .collect(Collectors.toList()); + if (notEmbedded.size() > 0) { + throw new IllegalStateException("topic(s):'" + notEmbedded + "' are not in embedded topic list"); + } + final AtomicReference> assigned = new AtomicReference<>(); + consumer.subscribe(Arrays.asList(topicsToConsume), new ConsumerRebalanceListener() { + + @Override + public void onPartitionsRevoked(Collection partitions) { + } + + @Override + public void onPartitionsAssigned(Collection partitions) { + assigned.set(partitions); + logger.debug(() -> "partitions assigned: " + partitions); + } + + }); + int n = 0; + while (assigned.get() == null && n++ < 600) { // NOSONAR magic # + consumer.poll(Duration.ofMillis(100)); // force assignment NOSONAR magic # + } + if (assigned.get() != null) { + logger.debug(() -> "Partitions assigned " + + assigned.get() + + "; re-seeking to " + + (seekToEnd ? "end; " : "beginning")); + if (seekToEnd) { + consumer.seekToEnd(assigned.get()); + } + else { + consumer.seekToBeginning(assigned.get()); + } + } + else { + throw new IllegalStateException("Failed to be assigned partitions from the embedded topics"); + } + logger.debug("Subscription Initiated"); + } + + /** + * Ported from scala to allow setting the port. 
+ * + * @author Gary Russell + * @since 2.3 + */ + public static final class EmbeddedZookeeper { + + private static final int THREE_K = 3000; + + private static final int HUNDRED = 100; + + private static final int TICK_TIME = 800; // allow a maxSessionTimeout of 20 * 800ms = 16 secs + + private final NIOServerCnxnFactory factory; + + private final ZooKeeperServer zookeeper; + + private final int port; + + private final File snapshotDir; + + private final File logDir; + + public EmbeddedZookeeper(int zkPort) throws IOException, InterruptedException { + this.snapshotDir = TestUtils.tempDir(); + this.logDir = TestUtils.tempDir(); + System.setProperty("zookeeper.forceSync", "no"); // disable fsync to ZK txn + // log in tests to avoid + // timeout + this.zookeeper = new ZooKeeperServer(this.snapshotDir, this.logDir, TICK_TIME); + this.factory = new NIOServerCnxnFactory(); + InetSocketAddress addr = new InetSocketAddress(LOOPBACK, zkPort == 0 ? TestUtils.RandomPort() : zkPort); + this.factory.configure(addr, 0); + this.factory.startup(zookeeper); + this.port = zookeeper.getClientPort(); + } + + public int getPort() { + return this.port; + } + + public File getSnapshotDir() { + return this.snapshotDir; + } + + public File getLogDir() { + return this.logDir; + } + + public void shutdown() throws IOException { + // Also shuts down ZooKeeperServer + try { + this.factory.shutdown(); + } + catch (Exception e) { + logger.error(e, "ZK shutdown failed"); + } + + int n = 0; + while (n++ < HUNDRED) { + try { + ZkFourLetterWords.sendStat(LOOPBACK, this.port, THREE_K); + Thread.sleep(HUNDRED); + } + catch (@SuppressWarnings("unused") Exception e) { + break; + } + } + if (n == HUNDRED) { + logger.debug("Zookeeper failed to stop"); + } + + try { + this.zookeeper.getZKDatabase().close(); + } + catch (Exception e) { + logger.error(e, "ZK db close failed"); + } + + Utils.delete(this.logDir); + Utils.delete(this.snapshotDir); + } + + } + +} diff --git 
a/spring-kafka-test/src/main/java/org/springframework/kafka/test/condition/EmbeddedKafkaCondition.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/condition/EmbeddedKafkaCondition.java index a9cc666633..706d84dc69 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/condition/EmbeddedKafkaCondition.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/condition/EmbeddedKafkaCondition.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2022 the original author or authors. + * Copyright 2019-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,6 +40,8 @@ import org.springframework.core.io.Resource; import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.EmbeddedKafkaKraftBroker; +import org.springframework.kafka.test.EmbeddedKafkaZKBroker; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.context.junit.jupiter.SpringExtension; import org.springframework.util.Assert; @@ -121,15 +123,14 @@ private boolean springTestContext(AnnotatedElement annotatedElement) { @SuppressWarnings("unchecked") private EmbeddedKafkaBroker createBroker(EmbeddedKafka embedded) { - EmbeddedKafkaBroker broker; int[] ports = setupPorts(embedded); - broker = new EmbeddedKafkaBroker(embedded.count(), embedded.controlledShutdown(), - embedded.partitions(), embedded.topics()) - .zkPort(embedded.zookeeperPort()) - .kafkaPorts(ports) - .zkConnectionTimeout(embedded.zkConnectionTimeout()) - .zkSessionTimeout(embedded.zkSessionTimeout()) - .adminTimeout(embedded.adminTimeout()); + EmbeddedKafkaBroker broker; + if (embedded.kraft()) { + broker = kraftBroker(embedded, ports); + } + else { + broker = zkBroker(embedded, ports); + } Properties properties = new 
Properties(); for (String pair : embedded.brokerProperties()) { @@ -170,6 +171,22 @@ private EmbeddedKafkaBroker createBroker(EmbeddedKafka embedded) { return broker; } + private EmbeddedKafkaBroker kraftBroker(EmbeddedKafka embedded, int[] ports) { + return new EmbeddedKafkaKraftBroker(embedded.count(), embedded.partitions(), embedded.topics()) + .kafkaPorts(ports) + .adminTimeout(embedded.adminTimeout()); + } + + private EmbeddedKafkaBroker zkBroker(EmbeddedKafka embedded, int[] ports) { + return new EmbeddedKafkaZKBroker(embedded.count(), embedded.controlledShutdown(), + embedded.partitions(), embedded.topics()) + .zkPort(embedded.zookeeperPort()) + .kafkaPorts(ports) + .zkConnectionTimeout(embedded.zkConnectionTimeout()) + .zkSessionTimeout(embedded.zkSessionTimeout()) + .adminTimeout(embedded.adminTimeout()); + } + private int[] setupPorts(EmbeddedKafka embedded) { int[] ports = embedded.ports(); if (embedded.count() > 1 && ports.length == 1 && ports[0] == 0) { diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafka.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafka.java index ef239f1c35..907467c7b5 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafka.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafka.java @@ -27,6 +27,7 @@ import org.springframework.core.annotation.AliasFor; import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.EmbeddedKafkaZKBroker; import org.springframework.kafka.test.condition.EmbeddedKafkaCondition; /** @@ -165,14 +166,14 @@ * @return default {@link EmbeddedKafkaBroker#DEFAULT_ZK_CONNECTION_TIMEOUT}. 
* @since 2.4 */ - int zkConnectionTimeout() default EmbeddedKafkaBroker.DEFAULT_ZK_CONNECTION_TIMEOUT; + int zkConnectionTimeout() default EmbeddedKafkaZKBroker.DEFAULT_ZK_CONNECTION_TIMEOUT; /** * Timeout for internal ZK client session. * @return default {@link EmbeddedKafkaBroker#DEFAULT_ZK_SESSION_TIMEOUT}. * @since 2.4 */ - int zkSessionTimeout() default EmbeddedKafkaBroker.DEFAULT_ZK_SESSION_TIMEOUT; + int zkSessionTimeout() default EmbeddedKafkaZKBroker.DEFAULT_ZK_SESSION_TIMEOUT; /** * Timeout in seconds for admin operations (e.g. topic creation, close). @@ -181,5 +182,12 @@ */ int adminTimeout() default EmbeddedKafkaBroker.DEFAULT_ADMIN_TIMEOUT; + /** + * Use KRaft instead of Zookeeper; default true. + * @return whether to use KRaft. + * @since 3.1 + */ + boolean kraft() default true; + } diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizer.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizer.java index 5bf6f7d578..ed8d152f9a 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizer.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizer.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2017-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -29,6 +29,8 @@ import org.springframework.core.env.ConfigurableEnvironment; import org.springframework.core.io.Resource; import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.EmbeddedKafkaKraftBroker; +import org.springframework.kafka.test.EmbeddedKafkaZKBroker; import org.springframework.test.context.ContextCustomizer; import org.springframework.test.context.MergedContextConfiguration; import org.springframework.util.Assert; @@ -68,14 +70,23 @@ public void customizeContext(ConfigurableApplicationContext context, MergedConte .toArray(String[]::new); int[] ports = setupPorts(); - EmbeddedKafkaBroker embeddedKafkaBroker = new EmbeddedKafkaBroker(this.embeddedKafka.count(), - this.embeddedKafka.controlledShutdown(), + EmbeddedKafkaBroker embeddedKafkaBroker; + if (this.embeddedKafka.kraft()) { + embeddedKafkaBroker = new EmbeddedKafkaKraftBroker(this.embeddedKafka.count(), this.embeddedKafka.partitions(), topics) - .kafkaPorts(ports) - .zkPort(this.embeddedKafka.zookeeperPort()) - .zkConnectionTimeout(this.embeddedKafka.zkConnectionTimeout()) - .zkSessionTimeout(this.embeddedKafka.zkSessionTimeout()); + .kafkaPorts(ports); + } + else { + embeddedKafkaBroker = new EmbeddedKafkaZKBroker(this.embeddedKafka.count(), + this.embeddedKafka.controlledShutdown(), + this.embeddedKafka.partitions(), + topics) + .kafkaPorts(ports) + .zkPort(this.embeddedKafka.zookeeperPort()) + .zkConnectionTimeout(this.embeddedKafka.zkConnectionTimeout()) + .zkSessionTimeout(this.embeddedKafka.zkSessionTimeout()); + } Properties properties = new Properties(); diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/junit/GlobalEmbeddedKafkaTestExecutionListener.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/junit/GlobalEmbeddedKafkaTestExecutionListener.java index 1c5956f422..0e9bf06385 100644 --- 
a/spring-kafka-test/src/main/java/org/springframework/kafka/test/junit/GlobalEmbeddedKafkaTestExecutionListener.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/junit/GlobalEmbeddedKafkaTestExecutionListener.java @@ -30,6 +30,8 @@ import org.springframework.core.io.Resource; import org.springframework.core.io.support.PropertiesLoaderUtils; import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.EmbeddedKafkaKraftBroker; +import org.springframework.kafka.test.EmbeddedKafkaZKBroker; import org.springframework.util.StringUtils; /** @@ -77,6 +79,11 @@ public class GlobalEmbeddedKafkaTestExecutionListener implements TestExecutionLi */ public static final String PARTITIONS_PROPERTY_NAME = "spring.kafka.embedded.partitions"; + /** + * Whether to use KRaft instead of Zookeeper for the embedded broker(s); default true. + */ + public static final String KRAFT_PROPERTY_NAME = "spring.kafka.embedded.kraft"; + /** * The location for a properties file with Kafka broker configuration. 
*/ @@ -115,11 +122,18 @@ public void testPlanExecutionStarted(TestPlan testPlan) { int[] ports = configurationParameters.get(PORTS_PROPERTY_NAME, this::ports) .orElse(new int[count]); + boolean kraft = configurationParameters.getBoolean(KRAFT_PROPERTY_NAME).orElse(true); - this.embeddedKafkaBroker = - new EmbeddedKafkaBroker(count, false, partitions, topics) - .brokerProperties(brokerProperties) - .kafkaPorts(ports); + if (kraft) { + this.embeddedKafkaBroker = new EmbeddedKafkaKraftBroker(count, partitions, topics) + .brokerProperties(brokerProperties) + .kafkaPorts(ports); + } + else { + this.embeddedKafkaBroker = new EmbeddedKafkaZKBroker(count, false, partitions, topics) + .brokerProperties(brokerProperties) + .kafkaPorts(ports); + } if (brokerListProperty != null) { this.embeddedKafkaBroker.brokerListProperty(brokerListProperty); } diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/EmbeddedKafkaRule.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/EmbeddedKafkaRule.java index 55c27662e2..0c3e6c9f7e 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/EmbeddedKafkaRule.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/EmbeddedKafkaRule.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2019 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ import org.junit.rules.ExternalResource; import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.EmbeddedKafkaZKBroker; /** * A {@link org.junit.rules.TestRule} wrapper around an {@link EmbeddedKafkaBroker}. 
@@ -33,7 +34,7 @@ */ public class EmbeddedKafkaRule extends ExternalResource { - private final EmbeddedKafkaBroker embeddedKafka; + private final EmbeddedKafkaZKBroker embeddedKafka; public EmbeddedKafkaRule(int count) { this(count, false); @@ -57,7 +58,7 @@ public EmbeddedKafkaRule(int count, boolean controlledShutdown, String... topics * @param topics the topics to create. */ public EmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics) { - this.embeddedKafka = new EmbeddedKafkaBroker(count, controlledShutdown, partitions, topics); + this.embeddedKafka = new EmbeddedKafkaZKBroker(count, controlledShutdown, partitions, topics); } /** diff --git a/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaKraftBrokerTests.java b/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaKraftBrokerTests.java new file mode 100644 index 0000000000..86766caa2e --- /dev/null +++ b/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaKraftBrokerTests.java @@ -0,0 +1,40 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.springframework.kafka.test; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.jupiter.api.Test; + +import org.springframework.util.StringUtils; + +/** + * @author Gary Russell + * @since 3.1 + * + */ +public class EmbeddedKafkaKraftBrokerTests { + + @Test + void testUpDown() { + EmbeddedKafkaKraftBroker kafka = new EmbeddedKafkaKraftBroker(1, 1, "topic1"); + kafka.afterPropertiesSet(); + assertThat(StringUtils.hasText(kafka.getBrokersAsString())).isTrue(); + kafka.destroy(); + } + +} diff --git a/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaBrokerTests.java b/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaZKBrokerTests.java similarity index 93% rename from spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaBrokerTests.java rename to spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaZKBrokerTests.java index 458f4e4253..eaf16d9a1c 100644 --- a/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaBrokerTests.java +++ b/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaZKBrokerTests.java @@ -25,11 +25,11 @@ * @since 2.3 * */ -public class EmbeddedKafkaBrokerTests { +public class EmbeddedKafkaZKBrokerTests { @Test void testUpDown() { - EmbeddedKafkaBroker kafka = new EmbeddedKafkaBroker(1); + EmbeddedKafkaZKBroker kafka = new EmbeddedKafkaZKBroker(1); kafka.brokerListProperty("foo.bar"); kafka.afterPropertiesSet(); assertThat(kafka.getZookeeperConnectionString()).startsWith("127"); diff --git a/spring-kafka-test/src/test/java/org/springframework/kafka/test/condition/EmbeddedKafkaConditionTests.java b/spring-kafka-test/src/test/java/org/springframework/kafka/test/condition/EmbeddedKafkaConditionTests.java index df8a99c356..27b5a93a8f 100644 --- a/spring-kafka-test/src/test/java/org/springframework/kafka/test/condition/EmbeddedKafkaConditionTests.java +++ 
b/spring-kafka-test/src/test/java/org/springframework/kafka/test/condition/EmbeddedKafkaConditionTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2020 the original author or authors. + * Copyright 2019-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,7 +29,8 @@ * @since 2.3 * */ -@EmbeddedKafka(bootstrapServersProperty = "my.bss.property", count = 2, controlledShutdown = true, partitions = 3) +@EmbeddedKafka(bootstrapServersProperty = "my.bss.property", count = 2, controlledShutdown = true, partitions = 3, + kraft = false) public class EmbeddedKafkaConditionTests { @Test diff --git a/spring-kafka-test/src/test/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizerTests.java b/spring-kafka-test/src/test/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizerTests.java index 3757476669..297584a7ad 100644 --- a/spring-kafka-test/src/test/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizerTests.java +++ b/spring-kafka-test/src/test/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2017-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -103,22 +103,22 @@ void testMulti() { } - @EmbeddedKafka + @EmbeddedKafka(kraft = false) private class TestWithEmbeddedKafka { } - @EmbeddedKafka + @EmbeddedKafka(kraft = false) private class SecondTestWithEmbeddedKafka { } - @EmbeddedKafka(ports = 8085, bootstrapServersProperty = "my.bss.prop") + @EmbeddedKafka(kraft = false, ports = 8085, bootstrapServersProperty = "my.bss.prop") private class TestWithEmbeddedKafkaPorts { } - @EmbeddedKafka(count = 2) + @EmbeddedKafka(kraft = false, count = 2) private class TestWithEmbeddedKafkaMulti { } diff --git a/spring-kafka-test/src/test/java/org/springframework/kafka/test/rule/AddressableEmbeddedBrokerTests.java b/spring-kafka-test/src/test/java/org/springframework/kafka/test/rule/AddressableEmbeddedBrokerTests.java index 6c4803bfc0..bb75e0715d 100644 --- a/spring-kafka-test/src/test/java/org/springframework/kafka/test/rule/AddressableEmbeddedBrokerTests.java +++ b/spring-kafka-test/src/test/java/org/springframework/kafka/test/rule/AddressableEmbeddedBrokerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -34,7 +34,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.EmbeddedKafkaZKBroker; import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; @@ -58,16 +58,16 @@ public class AddressableEmbeddedBrokerTests { private Config config; @Autowired - private EmbeddedKafkaBroker broker; + private EmbeddedKafkaZKBroker broker; @Test public void testKafkaEmbedded() { assertThat(broker.getBrokersAsString()).isEqualTo("127.0.0.1:" + this.config.kafkaPort); assertThat(broker.getZkPort()).isEqualTo(this.config.zkPort); assertThat(broker.getBrokersAsString()) - .isEqualTo(System.getProperty(EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS)); + .isEqualTo(System.getProperty(EmbeddedKafkaZKBroker.SPRING_EMBEDDED_KAFKA_BROKERS)); assertThat(broker.getZookeeperConnectionString()) - .isEqualTo(System.getProperty(EmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT)); + .isEqualTo(System.getProperty(EmbeddedKafkaZKBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT)); } @Test @@ -98,7 +98,7 @@ public static class Config { private int zkPort; @Bean - public EmbeddedKafkaBroker broker() throws IOException { + public EmbeddedKafkaZKBroker broker() throws IOException { ServerSocket ss = ServerSocketFactory.getDefault().createServerSocket(0); this.kafkaPort = ss.getLocalPort(); ss.close(); @@ -106,7 +106,7 @@ public EmbeddedKafkaBroker broker() throws IOException { this.zkPort = ss.getLocalPort(); ss.close(); - return new EmbeddedKafkaBroker(1, true, TEST_EMBEDDED) + return new EmbeddedKafkaZKBroker(1, true, TEST_EMBEDDED) .zkPort(this.zkPort) .kafkaPorts(this.kafkaPort); } diff --git 
a/spring-kafka-test/src/test/resources/junit-platform.properties b/spring-kafka-test/src/test/resources/junit-platform.properties index 603c9ee2cb..0cf0bdd0f2 100644 --- a/spring-kafka-test/src/test/resources/junit-platform.properties +++ b/spring-kafka-test/src/test/resources/junit-platform.properties @@ -1,2 +1,3 @@ spring.kafka.embedded.count=2 spring.kafka.embedded.topics=topic1,topic2 +spring.kafka.embedded.kraft=false diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/AliasPropertiesTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/AliasPropertiesTests.java index ee8101aca2..e719213169 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/AliasPropertiesTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/AliasPropertiesTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2021 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -47,6 +47,7 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.EmbeddedKafkaZKBroker; import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.stereotype.Component; import org.springframework.test.annotation.DirtiesContext; @@ -134,7 +135,7 @@ public KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry() { @Bean public EmbeddedKafkaBroker embeddedKafka() { - return new EmbeddedKafkaBroker(1, true, "alias.tests"); + return new EmbeddedKafkaZKBroker(1, true, "alias.tests"); } @Bean diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java index 1c8e32cb5b..3201621f23 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java @@ -61,6 +61,7 @@ import org.springframework.kafka.support.converter.ConversionException; import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer; import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.EmbeddedKafkaZKBroker; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.messaging.Message; @@ -80,11 +81,14 @@ */ @SpringJUnitConfig @DirtiesContext -@EmbeddedKafka(partitions = 1, topics = { "blc1", "blc2", "blc3", "blc4", "blc5", "blc6", "blc6.DLT" }) +@EmbeddedKafka(kraft = false, partitions = 1, topics = { "blc1", "blc2", "blc3", "blc4", "blc5", "blc6", "blc6.DLT" }) public class BatchListenerConversionTests { private static final String DEFAULT_TEST_GROUP_ID = "blc"; + @Autowired + 
private EmbeddedKafkaBroker embeddedKafka; + @Autowired private Config config; @@ -99,6 +103,7 @@ public class BatchListenerConversionTests { @Test public void testBatchOfPojos(@Autowired KafkaListenerEndpointRegistry registry) throws Exception { + assertThat(this.embeddedKafka).isInstanceOf(EmbeddedKafkaZKBroker.class); assertThat(registry.getListenerContainerIds()).contains("blc1.id", "blc2.id"); doTest(this.listener1, "blc1"); doTest(this.listener2, "blc2"); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java index daaccb72a0..9741aa103c 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java @@ -134,6 +134,7 @@ import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.EmbeddedKafkaKraftBroker; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.lang.NonNull; @@ -258,6 +259,7 @@ public class EnableKafkaIntegrationTests { @Test public void testAnonymous() { + assertThat(this.embeddedKafka).isInstanceOf(EmbeddedKafkaKraftBroker.class); MessageListenerContainer container = this.registry .getListenerContainer("org.springframework.kafka.KafkaListenerEndpointContainer#0"); List containers = KafkaTestUtils.getPropertyValue(container, "containers", List.class); @@ -917,7 +919,7 @@ public void testAddingTopics() { consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testMultiReplying"); ConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); Consumer consumer = cf.createConsumer(); 
- assertThat(consumer.partitionsFor("morePartitions")).hasSize(10); + await().untilAsserted(() -> assertThat(consumer.partitionsFor("morePartitions")).hasSize(10)); consumer.close(); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminBadContextTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminBadContextTests.java index 35e6c458ee..bdf18ceb87 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminBadContextTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminBadContextTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,6 +29,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.EmbeddedKafkaZKBroker; /** * @author Gary Russell @@ -49,7 +50,7 @@ public static class BadConfig { @Bean public EmbeddedKafkaBroker kafkaEmbedded() { - return new EmbeddedKafkaBroker(1); + return new EmbeddedKafkaZKBroker(1); } @Bean diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java index def0fceaff..1323b6272f 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java @@ -54,10 +54,10 @@ import org.springframework.kafka.config.TopicBuilder; import org.springframework.kafka.core.KafkaAdmin.NewTopics; import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.EmbeddedKafkaZKBroker; import 
org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import org.springframework.util.ReflectionUtils; -import org.springframework.util.StringUtils; /** * @author Gary Russell @@ -263,7 +263,7 @@ public static class Config { @Bean public EmbeddedKafkaBroker kafkaEmbedded() { - return new EmbeddedKafkaBroker(3) + return new EmbeddedKafkaZKBroker(3) .brokerProperty("default.replication.factor", 2); } @@ -271,8 +271,7 @@ public EmbeddedKafkaBroker kafkaEmbedded() { public KafkaAdmin admin() { Map configs = new HashMap<>(); KafkaAdmin admin = new KafkaAdmin(configs); - admin.setBootstrapServersSupplier(() -> - StringUtils.arrayToCommaDelimitedString(kafkaEmbedded().getBrokerAddresses())); + admin.setBootstrapServersSupplier(() -> kafkaEmbedded().getBrokersAsString()); admin.setCreateOrModifyTopic(nt -> !nt.name().equals("dontCreate")); return admin; } @@ -280,8 +279,7 @@ public KafkaAdmin admin() { @Bean public AdminClient adminClient() { Map configs = new HashMap<>(); - configs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, - StringUtils.arrayToCommaDelimitedString(kafkaEmbedded().getBrokerAddresses())); + configs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded().getBrokersAsString()); return AdminClient.create(configs); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ErrorHandlingDeserializerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ErrorHandlingDeserializerTests.java index ea6d8e604d..558bcac1e5 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ErrorHandlingDeserializerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ErrorHandlingDeserializerTests.java @@ -52,6 +52,7 @@ import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer; import org.springframework.kafka.support.serializer.SerializationUtils; import 
org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.EmbeddedKafkaZKBroker; import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; @@ -164,7 +165,7 @@ public void listen2(ConsumerRecord record) { @Bean public EmbeddedKafkaBroker embeddedKafka() { - return new EmbeddedKafkaBroker(1, true, 1, TOPIC); + return new EmbeddedKafkaZKBroker(1, true, 1, TOPIC); } @Bean diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ExistingRetryTopicIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ExistingRetryTopicIntegrationTests.java index 2988c589a4..327ab60f71 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ExistingRetryTopicIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ExistingRetryTopicIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2022 the original author or authors. + * Copyright 2021-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -82,11 +82,11 @@ public class ExistingRetryTopicIntegrationTests { public final static String MAIN_TOPIC_WITH_NO_PARTITION_INFO = "main-topic-1"; - public final static String RETRY_TOPIC_WITH_NO_PARTITION_INFO = "main-topic-1-retry-0"; + public final static String RETRY_TOPIC_WITH_NO_PARTITION_INFO = "main-topic-1-retry-1"; public final static String MAIN_TOPIC_WITH_PARTITION_INFO = "main-topic-2"; - public final static String RETRY_TOPIC_WITH_PARTITION_INFO = "main-topic-2-retry-0"; + public final static String RETRY_TOPIC_WITH_PARTITION_INFO = "main-topic-2-retry-1"; private final static String MAIN_TOPIC_CONTAINER_FACTORY = "kafkaListenerContainerFactory"; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationManualAssignmentIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationManualAssignmentIntegrationTests.java index 3461834d54..1c282bbdbf 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationManualAssignmentIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationManualAssignmentIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2022 the original author or authors. + * Copyright 2021-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -17,7 +17,9 @@ package org.springframework.kafka.retrytopic; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import java.time.Duration; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -71,17 +73,24 @@ void includeTopic(@Autowired EmbeddedKafkaBroker broker, @Autowired ConsumerFact @Autowired KafkaTemplate template, @Autowired Config config) throws InterruptedException { Consumer consumer = cf.createConsumer("grp2", ""); - Map> topics = consumer.listTopics(); - assertThat(topics.keySet()).contains("RetryTopicConfigurationManualAssignmentIntegrationTests.1", - "RetryTopicConfigurationManualAssignmentIntegrationTests.1-dlt", - "RetryTopicConfigurationManualAssignmentIntegrationTests.1-retry-100", - "RetryTopicConfigurationManualAssignmentIntegrationTests.1-retry-110", - "RetryTopicConfigurationManualAssignmentIntegrationTests.2", - "RetryTopicConfigurationManualAssignmentIntegrationTests.2-dlt", - "RetryTopicConfigurationManualAssignmentIntegrationTests.2-retry-100", - "RetryTopicConfigurationManualAssignmentIntegrationTests.2-retry-110"); - template.send(TOPIC1, "foo"); - assertThat(config.latch.await(120, TimeUnit.SECONDS)).isTrue(); + try { + await().untilAsserted(() -> { + Map> topics = consumer.listTopics(); + assertThat(topics.keySet()).contains("RetryTopicConfigurationManualAssignmentIntegrationTests.1", + "RetryTopicConfigurationManualAssignmentIntegrationTests.1-dlt", + "RetryTopicConfigurationManualAssignmentIntegrationTests.1-retry-100", + "RetryTopicConfigurationManualAssignmentIntegrationTests.1-retry-110", + "RetryTopicConfigurationManualAssignmentIntegrationTests.2", + "RetryTopicConfigurationManualAssignmentIntegrationTests.2-dlt", + "RetryTopicConfigurationManualAssignmentIntegrationTests.2-retry-100", + "RetryTopicConfigurationManualAssignmentIntegrationTests.2-retry-110"); + }); + template.send(TOPIC1, "foo"); + 
assertThat(config.latch.await(120, TimeUnit.SECONDS)).isTrue(); + } + finally { + consumer.close(Duration.ofSeconds(10)); + } } @Configuration(proxyBeanMethods = false) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsTests.java b/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsTests.java index c449037c77..d28d506cfc 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2022 the original author or authors. + * Copyright 2017-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -68,12 +68,15 @@ import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.support.serializer.JsonSerde; import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.EmbeddedKafkaKraftBroker; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import kafka.server.BrokerServer; + /** * @author Artem Bilan * @author Marius Bogoevici @@ -111,7 +114,7 @@ public class KafkaStreamsTests { private StreamsBuilderFactoryBean streamsBuilderFactoryBean; @Autowired - private EmbeddedKafkaBroker embeddedKafka; + private EmbeddedKafkaKraftBroker embeddedKafka; @Value("${streaming.topic.two}") private String streamingTopic2; @@ -122,9 +125,9 @@ public class KafkaStreamsTests { @SuppressWarnings("deprecation") @Test public void testKStreams() throws Exception { - 
assertThat(this.embeddedKafka.getKafkaServer(0).config().autoCreateTopicsEnable()).isFalse(); - assertThat(this.embeddedKafka.getKafkaServer(0).config().deleteTopicEnable()).isTrue(); - assertThat(this.embeddedKafka.getKafkaServer(0).config().brokerId()).isEqualTo(2); + BrokerServer broker = this.embeddedKafka.getCluster().brokers().values().stream().findFirst().orElseThrow(); + assertThat(broker.config().autoCreateTopicsEnable()).isFalse(); + assertThat(broker.config().deleteTopicEnable()).isTrue(); this.streamsBuilderFactoryBean.stop(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationIntegrationTests.java index 6bb0ad24f4..ce3935b38a 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 the original author or authors. + * Copyright 2022-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -40,6 +40,7 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.EmbeddedKafkaZKBroker; import org.springframework.kafka.test.utils.KafkaTestUtils; import io.micrometer.common.KeyValues; @@ -121,7 +122,7 @@ public static class Config { @Bean EmbeddedKafkaBroker broker() { - return new EmbeddedKafkaBroker(1, true, 1, "int.observation.testT1", "int.observation.testT2"); + return new EmbeddedKafkaZKBroker(1, true, 1, "int.observation.testT1", "int.observation.testT2"); } @Bean diff --git a/spring-kafka/src/test/resources/broker.properties b/spring-kafka/src/test/resources/broker.properties index 840afec249..c8eb2eefac 100644 --- a/spring-kafka/src/test/resources/broker.properties +++ b/spring-kafka/src/test/resources/broker.properties @@ -1,2 +1 @@ delete.topic.enable = false -broker.id = ${broker.id:2}