Skip to content

Commit

Permalink
review fix
Browse files Browse the repository at this point in the history
* @author classes.
* fix adoc
* poblish `AfterRollbackProcessor`
* javadoc at `ContainerProperties`
  • Loading branch information
Wzy19930507 committed Jan 21, 2024
1 parent d2484f4 commit 1f5dd35
Show file tree
Hide file tree
Showing 10 changed files with 31 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -454,8 +454,7 @@ See xref:kafka/annotation-error-handling.adoc#error-handlers[Container Error Han
Starting with version 3.2, Recovery can now recover (skip) entire batch of records that keeps failing.
Set `ContainerProperties.setBatchRecoverAfterRollback(true)` to enable this feature.

IMPORTANT: Default behavior, recovery is not possible with a batch listener,
since the framework has no knowledge about which record in the batch keeps failing.
IMPORTANT: Default behavior, recovery is not possible with a batch listener, since the framework has no knowledge about which record in the batch keeps failing.
In such cases, the application listener must handle a record that keeps failing.

See also xref:kafka/annotation-error-handling.adoc#dead-letters[Publishing Dead-letter Records].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
* @param <V> the value type.
*
* @author Gary Russell
* @author Wang Zhiyang
*
* @since 1.3.5
*
Expand Down Expand Up @@ -80,6 +81,7 @@ void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer,
default void processBatch(ConsumerRecords<K, V> records, List<ConsumerRecord<K, V>> recordList,
Consumer<K, V> consumer, MessageListenerContainer container, Exception exception,
boolean recoverable, ContainerProperties.EOSMode eosMode) {

process(recordList, consumer, container, exception, recoverable, eosMode);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
* failed.
*
* @author Gary Russell
* @author Wang Zhiyang
* @since 2.5
*
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
* @author Johnny Lim
* @author Lukasz Kaminski
* @author Kyuhyeok Park
* @author Wang Zhiyang
*/
public class ContainerProperties extends ConsumerProperties {

Expand Down Expand Up @@ -545,10 +546,20 @@ public void setTransactionManager(@Nullable PlatformTransactionManager transacti
this.transactionManager = transactionManager;
}

/**
* Recover batch records after rollback if true.
* @return true to recover.
* @since 3.2
*/
public boolean isBatchRecoverAfterRollback() {
return this.batchRecoverAfterRollback;
}

/**
* enable the batch recover after rollback.
* @param batchRecoverAfterRollback the batchRecoverAfterRollback to set.
* @since 3.2
*/
public void setBatchRecoverAfterRollback(boolean batchRecoverAfterRollback) {
this.batchRecoverAfterRollback = batchRecoverAfterRollback;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
*
* @author Gary Russell
* @author Francois Rosiere
* @author Wang Zhiyang
*
* @since 1.3.5
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@
* @author Francois Rosiere
* @author Daniel Gentes
* @author Soby Chacko
* @author Wang Zhiyang
*/
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {
Expand Down Expand Up @@ -2204,10 +2205,15 @@ protected void doInTransactionWithoutResult(TransactionStatus status) {
});
}
else {
afterRollbackProcessorToUse.processBatch(records,
Objects.requireNonNullElseGet(recordList, () -> createRecordList(records)), this.consumer,
KafkaMessageListenerContainer.this.thisOrParentContainer, e,
this.wantsBatchRecoverAfterRollback, this.eosMode);
try {
afterRollbackProcessorToUse.processBatch(records,
Objects.requireNonNullElseGet(recordList, () -> createRecordList(records)), this.consumer,
KafkaMessageListenerContainer.this.thisOrParentContainer, e,
this.wantsBatchRecoverAfterRollback, this.eosMode);
}
catch (Exception ex) {
this.logger.error(ex, "AfterRollbackProcessor threw exception");
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
* @author Gary Russell
* @author Francois Rosiere
* @author Antonio Tomac
* @author Wang Zhiyang
* @since 2.0
*
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
*
* @author Gary Russell
* @author Francois Rosiere
* @author Wang Zhiyang
* @since 2.2
*
*/
Expand Down Expand Up @@ -138,6 +139,7 @@ public static boolean doSeeks(List<ConsumerRecord<?, ?>> records, Consumer<?, ?>
* @param records the records.
* @param consumer the consumer.
* @param logger a {@link LogAccessor} for seek errors.
* @since 3.2
*/
public static void doSeeksToBegin(List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
LogAccessor logger) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
/**
* @author Gary Russell
* @author Francois Rosiere
* @author Wang Zhiyang
* @since 2.3.1
*
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
/**
* @author Gary Russell
* @author Artem Bilan
* @author Wang Zhiyang
*
* @since 1.3
*
Expand Down

0 comments on commit 1f5dd35

Please sign in to comment.