Skip to content

Commit

Permalink
GH-3658: Fix @EmbeddedKafka for adminTimeout resolution (#3664)
Browse files Browse the repository at this point in the history
Fixes: #3658
Issue link: #3658

When `@EmbeddedKafka` is used with Spring context, the `adminTimeout` is not resolved.
Apparently when `adminTimeout` was introduced, it was covered only by the `EmbeddedKafkaCondition`.

* Extract `EmbeddedKafkaBrokerFactory` to encapsulate an `EmbeddedKafkaBroker` creation logic
(including the mentioned `adminTimeout`)
* Replace the logic in the `EmbeddedKafkaCondition` and `EmbeddedKafkaContextCustomizer`
with that new `EmbeddedKafkaBrokerFactory`, essentially, introducing a single place of truth.
* Pull `adminTimeout(int)` property to the `EmbeddedKafkaBroker` interface,
making the logic in the `EmbeddedKafkaBrokerFactory` simpler
* Add `adminTimeout` attribute verification into tests for condition, as well as Spring-based

**Auto-cherry-pick to `3.2.x`**
  • Loading branch information
artembilan authored Dec 12, 2024
1 parent fe71001 commit 0db6b81
Show file tree
Hide file tree
Showing 8 changed files with 178 additions and 186 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ default void afterPropertiesSet() {
*/
EmbeddedKafkaBroker brokerListProperty(String brokerListProperty);

/**
* Set the timeout in seconds for admin operations (e.g. topic creation, close).
* @param adminTimeout the timeout.
* @return the {@link EmbeddedKafkaBroker}
* @since 2.8.5
*/
EmbeddedKafkaBroker adminTimeout(int adminTimeout);

/**
* Get the bootstrap server addresses as a String.
* @return the bootstrap servers.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.test;

import java.io.IOException;
import java.io.InputStream;
import java.io.StringReader;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.function.Function;

import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.util.StringUtils;

/**
* The factory to encapsulate an {@link EmbeddedKafkaBroker} creation logic.
*
* @author Artem Bilan
*
* @since 3.2.6
*/
public final class EmbeddedKafkaBrokerFactory {

private static final String TRANSACTION_STATE_LOG_REPLICATION_FACTOR = "transaction.state.log.replication.factor";

/**
* Create an {@link EmbeddedKafkaBroker} based on the {@code EmbeddedKafka} annotation.
* @param embeddedKafka the {@code EmbeddedKafka} annotation.
* @return a new {@link EmbeddedKafkaBroker} instance.
*/
public static EmbeddedKafkaBroker create(EmbeddedKafka embeddedKafka) {
return create(embeddedKafka, Function.identity());
}

/**
* Create an {@link EmbeddedKafkaBroker} based on the {@code EmbeddedKafka} annotation.
* @param embeddedKafka the {@code EmbeddedKafka} annotation.
* @param propertyResolver the {@link Function} for placeholders in the annotation attributes.
* @return a new {@link EmbeddedKafkaBroker} instance.
*/
@SuppressWarnings("unchecked")
public static EmbeddedKafkaBroker create(EmbeddedKafka embeddedKafka, Function<String, String> propertyResolver) {
String[] topics =
Arrays.stream(embeddedKafka.topics())
.map(propertyResolver)
.toArray(String[]::new);

EmbeddedKafkaBroker embeddedKafkaBroker;
if (embeddedKafka.kraft()) {
embeddedKafkaBroker = kraftBroker(embeddedKafka, topics);
}
else {
embeddedKafkaBroker = zkBroker(embeddedKafka, topics);
}
int[] ports = setupPorts(embeddedKafka);

embeddedKafkaBroker.kafkaPorts(ports)
.adminTimeout(embeddedKafka.adminTimeout());

Properties properties = new Properties();

for (String pair : embeddedKafka.brokerProperties()) {
if (!StringUtils.hasText(pair)) {
continue;
}
try {
properties.load(new StringReader(propertyResolver.apply(pair)));
}
catch (Exception ex) {
throw new IllegalStateException("Failed to load broker property from [" + pair + "]", ex);
}
}

String brokerPropertiesLocation = embeddedKafka.brokerPropertiesLocation();
if (StringUtils.hasText(brokerPropertiesLocation)) {
String propertiesLocation = propertyResolver.apply(brokerPropertiesLocation);
Resource propertiesResource = new PathMatchingResourcePatternResolver().getResource(propertiesLocation);
if (!propertiesResource.exists()) {
throw new IllegalStateException(
"Failed to load broker properties from [" + propertiesResource + "]: resource does not exist.");
}
try (InputStream in = propertiesResource.getInputStream()) {
Properties p = new Properties();
p.load(in);
p.forEach((key, value) -> properties.putIfAbsent(key, propertyResolver.apply((String) value)));
}
catch (IOException ex) {
throw new IllegalStateException("Failed to load broker properties from [" + propertiesResource + "]", ex);
}
}

properties.putIfAbsent(TRANSACTION_STATE_LOG_REPLICATION_FACTOR,
String.valueOf(Math.min(3, embeddedKafka.count())));

embeddedKafkaBroker.brokerProperties((Map<String, String>) (Map<?, ?>) properties);
String bootstrapServersProperty = embeddedKafka.bootstrapServersProperty();
if (StringUtils.hasText(bootstrapServersProperty)) {
embeddedKafkaBroker.brokerListProperty(bootstrapServersProperty);
}

// Safe to start an embedded broker eagerly before context refresh
embeddedKafkaBroker.afterPropertiesSet();

return embeddedKafkaBroker;
}

private static int[] setupPorts(EmbeddedKafka embedded) {
int[] ports = embedded.ports();
if (embedded.count() > 1 && ports.length == 1 && ports[0] == 0) {
ports = new int[embedded.count()];
}
return ports;
}

private static EmbeddedKafkaBroker kraftBroker(EmbeddedKafka embedded, String[] topics) {
return new EmbeddedKafkaKraftBroker(embedded.count(), embedded.partitions(), topics);
}

private static EmbeddedKafkaBroker zkBroker(EmbeddedKafka embedded, String[] topics) {
return new EmbeddedKafkaZKBroker(embedded.count(), embedded.controlledShutdown(), embedded.partitions(), topics)
.zkPort(embedded.zookeeperPort())
.zkConnectionTimeout(embedded.zkConnectionTimeout())
.zkSessionTimeout(embedded.zkSessionTimeout());
}

private EmbeddedKafkaBrokerFactory() {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -176,12 +176,7 @@ public EmbeddedKafkaBroker brokerListProperty(String brokerListProperty) {
return this;
}

/**
* Set the timeout in seconds for admin operations (e.g. topic creation, close).
* @param adminTimeout the timeout.
* @return the {@link EmbeddedKafkaKraftBroker}
* @since 2.8.5
*/
@Override
public EmbeddedKafkaBroker adminTimeout(int adminTimeout) {
this.adminTimeout = Duration.ofSeconds(adminTimeout);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,12 +250,7 @@ public void setZkPort(int zkPort) {
this.zkPort = zkPort;
}

/**
* Set the timeout in seconds for admin operations (e.g. topic creation, close).
* @param adminTimeout the timeout.
* @return the {@link EmbeddedKafkaBroker}
* @since 2.8.5
*/
@Override
public EmbeddedKafkaBroker adminTimeout(int adminTimeout) {
this.adminTimeout = Duration.ofSeconds(adminTimeout);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,9 @@

package org.springframework.kafka.test.condition;

import java.io.IOException;
import java.io.InputStream;
import java.io.StringReader;
import java.lang.reflect.AnnotatedElement;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.ConditionEvaluationResult;
Expand All @@ -37,15 +32,11 @@
import org.junit.jupiter.api.extension.ParameterResolver;

import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.EmbeddedKafkaKraftBroker;
import org.springframework.kafka.test.EmbeddedKafkaZKBroker;
import org.springframework.kafka.test.EmbeddedKafkaBrokerFactory;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/**
* JUnit5 condition for an embedded broker.
Expand Down Expand Up @@ -117,89 +108,22 @@ public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext con
private boolean springTestContext(AnnotatedElement annotatedElement) {
return AnnotatedElementUtils.findAllMergedAnnotations(annotatedElement, ExtendWith.class)
.stream()
.filter(extended -> Arrays.asList(extended.value()).contains(SpringExtension.class))
.findFirst()
.isPresent();
.map(ExtendWith::value)
.flatMap(Arrays::stream)
.anyMatch(SpringExtension.class::isAssignableFrom);
}

@SuppressWarnings("unchecked")
private EmbeddedKafkaBroker createBroker(EmbeddedKafka embedded) {
int[] ports = setupPorts(embedded);
EmbeddedKafkaBroker broker;
if (embedded.kraft()) {
broker = kraftBroker(embedded, ports);
}
else {
broker = zkBroker(embedded, ports);
}
Properties properties = new Properties();

for (String pair : embedded.brokerProperties()) {
if (!StringUtils.hasText(pair)) {
continue;
}
try {
properties.load(new StringReader(pair));
}
catch (Exception ex) {
throw new IllegalStateException("Failed to load broker property from [" + pair + "]",
ex);
}
}
if (StringUtils.hasText(embedded.brokerPropertiesLocation())) {
Resource propertiesResource = new PathMatchingResourcePatternResolver()
.getResource(embedded.brokerPropertiesLocation());
if (!propertiesResource.exists()) {
throw new IllegalStateException(
"Failed to load broker properties from [" + propertiesResource
+ "]: resource does not exist.");
}
try (InputStream in = propertiesResource.getInputStream()) {
Properties p = new Properties();
p.load(in);
p.forEach(properties::putIfAbsent);
}
catch (IOException ex) {
throw new IllegalStateException(
"Failed to load broker properties from [" + propertiesResource + "]", ex);
}
}
broker.brokerProperties((Map<String, String>) (Map<?, ?>) properties);
if (StringUtils.hasText(embedded.bootstrapServersProperty())) {
broker.brokerListProperty(embedded.bootstrapServersProperty());
}
broker.afterPropertiesSet();
return broker;
}

private EmbeddedKafkaBroker kraftBroker(EmbeddedKafka embedded, int[] ports) {
return new EmbeddedKafkaKraftBroker(embedded.count(), embedded.partitions(), embedded.topics())
.kafkaPorts(ports)
.adminTimeout(embedded.adminTimeout());
}

private EmbeddedKafkaBroker zkBroker(EmbeddedKafka embedded, int[] ports) {
return new EmbeddedKafkaZKBroker(embedded.count(), embedded.controlledShutdown(),
embedded.partitions(), embedded.topics())
.zkPort(embedded.zookeeperPort())
.kafkaPorts(ports)
.zkConnectionTimeout(embedded.zkConnectionTimeout())
.zkSessionTimeout(embedded.zkSessionTimeout())
.adminTimeout(embedded.adminTimeout());
}

private int[] setupPorts(EmbeddedKafka embedded) {
int[] ports = embedded.ports();
if (embedded.count() > 1 && ports.length == 1 && ports[0] == 0) {
ports = new int[embedded.count()];
}
return ports;
return EmbeddedKafkaBrokerFactory.create(embedded);
}

private EmbeddedKafkaBroker getBrokerFromStore(ExtensionContext context) {
return getParentStore(context).get(EMBEDDED_BROKER, EmbeddedKafkaBroker.class) == null
EmbeddedKafkaBroker embeddedKafkaBrokerFromParentStore =
getParentStore(context)
.get(EMBEDDED_BROKER, EmbeddedKafkaBroker.class);
return embeddedKafkaBrokerFromParentStore == null
? getStore(context).get(EMBEDDED_BROKER, EmbeddedKafkaBroker.class)
: getParentStore(context).get(EMBEDDED_BROKER, EmbeddedKafkaBroker.class);
: embeddedKafkaBrokerFromParentStore;
}

private Store getStore(ExtensionContext context) {
Expand Down
Loading

0 comments on commit 0db6b81

Please sign in to comment.