From 74be3febc090837a3101d790a60c38a0e545ac55 Mon Sep 17 00:00:00 2001 From: Adrian Chlebosz <36669019+breader124@users.noreply.github.com> Date: Thu, 18 May 2023 15:25:25 +0200 Subject: [PATCH 1/5] GH-2190: add ctors to reactive producer and consumer (#2683) Fixes GH-2190 (https://github.com/spring-projects/spring-kafka/issues/2190) --- .../ReactiveKafkaConsumerTemplate.java | 8 +++++++- .../ReactiveKafkaProducerTemplate.java | 14 +++++++++++++- ...eKafkaProducerTemplateIntegrationTests.java | 18 ++++++++++++++++-- ...cerTemplateTransactionIntegrationTests.java | 13 +++++++++++-- 4 files changed, 47 insertions(+), 6 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/reactive/ReactiveKafkaConsumerTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/reactive/ReactiveKafkaConsumerTemplate.java index 320c6b7c11..0dc15d7b60 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/reactive/ReactiveKafkaConsumerTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/reactive/ReactiveKafkaConsumerTemplate.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2020 the original author or authors. + * Copyright 2019-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -49,6 +49,7 @@ * @param the value type. * * @author Mark Norkin + * @author Adrian Chlebosz * * @since 2.3.0 */ @@ -61,6 +62,11 @@ public ReactiveKafkaConsumerTemplate(ReceiverOptions receiverOptions) { this.kafkaReceiver = KafkaReceiver.create(receiverOptions); } + public ReactiveKafkaConsumerTemplate(KafkaReceiver kafkaReceiver) { + Assert.notNull(kafkaReceiver, "Kafka receiver can not be null"); + this.kafkaReceiver = kafkaReceiver; + } + public Flux> receive() { return this.kafkaReceiver.receive(); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplate.java index 904bedbbc4..e2e9af4771 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplate.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2022 the original author or authors. + * Copyright 2019-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -51,6 +51,7 @@ * @param the value type. * * @author Mark Norkin + * @author Adrian Chlebosz * * @since 2.3.0 */ @@ -71,6 +72,17 @@ public ReactiveKafkaProducerTemplate(SenderOptions senderOptions, RecordMe this.messageConverter = messageConverter; } + public ReactiveKafkaProducerTemplate(KafkaSender sender) { + this(sender, new MessagingMessageConverter()); + } + + public ReactiveKafkaProducerTemplate(KafkaSender sender, RecordMessageConverter messageConverter) { + Assert.notNull(sender, "Sender can not be null"); + Assert.notNull(messageConverter, "Message converter can not be null"); + this.sender = sender; + this.messageConverter = messageConverter; + } + public Flux> sendTransactionally(Publisher> records) { Flux>> sendTransactionally = this.sender.sendTransactionally(Flux.just(records)); return sendTransactionally.flatMap(Function.identity()); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateIntegrationTests.java index fd63140bce..bd9f1e9896 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2022 the original author or authors. + * Copyright 2019-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -60,6 +60,7 @@ import reactor.core.publisher.Mono; import reactor.kafka.receiver.ReceiverOptions; import reactor.kafka.receiver.ReceiverRecord; +import reactor.kafka.sender.KafkaSender; import reactor.kafka.sender.SenderOptions; import reactor.kafka.sender.SenderRecord; import reactor.kafka.sender.SenderResult; @@ -69,6 +70,7 @@ /** * @author Mark Norkin * @author Gary Russell + * @author Adrian Chlebosz * * @since 2.3.0 */ @@ -132,10 +134,17 @@ public void tearDown() { @Test public void shouldNotCreateTemplateIfOptionsIsNull() { assertThatIllegalArgumentException() - .isThrownBy(() -> new ReactiveKafkaProducerTemplate(null)) + .isThrownBy(() -> new ReactiveKafkaProducerTemplate<>((SenderOptions) null)) .withMessage("Sender options can not be null"); } + @Test + public void shouldNotCreateTemplateIfSenderIsNull() { + assertThatIllegalArgumentException() + .isThrownBy(() -> new ReactiveKafkaProducerTemplate<>((KafkaSender) null)) + .withMessage("Sender can not be null"); + } + @Test @SuppressWarnings("unchecked") public void shouldNotCreateTemplateIfConverterIsNull() { @@ -143,6 +152,11 @@ public void shouldNotCreateTemplateIfConverterIsNull() { .isThrownBy(() -> new ReactiveKafkaProducerTemplate(Mockito.mock(SenderOptions.class), null)) .withMessage("Message converter can not be null"); + + assertThatIllegalArgumentException() + .isThrownBy(() -> + new ReactiveKafkaProducerTemplate(Mockito.mock(KafkaSender.class), null)) + .withMessage("Message converter can not be null"); } @Test diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateTransactionIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateTransactionIntegrationTests.java index db7ae084ce..61d711724d 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateTransactionIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateTransactionIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2022 the original author or authors. + * Copyright 2019-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -53,6 +53,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; +import reactor.kafka.receiver.KafkaReceiver; import reactor.kafka.receiver.ReceiverOptions; import reactor.kafka.receiver.ReceiverRecord; import reactor.kafka.sender.SenderOptions; @@ -65,6 +66,7 @@ * @author Mark Norkin * @author Gary Russell * @author Will Kennedy + * @author Adrian Chlebosz * * @since 2.3.0 */ @@ -138,10 +140,17 @@ public void tearDown() { @Test public void shouldNotCreateTemplateIfOptionsIsNull() { assertThatIllegalArgumentException() - .isThrownBy(() -> new ReactiveKafkaConsumerTemplate(null)) + .isThrownBy(() -> new ReactiveKafkaConsumerTemplate<>((ReceiverOptions) null)) .withMessage("Receiver options can not be null"); } + @Test + public void shouldNotCreateTemplateIfReceiverIsNull() { + assertThatIllegalArgumentException() + .isThrownBy(() -> new ReactiveKafkaConsumerTemplate<>((KafkaReceiver) null)) + .withMessage("Kafka receiver can not be null"); + } + @Test public void shouldSendOneRecordTransactionallyViaTemplateAsSenderRecordAndReceiveIt() { ProducerRecord producerRecord = From 5d294b5a03c09d1810afbfc0f489780d9051dca5 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Mon, 22 May 2023 12:28:50 -0400 Subject: [PATCH 2/5] Doc: Clarify kafka-clients Version for Boot 3.1.x --- spring-kafka-docs/src/main/asciidoc/appendix.adoc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/spring-kafka-docs/src/main/asciidoc/appendix.adoc b/spring-kafka-docs/src/main/asciidoc/appendix.adoc index 534d733844..867a41523a 100644 --- a/spring-kafka-docs/src/main/asciidoc/appendix.adoc +++ b/spring-kafka-docs/src/main/asciidoc/appendix.adoc @@ -4,6 +4,8 @@ When using Spring for Apache Kafka in a Spring Boot application, the Apache Kafka dependency versions are determined by Spring Boot's dependency management. If you wish to use a different version of `kafka-clients` or `kafka-streams`, and use the embedded kafka broker for testing, you need to override their version used by Spring Boot dependency management; set the `kafka.version` property. +NOTE: Default `kafka-clients` dependencies for Spring Boot 3.0.x and 3.1.x are 3.3.2 and 3.4.0 respectively. + Or, to use a different Spring for Apache Kafka version with a supported Spring Boot version, set the `spring-kafka.version` property. ==== From db71a836b90311fdd1db46df4ca5ba14b492e9cb Mon Sep 17 00:00:00 2001 From: tbcs Date: Thu, 1 Jun 2023 12:12:49 -0400 Subject: [PATCH 3/5] docs: fix Javadoc of CommonErrorHandler::remainingRecords (#2691) The doc suggests that the default value of this method is false, even though it's actually true. Update the doc to match the code. --- .../kafka/listener/CommonErrorHandler.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonErrorHandler.java index e6dc715b25..0174eb1658 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonErrorHandler.java @@ -38,10 +38,10 @@ public interface CommonErrorHandler extends DeliveryAttemptAware { /** - * Return false if this error handler should only receive the current failed record; - * remaining records will be passed to the listener after the error handler returns. - * When true (default), all remaining records including the failed record are passed - * to the error handler. + * Return false (default) if this error handler should only receive the current failed + * record; remaining records will be passed to the listener after the error handler + * returns. When true, all remaining records including the failed record are passed to + * the error handler. * @return false to receive only the failed record. * @deprecated in favor of {@link #seeksAfterHandling()}. * @see #handleRecord(Exception, ConsumerRecord, Consumer, MessageListenerContainer) From 54601b1e829a3e9697b4933557a9d6bb16a7de7d Mon Sep 17 00:00:00 2001 From: Raja Kolli Date: Mon, 5 Jun 2023 21:47:31 +0530 Subject: [PATCH 4/5] Fixed Typo (#2698) --- spring-kafka-docs/src/main/asciidoc/retrytopic.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc index a088b76d9c..3b0641e3af 100644 --- a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc +++ b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc @@ -126,7 +126,7 @@ To achieve more fine-grained control over how to handle non-blocking retrials fo public RetryTopicConfiguration myRetryTopic(KafkaTemplate template) { return RetryTopicConfigurationBuilder .newInstance() - .fixedBackoff(3000) + .fixedBackOff(3000) .maxAttempts(5) .concurrency(1) .includeTopics("my-topic", "my-other-topic") From 206e68fd6226e092f77dd444c1f88be634800d25 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Thu, 8 Jun 2023 13:03:55 -0400 Subject: [PATCH 5/5] Default Empty Map for PF configurationProperties The default behavior was to throw an `UnsupportedOperationException` which effectively makes the `MockProducerFactory` unusable without subclassing, e.g. in `spring-integration-kafka`, which calls that method to examine some properties. Change the default behavior to return an empty map. --- .../java/org/springframework/kafka/core/ProducerFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactory.java index b65802ac88..bb3dcdc732 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactory.java @@ -123,7 +123,7 @@ default void reset() { * @since 2.5 */ default Map getConfigurationProperties() { - throw new UnsupportedOperationException("This implementation doesn't support this method"); + return Collections.emptyMap(); } /**