diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java
index 622ec87459..343b40b1b8 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java
@@ -44,12 +44,12 @@
  * the listener throws a {@link BatchListenerFailedException}, the offsets prior to the
  * failed record are committed and the remaining records have seeks performed. When the
  * retries are exhausted, the failed record is sent to the recoverer instead of being
- * included in the seeks. If other exceptions are thrown processing is delegated to the
- * fallback handler.
+ * included in the seeks. If other exceptions are thrown, the fallback handler takes over the processing.
  *
  * @author Gary Russell
  * @author Francois Rosiere
  * @author Wang Zhiyang
+ * @author Artem Bilan
  * @since 2.8
  *
  */
@@ -63,10 +63,10 @@ public abstract class FailedBatchProcessor extends FailedRecordProcessor {
 	 * Construct an instance with the provided properties.
	 * @param recoverer the recoverer.
 	 * @param backOff the back off.
-	 * @param fallbackHandler the fall back handler.
+	 * @param fallbackHandler the fallback handler.
 	 */
 	public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
-			CommonErrorHandler fallbackHandler) {
+			CommonErrorHandler fallbackHandler) {
 
 		this(recoverer, backOff, null, fallbackHandler);
 	}
@@ -76,11 +76,11 @@ public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception
 	 * @param recoverer the recoverer.
 	 * @param backOff the back off.
 	 * @param backOffHandler the {@link BackOffHandler}
-	 * @param fallbackHandler the fall back handler.
+	 * @param fallbackHandler the fallback handler.
 	 * @since 2.9
 	 */
 	public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
-			@Nullable BackOffHandler backOffHandler, CommonErrorHandler fallbackHandler) {
+			@Nullable BackOffHandler backOffHandler, CommonErrorHandler fallbackHandler) {
 
 		super(recoverer, backOff, backOffHandler);
 		this.fallbackBatchHandler = fallbackHandler;
@@ -103,7 +103,7 @@ public void setLogLevel(Level logLevel) {
 	}
 
 	/**
-	 * Set to false to not reclassify the exception if different from the previous
+	 * Set to {@code false} to not reclassify the exception if different from the previous
 	 * failure. If the changed exception is classified as retryable, the existing back off
 	 * sequence is used; a new sequence is not started. Default true. Only applies when
 	 * the fallback batch error handler (for exceptions other than
@@ -195,7 +195,7 @@ private void fallback(Exception thrownException, ConsumerRecords<?, ?> data, Con
 		this.fallbackBatchHandler.handleBatch(thrownException, data, consumer, container, invokeListener);
 	}
 
-	private int findIndex(ConsumerRecords<?, ?> data, ConsumerRecord<?, ?> record) {
+	private int findIndex(ConsumerRecords<?, ?> data, @Nullable ConsumerRecord<?, ?> record) {
 		if (record == null) {
 			return -1;
 		}
@@ -229,57 +229,60 @@ private <K, V> ConsumerRecords<K, V> seekOrRecover(Exception thrownException, @N
 				remaining.add(datum);
 			}
 		}
+
 		try {
-			if (offsets.size() > 0) {
+			if (!offsets.isEmpty()) {
 				commit(consumer, container, offsets);
 			}
 		}
-		finally {
-			if (isSeekAfterError()) {
-				if (remaining.size() > 0) {
-					SeekUtils.seekOrRecover(thrownException, remaining, consumer, container, false,
-							getFailureTracker(), this.logger, getLogLevel());
-					ConsumerRecord<?, ?> recovered = remaining.get(0);
-					commit(consumer, container,
-							Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()),
-									ListenerUtils.createOffsetAndMetadata(container, recovered.offset() + 1)));
-					if (remaining.size() > 1) {
-						throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException);
-					}
-				}
-				return ConsumerRecords.empty();
-			}
-			else {
-				if (remaining.size() > 0) {
-					try {
-						if (getFailureTracker().recovered(remaining.get(0), thrownException, container,
-								consumer)) {
-							remaining.remove(0);
-						}
-					}
-					catch (Exception e) {
-						if (SeekUtils.isBackoffException(thrownException)) {
-							this.logger.debug(e, () -> KafkaUtils.format(remaining.get(0))
-									+ " included in remaining due to retry back off " + thrownException);
-						}
-						else {
-							this.logger.error(e, KafkaUtils.format(remaining.get(0))
-									+ " included in remaining due to " + thrownException);
-						}
-					}
-				}
-				if (remaining.isEmpty()) {
-					return ConsumerRecords.empty();
-				}
-				Map<TopicPartition, List<ConsumerRecord<K, V>>> remains = new HashMap<>();
-				remaining.forEach(rec -> remains.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()),
-						tp -> new ArrayList<>()).add((ConsumerRecord<K, V>) rec));
-				return new ConsumerRecords<>(remains);
-			}
-		}
+		catch (Exception ex) {
+			// Ignore and follow with seek below
+		}
+
+		if (isSeekAfterError()) {
+			if (!remaining.isEmpty()) {
+				SeekUtils.seekOrRecover(thrownException, remaining, consumer, container, false,
+						getFailureTracker(), this.logger, getLogLevel());
+				ConsumerRecord<?, ?> recovered = remaining.get(0);
+				commit(consumer, container,
+						Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()),
+								ListenerUtils.createOffsetAndMetadata(container, recovered.offset() + 1)));
+				if (remaining.size() > 1) {
+					throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException);
+				}
+			}
+			return ConsumerRecords.empty();
+		}
+		else {
+			if (!remaining.isEmpty()) {
+				try {
+					if (getFailureTracker().recovered(remaining.get(0), thrownException, container,
+							consumer)) {
+						remaining.remove(0);
+					}
+				}
+				catch (Exception e) {
+					if (SeekUtils.isBackoffException(thrownException)) {
+						this.logger.debug(e, () -> KafkaUtils.format(remaining.get(0))
+								+ " included in remaining due to retry back off " + thrownException);
+					}
+					else {
+						this.logger.error(e, KafkaUtils.format(remaining.get(0))
+								+ " included in remaining due to " + thrownException);
+					}
+				}
+			}
+			if (remaining.isEmpty()) {
+				return ConsumerRecords.empty();
+			}
+			Map<TopicPartition, List<ConsumerRecord<K, V>>> remains = new HashMap<>();
+			remaining.forEach(rec -> remains.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()),
+					tp -> new ArrayList<>()).add((ConsumerRecord<K, V>) rec));
+			return new ConsumerRecords<>(remains);
+		}
 	}
 
-	private void commit(Consumer<?, ?> consumer, MessageListenerContainer container,
+	private static void commit(Consumer<?, ?> consumer, MessageListenerContainer container,
 			Map<TopicPartition, OffsetAndMetadata> offsets) {
 
 		ContainerProperties properties = container.getContainerProperties();
@@ -296,7 +299,7 @@ private void commit(Consumer<?, ?> consumer, MessageListenerContainer container,
 	}
 
 	@Nullable
-	private BatchListenerFailedException getBatchListenerFailedException(Throwable throwableArg) {
+	private static BatchListenerFailedException getBatchListenerFailedException(@Nullable Throwable throwableArg) {
 		if (throwableArg == null || throwableArg instanceof BatchListenerFailedException) {
 			return (BatchListenerFailedException) throwableArg;
 		}
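
For reviewers, a minimal usage sketch of the contract the class javadoc describes (the listener class, topic semantics, and method names below are hypothetical, not part of this patch): a batch listener throws `BatchListenerFailedException` pointing at the failed record, so offsets before it are committed, the failure is retried per the `BackOff`, and the record finally goes to the recoverer; any other exception type is handed to the fallback batch handler instead. `DefaultErrorHandler`, which extends `FailedBatchProcessor`, is the handler normally wired in:

```java
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.kafka.listener.BatchListenerFailedException;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// Hypothetical batch listener illustrating the FailedBatchProcessor contract.
public class OrderBatchListener {

	public void onBatch(List<ConsumerRecord<String, String>> records) {
		for (ConsumerRecord<String, String> record : records) {
			try {
				process(record);
			}
			catch (RuntimeException ex) {
				// Tell the error handler exactly which record failed: earlier
				// offsets are committed, this record is retried per the back
				// off, and later records are re-sought.
				throw new BatchListenerFailedException("Order processing failed", record);
			}
		}
	}

	private void process(ConsumerRecord<String, String> record) {
		// domain logic
	}

	// DefaultErrorHandler extends FailedBatchProcessor: retry up to three
	// times, one second apart, before the recoverer (log-only by default)
	// takes the record.
	public static DefaultErrorHandler errorHandler() {
		return new DefaultErrorHandler(new FixedBackOff(1000L, 3L));
	}
}
```

The handler would be registered on the listener container factory (e.g. `factory.setCommonErrorHandler(errorHandler())`). With the change above, a failed offset commit no longer short-circuits this flow: the commit exception is caught and ignored, so the seek/recover path still runs.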