From e74072c6a95630369b5a13ba012928e023cf24d2 Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Fri, 11 Oct 2024 19:03:01 +0900 Subject: [PATCH] Remove async retry callback from MessageListener contract. --- .../KafkaMessageListenerContainer.java | 49 +++++++++++++------ .../kafka/listener/MessageListener.java | 7 --- ...fkaBackoffAwareMessageListenerAdapter.java | 6 --- .../MessagingMessageListenerAdapter.java | 12 +++-- ...RecordMessagingMessageListenerAdapter.java | 6 --- ...eRetryTopicClassLevelIntegrationTests.java | 2 +- ...pletableFutureRetryTopicScenarioTests.java | 12 +---- ...eRetryTopicClassLevelIntegrationTests.java | 2 +- .../AsyncMonoRetryTopicScenarioTests.java | 12 +---- 9 files changed, 45 insertions(+), 63 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 953dc943cb..957f64c4c9 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -108,6 +108,8 @@ import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption; import org.springframework.kafka.listener.ContainerProperties.EOSMode; import org.springframework.kafka.listener.adapter.AsyncRepliesAware; +import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter; +import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.KafkaUtils; @@ -842,7 +844,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private volatile long lastPoll = System.currentTimeMillis(); - private final ConcurrentLinkedDeque> failedRecords = new ConcurrentLinkedDeque(); + private final ConcurrentLinkedDeque> failedRecords = new ConcurrentLinkedDeque<>(); @SuppressWarnings(UNCHECKED) ListenerConsumer(GenericMessageListener listener, ListenerType listenerType, @@ -900,13 +902,20 @@ else if (listener instanceof MessageListener) { this.pollThreadStateProcessor = setUpPollProcessor(false); this.observationEnabled = this.containerProperties.isObservationEnabled(); - if (!AopUtils.isAopProxy(listener)) { - BiConsumer, RuntimeException> callbackForAsyncFailureQueue = - (cRecord, ex) -> { - FailedRecordTuple failedRecord = new FailedRecordTuple<>(cRecord, ex); - this.failedRecords.addLast(failedRecord); - }; - this.listener.setCallbackForAsyncFailure(callbackForAsyncFailureQueue); + if (!AopUtils.isAopProxy(this.genericListener) && + this.genericListener instanceof KafkaBackoffAwareMessageListenerAdapter) { + KafkaBackoffAwareMessageListenerAdapter genListener = + (KafkaBackoffAwareMessageListenerAdapter) this.genericListener; + if (genListener.getDelegate() instanceof RecordMessagingMessageListenerAdapter) { + + RecordMessagingMessageListenerAdapter recordAdapterListener = + (RecordMessagingMessageListenerAdapter) genListener.getDelegate(); + + BiConsumer, RuntimeException> callbackForAsyncFailure = + (cRecord, ex) -> this.failedRecords.addLast(new FailedRecordTuple<>(cRecord, ex)); + recordAdapterListener.setCallbackForAsyncFailure(callbackForAsyncFailure); + } + } } else { @@ -1458,17 +1467,25 @@ protected void pollAndInvoke() { } protected void handleAsyncFailure() { - List copyFailedRecords = new ArrayList<>(); + List> copyFailedRecords = new ArrayList<>(); while (!this.failedRecords.isEmpty()) { - FailedRecordTuple failedRecordTuple = this.failedRecords.pollFirst(); + FailedRecordTuple failedRecordTuple = this.failedRecords.pollFirst(); copyFailedRecords.add(failedRecordTuple); } // If any copied and failed record fails to complete due to an unexpected error, // We will give up on retrying with the remaining copied and failed Records. - if (!copyFailedRecords.isEmpty()) { - copyFailedRecords.forEach(failedRecordTuple -> - this.invokeErrorHandlerBySingleRecord(failedRecordTuple.record(), failedRecordTuple.ex())); + for (FailedRecordTuple copyFailedRecord : copyFailedRecords) { + try { + invokeErrorHandlerBySingleRecord(copyFailedRecord); + } + catch (Exception e) { + this.logger.warn(() -> + "Async failed record failed to complete, thus skip it. record :" + + copyFailedRecord.toString() + + ", Exception : " + + e.getMessage()); + } } } @@ -2864,7 +2881,9 @@ private void doInvokeOnMessage(final ConsumerRecord recordArg) { } } - private void invokeErrorHandlerBySingleRecord(final ConsumerRecord cRecord, RuntimeException rte) { + private void invokeErrorHandlerBySingleRecord(final FailedRecordTuple failedRecordTuple) { + final ConsumerRecord cRecord = failedRecordTuple.record; + RuntimeException rte = failedRecordTuple.ex; if (this.commonErrorHandler.seeksAfterHandling() || rte instanceof CommitFailedException) { try { if (this.producer == null) { @@ -3986,6 +4005,6 @@ private static class StopAfterFenceException extends KafkaException { } - record FailedRecordTuple(ConsumerRecord record, RuntimeException ex) { }; + private record FailedRecordTuple(ConsumerRecord record, RuntimeException ex) { }; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListener.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListener.java index 1976efa226..5b20488d65 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListener.java @@ -16,8 +16,6 @@ package org.springframework.kafka.listener; -import java.util.function.BiConsumer; - import org.apache.kafka.clients.consumer.ConsumerRecord; /** @@ -28,13 +26,8 @@ * * @author Marius Bogoevici * @author Gary Russell - * @author Sanghyeok An */ @FunctionalInterface public interface MessageListener extends GenericMessageListener> { - default void setCallbackForAsyncFailure( - BiConsumer, RuntimeException> asyncRetryCallback) { - // - } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KafkaBackoffAwareMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KafkaBackoffAwareMessageListenerAdapter.java index a2774a8f4e..e208f50d1d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KafkaBackoffAwareMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KafkaBackoffAwareMessageListenerAdapter.java @@ -20,7 +20,6 @@ import java.time.Clock; import java.time.Instant; import java.util.Optional; -import java.util.function.BiConsumer; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -44,7 +43,6 @@ * @param the record key type. * @param the record value type. * @author Tomaz Fernandes - * @author Sanghyeok An * @since 2.7 * */ @@ -146,8 +144,4 @@ public void onMessage(ConsumerRecord data, Consumer consumer) { onMessage(data, null, consumer); } - @Override - public void setCallbackForAsyncFailure(BiConsumer, RuntimeException> callbackForAsyncFailureQueue) { - this.delegate.setCallbackForAsyncFailure(callbackForAsyncFailureQueue); - } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index 3e63bd0ec5..f08f087fa2 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -155,7 +155,8 @@ public abstract class MessagingMessageListenerAdapter implements ConsumerS private String correlationHeaderName = KafkaHeaders.CORRELATION_ID; - private BiConsumer, RuntimeException> callbackForAsyncFailureQueue; + @Nullable + private BiConsumer, RuntimeException> callbackForAsyncFailure; /** * Create an instance with the provided bean and method. @@ -678,9 +679,10 @@ protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgm catch (Throwable ex) { this.logger.error(t, () -> "Future, Mono, or suspend function was completed with an exception for " + source); acknowledge(acknowledgment); - if (canAsyncRetry(request, ex)) { + if (canAsyncRetry(request, ex) && + Objects.nonNull(this.callbackForAsyncFailure)) { ConsumerRecord record = (ConsumerRecord) request; - this.callbackForAsyncFailureQueue.accept(record, (RuntimeException) ex); + this.callbackForAsyncFailure.accept(record, (RuntimeException) ex); } } } @@ -884,8 +886,8 @@ private boolean rawByParameterIsType(Type parameterType, Type type) { return parameterType instanceof ParameterizedType pType && pType.getRawType().equals(type); } - protected void setAsyncFailureCallback(BiConsumer, RuntimeException> asyncRetryCallback) { - this.callbackForAsyncFailureQueue = asyncRetryCallback; + public void setCallbackForAsyncFailure(BiConsumer, RuntimeException> asyncRetryCallback) { + this.callbackForAsyncFailure = asyncRetryCallback; } /** diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordMessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordMessagingMessageListenerAdapter.java index ccfcb52b9a..6caa854e45 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordMessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordMessagingMessageListenerAdapter.java @@ -17,7 +17,6 @@ package org.springframework.kafka.listener.adapter; import java.lang.reflect.Method; -import java.util.function.BiConsumer; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -86,9 +85,4 @@ public void onMessage(ConsumerRecord record, @Nullable Acknowledgment ackn invoke(record, acknowledgment, consumer, message); } - @Override - public void setCallbackForAsyncFailure( - BiConsumer, RuntimeException> asyncRetryCallback) { - setAsyncFailureCallback(asyncRetryCallback); - } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncCompletableFutureRetryTopicClassLevelIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncCompletableFutureRetryTopicClassLevelIntegrationTests.java index 0a6c64ecf5..64b66e3522 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncCompletableFutureRetryTopicClassLevelIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncCompletableFutureRetryTopicClassLevelIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2024 the original author or authors. + * Copyright 2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncCompletableFutureRetryTopicScenarioTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncCompletableFutureRetryTopicScenarioTests.java index 5b949b18f9..27b5442557 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncCompletableFutureRetryTopicScenarioTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncCompletableFutureRetryTopicScenarioTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2024 the original author or authors. + * Copyright 2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,7 +21,6 @@ import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; @@ -30,7 +29,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.jupiter.api.Test; @@ -45,7 +43,6 @@ import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; -import org.springframework.kafka.core.KafkaAdmin; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.listener.ContainerProperties; @@ -1210,13 +1207,6 @@ static class KafkaConsumerConfig { @Autowired EmbeddedKafkaBroker broker; - @Bean - KafkaAdmin kafkaAdmin() { - Map configs = new HashMap<>(); - configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.broker.getBrokersAsString()); - return new KafkaAdmin(configs); - } - @Bean ConsumerFactory consumerFactory() { Map props = KafkaTestUtils.consumerProps( diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncMonoFutureRetryTopicClassLevelIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncMonoFutureRetryTopicClassLevelIntegrationTests.java index 4eca91f805..81e67987a3 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncMonoFutureRetryTopicClassLevelIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncMonoFutureRetryTopicClassLevelIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2024 the original author or authors. + * Copyright 2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncMonoRetryTopicScenarioTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncMonoRetryTopicScenarioTests.java index 7426cdfde8..1de4c4af63 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncMonoRetryTopicScenarioTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncMonoRetryTopicScenarioTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2024 the original author or authors. + * Copyright 2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,6 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; @@ -30,7 +29,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.jupiter.api.Test; @@ -45,7 +43,6 @@ import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; -import org.springframework.kafka.core.KafkaAdmin; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.listener.ContainerProperties; @@ -1178,13 +1175,6 @@ static class KafkaConsumerConfig { @Autowired EmbeddedKafkaBroker broker; - @Bean - KafkaAdmin kafkaAdmin() { - Map configs = new HashMap<>(); - configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.broker.getBrokersAsString()); - return new KafkaAdmin(configs); - } - @Bean ConsumerFactory consumerFactory() { Map props = KafkaTestUtils.consumerProps(