Skip to content

Commit

Permalink
review fix
Browse files Browse the repository at this point in the history
* `what-new.adoc` and `annotation-error-handling.adoc`
* add javadoc in `SeekUtils` and `AfterRollbackProcessor`
* change `ListenerUtils.nextBackOff` public to default
* change logger args to static string
  • Loading branch information
Wzy19930507 committed Jan 19, 2024
1 parent 094deaa commit d2484f4
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,11 @@ AfterRollbackProcessor<String, String> processor =
When you do not use transactions, you can achieve similar functionality by configuring a `DefaultErrorHandler`.
See xref:kafka/annotation-error-handling.adoc#error-handlers[Container Error Handlers].

IMPORTANT: Recovery is not possible with a batch listener, since the framework has no knowledge about which record in the batch keeps failing.
Starting with version 3.2, Recovery can now recover (skip) entire batch of records that keeps failing.
Set `ContainerProperties.setBatchRecoverAfterRollback(true)` to enable this feature.

IMPORTANT: Default behavior, recovery is not possible with a batch listener,
since the framework has no knowledge about which record in the batch keeps failing.
In such cases, the application listener must handle a record that keeps failing.

See also xref:kafka/annotation-error-handling.adoc#dead-letters[Publishing Dead-letter Records].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,11 @@ For changes in earlier version, see xref:appendix/change-history.adoc[Change His

A new `TransactionIdSuffixStrategy` interface was introduced to manage `transactional.id` suffix.
The default implementation is `DefaultTransactionIdSuffixStrategy` when setting `maxCache` greater than zero can reuse `transactional.id` within a specific range, otherwise suffixes will be generated on the fly by incrementing a counter.
See xref:kafka/transactions.adoc#transaction-id-suffix-fixed[Fixed TransactionIdSuffix] for more information.
See xref:kafka/transactions.adoc#transaction-id-suffix-fixed[Fixed TransactionIdSuffix] for more information.


[[x32-after-rollback-processing]]
=== After Rollback Processing

A new `AfterRollbackProcessor` API `processBatch` is provided.
See xref:kafka/annotation-error-handling.adoc#after-rollback[After-rollback Processor] for more information.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -64,6 +64,19 @@ public interface AfterRollbackProcessor<K, V> {
void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer,
MessageListenerContainer container, Exception exception, boolean recoverable, EOSMode eosMode);

/**
* Process the entire batch of records.
* Recoverable will be true if the container is processing entire batch of records;
* @param records the records.
* @param recordList the record list.
* @param consumer the consumer.
* @param container the container.
* @param exception the exception
* @param recoverable the recoverable.
* @param eosMode the {@link EOSMode}.
* @since 3.2
* @see #isProcessInTransaction()
*/
default void processBatch(ConsumerRecords<K, V> records, List<ConsumerRecord<K, V>> recordList,
Consumer<K, V> consumer, MessageListenerContainer container, Exception exception,
boolean recoverable, ContainerProperties.EOSMode eosMode) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -216,7 +216,7 @@ public void processBatch(ConsumerRecords<K, V> records, List<ConsumerRecord<K, V
}
catch (Exception ex) {
SeekUtils.doSeeksToBegin((List) recordList, consumer, this.logger);
logger.error(ex, () -> "Recoverer threw an exception; re-seeking batch");
logger.error(ex, "Recoverer threw an exception; re-seeking batch");
throw ex;
}
return;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2023 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -139,7 +139,7 @@ public static void unrecoverableBackOff(BackOff backOff, Map<Thread, BackOffExec
}
}

public static long nextBackOff(BackOff backOff, Map<Thread, BackOffExecution> executions) {
static long nextBackOff(BackOff backOff, Map<Thread, BackOffExecution> executions) {

Thread currentThread = Thread.currentThread();
BackOffExecution backOffExecution = executions.get(currentThread);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -133,6 +133,12 @@ public static boolean doSeeks(List<ConsumerRecord<?, ?>> records, Consumer<?, ?>
return skipped.get();
}

/**
* Seek records to begin position, optionally skipping the first.
* @param records the records.
* @param consumer the consumer.
* @param logger a {@link LogAccessor} for seek errors.
*/
public static void doSeeksToBegin(List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
LogAccessor logger) {

Expand Down

0 comments on commit d2484f4

Please sign in to comment.