Skip to content

Commit ac110b4

Browse files
Add method
1 parent 723cc18 commit ac110b4

File tree

1 file changed

+6
-2
lines changed

1 file changed

+6
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -690,15 +690,19 @@ protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgm
690690
catch (Throwable ex) {
691691
this.logger.error(t, () -> "Future, Mono, or suspend function was completed with an exception for " + source);
692692
acknowledge(acknowledgment);
693-
if (request instanceof ConsumerRecord &&
694-
ex instanceof RuntimeException) {
693+
if (canAsyncRetry(request, ex)) {
695694
ConsumerRecord<K, V> record = (ConsumerRecord<K, V>) request;
696695
FailedRecordTuple failedRecordTuple = new FailedRecordTuple(record, (RuntimeException) ex);
697696
this.callbackForAsyncFailureQueue.accept(failedRecordTuple);
698697
}
699698
}
700699
}
701700

701+
private boolean canAsyncRetry(Object request, Throwable exception) {
702+
// The async retry with @RetryableTopic only support in case of SingleRecord Listener.
703+
return request instanceof ConsumerRecord && exception instanceof RuntimeException;
704+
}
705+
702706
protected void handleException(Object records, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer,
703707
Message<?> message, ListenerExecutionFailedException e) {
704708

0 commit comments

Comments
 (0)