diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 03adedfec0..953dc943cb 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -906,7 +906,7 @@ else if (listener instanceof MessageListener) { FailedRecordTuple failedRecord = new FailedRecordTuple<>(cRecord, ex); this.failedRecords.addLast(failedRecord); }; - this.listener.setCallbackForAsyncFailureQueue(callbackForAsyncFailureQueue); + this.listener.setCallbackForAsyncFailure(callbackForAsyncFailureQueue); } } else { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListener.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListener.java index e00eebb15b..1976efa226 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListener.java @@ -33,7 +33,7 @@ @FunctionalInterface public interface MessageListener extends GenericMessageListener> { - default void setCallbackForAsyncFailureQueue( + default void setCallbackForAsyncFailure( BiConsumer, RuntimeException> asyncRetryCallback) { // } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KafkaBackoffAwareMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KafkaBackoffAwareMessageListenerAdapter.java index d35d8f0831..a2774a8f4e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KafkaBackoffAwareMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KafkaBackoffAwareMessageListenerAdapter.java @@ -147,7 +147,7 @@ public void onMessage(ConsumerRecord data, Consumer consumer) { } @Override - public void setCallbackForAsyncFailureQueue(BiConsumer, RuntimeException> callbackForAsyncFailureQueue) { - this.delegate.setCallbackForAsyncFailureQueue(callbackForAsyncFailureQueue); + public void setCallbackForAsyncFailure(BiConsumer, RuntimeException> callbackForAsyncFailureQueue) { + this.delegate.setCallbackForAsyncFailure(callbackForAsyncFailureQueue); } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index e439ea9dfb..5acdf94956 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -884,8 +884,8 @@ private boolean rawByParameterIsType(Type parameterType, Type type) { return parameterType instanceof ParameterizedType pType && pType.getRawType().equals(type); } - public void putInAsyncFailureQueue(BiConsumer, RuntimeException> callbackForAsyncFailureQueue) { - this.callbackForAsyncFailureQueue = callbackForAsyncFailureQueue; + protected void setCallbackForAsyncFailure(BiConsumer, RuntimeException> asyncRetryCallback) { + this.callbackForAsyncFailureQueue = asyncRetryCallback; } /** diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordMessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordMessagingMessageListenerAdapter.java index 209a4c7286..9904d1575e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordMessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordMessagingMessageListenerAdapter.java @@ -87,8 +87,8 @@ public void onMessage(ConsumerRecord record, @Nullable Acknowledgment ackn } @Override - public void setCallbackForAsyncFailureQueue( + public void setCallbackForAsyncFailure( BiConsumer, RuntimeException> asyncRetryCallback) { - putInAsyncFailureQueue(asyncRetryCallback); + super.setCallbackForAsyncFailure(asyncRetryCallback); } }