Skip to content

Commit

Permalink
No Issue : Refactor test codes and Remove useless full package name
Browse files Browse the repository at this point in the history
* Refactor test codes and remove useless full package names.

* Remove useless comments.

* Fixes lint error.

* Modify wrong description.
  • Loading branch information
chickenchickenlove authored Oct 9, 2024
1 parent 3f45fc0 commit 6ce3442
Show file tree
Hide file tree
Showing 17 changed files with 89 additions and 110 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2022 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -44,6 +44,7 @@
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;

import org.springframework.beans.DirectFieldAccessor;
import org.springframework.core.log.LogAccessor;
Expand All @@ -57,6 +58,7 @@
* @author Gary Russell
* @author Hugo Wood
* @author Artem Bilan
* @author Sanghyeok An
*/
public final class KafkaTestUtils {

Expand All @@ -82,6 +84,17 @@ public static Map<String, Object> consumerProps(String group, String autoCommit,
return consumerProps(embeddedKafka.getBrokersAsString(), group, autoCommit);
}

/**
* Set up test properties for an {@code <Integer, String>} consumer.
* @param brokers the bootstrapServers property.
* @param group the group id.
* @return the properties.
* @since 3.3
*/
public static Map<String, Object> consumerProps(String brokers, String group) {
return consumerProps(brokers, group, "false");
}

/**
* Set up test properties for an {@code <Integer, String>} producer.
* @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
Expand Down Expand Up @@ -128,6 +141,20 @@ public static Map<String, Object> producerProps(String brokers) {
return props;
}

/**
* Set up test properties for the Kafka Streams.
* @param applicationId the applicationId for the Kafka Streams.
* @param brokers the bootstrapServers property.
* @return the properties.
* @since 3.3
*/
public static Map<String, Object> streamsProps(String applicationId, String brokers) {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
return props;
}

/**
* Poll the consumer, expecting a single record for the specified topic.
* @param consumer the consumer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
* @author Wang ZhiYang
* @author Huijin Hong
* @author Soby Chacko
* @author Sanghyeok An
*/
public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerSeekAware, AsyncRepliesAware {

Expand Down Expand Up @@ -421,7 +422,7 @@ else if (this.hasMetadataParameter) {
return this.handlerMethod.invoke(message, data, ack, consumer);
}
}
catch (org.springframework.messaging.converter.MessageConversionException ex) {
catch (MessageConversionException ex) {
throw checkAckArg(ack, message, new MessageConversionException("Cannot handle message", ex));
}
catch (MethodArgumentNotValidException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import static org.assertj.core.api.Assertions.assertThat;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -49,6 +48,7 @@
import org.springframework.kafka.streams.KafkaStreamsMicrometerListener;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

Expand All @@ -60,6 +60,7 @@
* @author Nurettin Yilmaz
* @author Artem Bilan
* @author Almog Gavra
* @author Sanghyeok An
*
* @since 2.1.5
*/
Expand Down Expand Up @@ -90,7 +91,7 @@ public void testKafkaStreamsCustomizer(@Autowired KafkaStreamsConfiguration conf
assertThat(STATE_LISTENER.getCurrentState()).isEqualTo(state);
Properties properties = configuration.asProperties();
assertThat(properties.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG))
.isEqualTo(Collections.singletonList(config.broker.getBrokersAsString()));
.isEqualTo(config.broker.getBrokersAsString());
assertThat(properties.get(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG))
.isEqualTo(Foo.class);
assertThat(properties.get(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG))
Expand Down Expand Up @@ -163,10 +164,8 @@ public void configureTopology(Topology topology) {
@SuppressWarnings("deprecation")
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
Collections.singletonList(this.broker.getBrokersAsString()));
Map<String, Object> props =
KafkaTestUtils.streamsProps(APPLICATION_ID, this.broker.getBrokersAsString());
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, Foo.class);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1000);
return new KafkaStreamsConfiguration(props);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.retry.annotation.Backoff;
import org.springframework.scheduling.TaskScheduler;
Expand All @@ -67,6 +68,7 @@
* Tests for <a href="https://github.com/spring-projects/spring-kafka/issues/1828">...</a>
*
* @author Wang Zhiyang
* @author Sanghyeok An
*
* @since 3.2
*/
Expand Down Expand Up @@ -284,9 +286,7 @@ static class KafkaProducerConfig {

@Bean
ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
Map<String, Object> configProps = KafkaTestUtils.producerProps(
this.broker.getBrokersAsString());
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Expand Down Expand Up @@ -319,13 +319,8 @@ KafkaAdmin kafkaAdmin() {

@Bean
ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
this.broker.getBrokersAsString());
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
"groupId");
Map<String, Object> props = KafkaTestUtils.consumerProps(
this.broker.getBrokersAsString(), "groupId");
props.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.retry.annotation.Backoff;
import org.springframework.scheduling.TaskScheduler;
Expand All @@ -67,6 +68,7 @@
/**
* Tests for https://github.com/spring-projects/spring-kafka/issues/1828
* @author Deepesh Verma
* @author Sanghyeok An
* @since 2.7
*/
@SpringJUnitConfig
Expand Down Expand Up @@ -276,9 +278,7 @@ public static class KafkaProducerConfig {

@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
Map<String, Object> configProps = KafkaTestUtils.producerProps(
this.broker.getBrokersAsString());
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Expand Down Expand Up @@ -311,13 +311,8 @@ public KafkaAdmin kafkaAdmin() {

@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
this.broker.getBrokersAsString());
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
"groupId");
Map<String, Object> props = KafkaTestUtils.consumerProps(
this.broker.getBrokersAsString(), "groupId");
props.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.springframework.kafka.support.converter.ConversionException;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.retry.annotation.Backoff;
import org.springframework.scheduling.TaskScheduler;
Expand All @@ -66,6 +67,7 @@
* Test class level non-blocking retries.
*
* @author Wang Zhiyang
* @author Sanghyeok An
*
* @since 3.2
*/
Expand Down Expand Up @@ -417,9 +419,7 @@ static class KafkaProducerConfig {

@Bean
ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
Map<String, Object> configProps = KafkaTestUtils.producerProps(
this.broker.getBrokersAsString());
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Expand Down Expand Up @@ -452,13 +452,8 @@ KafkaAdmin kafkaAdmin() {

@Bean
ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
this.broker.getBrokersAsString());
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
"groupId");
Map<String, Object> props = KafkaTestUtils.consumerProps(
this.broker.getBrokersAsString(), "groupId");
props.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.GenericMessageConverter;
Expand Down Expand Up @@ -738,9 +739,7 @@ static class KafkaProducerConfig {

@Bean
ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
Map<String, Object> configProps = KafkaTestUtils.producerProps(
this.broker.getBrokersAsString());
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Expand Down Expand Up @@ -783,13 +782,8 @@ NewTopics topics() {

@Bean
ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
this.broker.getBrokersAsString());
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
"groupId");
Map<String, Object> props = KafkaTestUtils.consumerProps(
this.broker.getBrokersAsString(), "groupId");
props.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.springframework.kafka.support.converter.ConversionException;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.retry.annotation.Backoff;
import org.springframework.scheduling.TaskScheduler;
Expand All @@ -66,6 +67,7 @@
/**
* @author Tomaz Fernandes
* @author Wang Zhiyang
* @author Sanghyeok An
*
* @since 2.8.4
*/
Expand Down Expand Up @@ -431,9 +433,7 @@ public static class KafkaProducerConfig {

@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
Map<String, Object> configProps = KafkaTestUtils.producerProps(
this.broker.getBrokersAsString());
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Expand Down Expand Up @@ -466,13 +466,8 @@ public KafkaAdmin kafkaAdmin() {

@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
this.broker.getBrokersAsString());
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
"groupId");
Map<String, Object> props = KafkaTestUtils.consumerProps(
this.broker.getBrokersAsString(), "groupId");
props.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.GenericMessageConverter;
Expand Down Expand Up @@ -765,9 +766,7 @@ public static class KafkaProducerConfig {

@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
Map<String, Object> configProps = KafkaTestUtils.producerProps(
this.broker.getBrokersAsString());
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Expand Down Expand Up @@ -810,13 +809,8 @@ public NewTopics topics() {

@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
this.broker.getBrokersAsString());
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
"groupId");
Map<String, Object> props = KafkaTestUtils.consumerProps(
this.broker.getBrokersAsString(), "groupId");
props.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
Expand Down
Loading

0 comments on commit 6ce3442

Please sign in to comment.