diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc index 5f90bb77b3..52f9ec06cc 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc @@ -454,8 +454,7 @@ See xref:kafka/annotation-error-handling.adoc#error-handlers[Container Error Han Starting with version 3.2, Recovery can now recover (skip) entire batch of records that keeps failing. Set `ContainerProperties.setBatchRecoverAfterRollback(true)` to enable this feature. -IMPORTANT: Default behavior, recovery is not possible with a batch listener, -since the framework has no knowledge about which record in the batch keeps failing. +IMPORTANT: Default behavior, recovery is not possible with a batch listener, since the framework has no knowledge about which record in the batch keeps failing. In such cases, the application listener must handle a record that keeps failing. See also xref:kafka/annotation-error-handling.adoc#dead-letters[Publishing Dead-letter Records]. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AfterRollbackProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AfterRollbackProcessor.java index 558b0a4887..7137ce018f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AfterRollbackProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AfterRollbackProcessor.java @@ -35,6 +35,7 @@ * @param the value type. * * @author Gary Russell + * @author Wang Zhiyang * * @since 1.3.5 * @@ -80,6 +81,7 @@ void process(List> records, Consumer consumer, default void processBatch(ConsumerRecords records, List> recordList, Consumer consumer, MessageListenerContainer container, Exception exception, boolean recoverable, ContainerProperties.EOSMode eosMode) { + process(recordList, consumer, container, exception, recoverable, eosMode); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/BatchListenerFailedException.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/BatchListenerFailedException.java index 1aa0c4464b..3a0dd0a118 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/BatchListenerFailedException.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/BatchListenerFailedException.java @@ -26,6 +26,7 @@ * failed. * * @author Gary Russell + * @author Wang Zhiyang * @since 2.5 * */ diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java index 3280e51574..616db9bea3 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java @@ -51,6 +51,7 @@ * @author Johnny Lim * @author Lukasz Kaminski * @author Kyuhyeok Park + * @author Wang Zhiyang */ public class ContainerProperties extends ConsumerProperties { @@ -545,10 +546,20 @@ public void setTransactionManager(@Nullable PlatformTransactionManager transacti this.transactionManager = transactionManager; } + /** + * Recover batch records after rollback if true. + * @return true to recover. + * @since 3.2 + */ public boolean isBatchRecoverAfterRollback() { return this.batchRecoverAfterRollback; } + /** + * enable the batch recover after rollback. + * @param batchRecoverAfterRollback the batchRecoverAfterRollback to set. + * @since 3.2 + */ public void setBatchRecoverAfterRollback(boolean batchRecoverAfterRollback) { this.batchRecoverAfterRollback = batchRecoverAfterRollback; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java index 0b2a22b117..f14114f0f8 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java @@ -50,6 +50,7 @@ * * @author Gary Russell * @author Francois Rosiere + * @author Wang Zhiyang * * @since 1.3.5 * diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 05a9f6fa16..0196ca4a1c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -161,6 +161,7 @@ * @author Francois Rosiere * @author Daniel Gentes * @author Soby Chacko + * @author Wang Zhiyang */ public class KafkaMessageListenerContainer // NOSONAR line count extends AbstractMessageListenerContainer implements ConsumerPauseResumeEventPublisher { @@ -2204,10 +2205,15 @@ protected void doInTransactionWithoutResult(TransactionStatus status) { }); } else { - afterRollbackProcessorToUse.processBatch(records, - Objects.requireNonNullElseGet(recordList, () -> createRecordList(records)), this.consumer, - KafkaMessageListenerContainer.this.thisOrParentContainer, e, - this.wantsBatchRecoverAfterRollback, this.eosMode); + try { + afterRollbackProcessorToUse.processBatch(records, + Objects.requireNonNullElseGet(recordList, () -> createRecordList(records)), this.consumer, + KafkaMessageListenerContainer.this.thisOrParentContainer, e, + this.wantsBatchRecoverAfterRollback, this.eosMode); + } + catch (Exception ex) { + this.logger.error(ex, "AfterRollbackProcessor threw exception"); + } } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java index a47abe8c25..4b2d1e7b79 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java @@ -31,6 +31,7 @@ * @author Gary Russell * @author Francois Rosiere * @author Antonio Tomac + * @author Wang Zhiyang * @since 2.0 * */ diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java index 4d38f7e712..4a4aa10419 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java @@ -45,6 +45,7 @@ * * @author Gary Russell * @author Francois Rosiere + * @author Wang Zhiyang * @since 2.2 * */ @@ -138,6 +139,7 @@ public static boolean doSeeks(List> records, Consumer * @param records the records. * @param consumer the consumer. * @param logger a {@link LogAccessor} for seek errors. + * @since 3.2 */ public static void doSeeksToBegin(List> records, Consumer consumer, LogAccessor logger) { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessorTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessorTests.java index cd27f61f73..55ede47ea2 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessorTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessorTests.java @@ -53,6 +53,7 @@ /** * @author Gary Russell * @author Francois Rosiere + * @author Wang Zhiyang * @since 2.3.1 * */ diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java index 031a58e503..ca8f60ae81 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java @@ -107,6 +107,7 @@ /** * @author Gary Russell * @author Artem Bilan + * @author Wang Zhiyang * * @since 1.3 *