diff --git a/spring-kafka/src/main/java/org/springframework/kafka/mock/MockProducerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/mock/MockProducerFactory.java index 25e0cef5b1..dc8d663a4c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/mock/MockProducerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/mock/MockProducerFactory.java @@ -16,15 +16,32 @@ package org.springframework.kafka.mock; -import java.util.function.BiFunction; -import java.util.function.Supplier; - +import org.apache.commons.logging.LogFactory; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; - +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ProducerFencedException; + +import org.springframework.core.log.LogAccessor; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.ProducerFactory; import org.springframework.lang.Nullable; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; +import java.util.function.BiFunction; +import java.util.function.Supplier; + /** * Support the use of {@link MockProducer} in tests. * @@ -32,8 +49,8 @@ * @param the value type. * * @author Gary Russell + * @author Pawel Szymczyk * @since 3.0.7 - * */ public class MockProducerFactory implements ProducerFactory { @@ -46,6 +63,7 @@ public class MockProducerFactory implements ProducerFactory { /** * Create an instance that does not support transactional producers. + * * @param producerProvider a {@link Supplier} for a {@link MockProducer}. */ public MockProducerFactory(Supplier> producerProvider) { @@ -58,11 +76,11 @@ public MockProducerFactory(Supplier> producerProvider) { * Create an instance that supports transactions, with the supplied producer provider {@link BiFunction}. The * function has two parameters, a boolean indicating whether a transactional producer * is being requested and, if true, the transaction id prefix for that producer. + * * @param producerProvider the provider function. * @param defaultTxId the default transactional id. */ - public MockProducerFactory(BiFunction> producerProvider, - @Nullable String defaultTxId) { + public MockProducerFactory(BiFunction> producerProvider, @Nullable String defaultTxId) { this.producerProvider = producerProvider; this.defaultTxId = defaultTxId; @@ -81,9 +99,8 @@ public Producer createProducer() { @Override public Producer createProducer(@Nullable String txIdPrefix) { - return txIdPrefix == null && this.defaultTxId == null - ? this.producerProvider.apply(false, null) - : this.producerProvider.apply(true, txIdPrefix == null ? this.defaultTxId : txIdPrefix); + return txIdPrefix == null && this.defaultTxId == null ? new CloseSafeMockProducer<>(this.producerProvider.apply(false, null)) : + new CloseSafeMockProducer<>(this.producerProvider.apply(true, txIdPrefix == null ? this.defaultTxId : txIdPrefix)); } @Override @@ -91,4 +108,87 @@ public Producer createNonTransactionalProducer() { return this.producerProvider.apply(false, null); } + /** + * A wrapper class for the delegate, inspired by {@link DefaultKafkaProducerFactory.CloseSafeProducer}. + * + * @param the key type. + * @param the value type. + * + * @author Pawel Szymczyk + */ + static class CloseSafeMockProducer implements Producer { + + private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(CloseSafeMockProducer.class)); + + private final MockProducer delegate; + + CloseSafeMockProducer(MockProducer delegate) { + this.delegate = delegate; + } + + @Override + public void initTransactions() { + this.delegate.initTransactions(); + } + + @Override + public void beginTransaction() throws ProducerFencedException { + this.delegate.beginTransaction(); + } + + @Override + public void sendOffsetsToTransaction(Map offsets, String consumerGroupId) throws ProducerFencedException { + this.delegate.sendOffsetsToTransaction(offsets, consumerGroupId); + } + + @Override + public void sendOffsetsToTransaction(Map offsets, ConsumerGroupMetadata groupMetadata) throws ProducerFencedException { + this.delegate.sendOffsetsToTransaction(offsets, groupMetadata); + } + + @Override + public void commitTransaction() throws ProducerFencedException { + this.delegate.commitTransaction(); + } + + @Override + public void abortTransaction() throws ProducerFencedException { + this.delegate.abortTransaction(); + } + + @Override + public Future send(ProducerRecord record) { + return this.delegate.send(record); + } + + @Override + public Future send(ProducerRecord record, Callback callback) { + return this.delegate.send(record, callback); + } + + @Override + public void flush() { + this.delegate.flush(); + } + + @Override + public List partitionsFor(String topic) { + return this.delegate.partitionsFor(topic); + } + + @Override + public Map metrics() { + return this.delegate.metrics(); + } + + @Override + public void close() { + close(null); + } + + @Override + public void close(Duration timeout) { + LOGGER.debug(() -> "The closing of delegate producer " + this.delegate + "has been skipped."); + } + } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/mock/MockProducerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/mock/MockProducerFactoryTests.java new file mode 100644 index 0000000000..166546f244 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/mock/MockProducerFactoryTests.java @@ -0,0 +1,24 @@ +package org.springframework.kafka.mock; + +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.Test; +import org.springframework.kafka.core.KafkaTemplate; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Pawel Szymczyk + */ +public class MockProducerFactoryTests { + + @Test + void testSendingMultipleMessagesWithMockProducer() { + MockProducer mockProducer = new MockProducer<>(true, new StringSerializer(), new StringSerializer()); + MockProducerFactory mockProducerFactory = new MockProducerFactory<>(() -> mockProducer); + KafkaTemplate kafkaTemplate = new KafkaTemplate<>(mockProducerFactory); + kafkaTemplate.send("topic", "Hello"); + kafkaTemplate.send("topic", "World"); + assertThat(mockProducer.history()).hasSize(2); + } +}