diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java index 3ee7da9f90..622ec87459 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2023 the original author or authors. + * Copyright 2021-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -229,49 +229,53 @@ private ConsumerRecords seekOrRecover(Exception thrownException, @N remaining.add(datum); } } - if (offsets.size() > 0) { - commit(consumer, container, offsets); - } - if (isSeekAfterError()) { - if (remaining.size() > 0) { - SeekUtils.seekOrRecover(thrownException, remaining, consumer, container, false, - getFailureTracker(), this.logger, getLogLevel()); - ConsumerRecord recovered = remaining.get(0); - commit(consumer, container, - Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()), - ListenerUtils.createOffsetAndMetadata(container, recovered.offset() + 1))); - if (remaining.size() > 1) { - throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException); - } + try { + if (offsets.size() > 0) { + commit(consumer, container, offsets); } - return ConsumerRecords.empty(); } - else { - if (remaining.size() > 0) { - try { - if (getFailureTracker().recovered(remaining.get(0), thrownException, container, - consumer)) { - remaining.remove(0); + finally { + if (isSeekAfterError()) { + if (remaining.size() > 0) { + SeekUtils.seekOrRecover(thrownException, remaining, consumer, container, false, + getFailureTracker(), this.logger, getLogLevel()); + ConsumerRecord recovered = remaining.get(0); + commit(consumer, container, + Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()), + ListenerUtils.createOffsetAndMetadata(container, recovered.offset() + 1))); + if (remaining.size() > 1) { + throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException); } } - catch (Exception e) { - if (SeekUtils.isBackoffException(thrownException)) { - this.logger.debug(e, () -> KafkaUtils.format(remaining.get(0)) - + " included in remaining due to retry back off " + thrownException); + return ConsumerRecords.empty(); + } + else { + if (remaining.size() > 0) { + try { + if (getFailureTracker().recovered(remaining.get(0), thrownException, container, + consumer)) { + remaining.remove(0); + } } - else { - this.logger.error(e, KafkaUtils.format(remaining.get(0)) - + " included in remaining due to " + thrownException); + catch (Exception e) { + if (SeekUtils.isBackoffException(thrownException)) { + this.logger.debug(e, () -> KafkaUtils.format(remaining.get(0)) + + " included in remaining due to retry back off " + thrownException); + } + else { + this.logger.error(e, KafkaUtils.format(remaining.get(0)) + + " included in remaining due to " + thrownException); + } } } + if (remaining.isEmpty()) { + return ConsumerRecords.empty(); + } + Map>> remains = new HashMap<>(); + remaining.forEach(rec -> remains.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()), + tp -> new ArrayList<>()).add((ConsumerRecord) rec)); + return new ConsumerRecords<>(remains); } - if (remaining.isEmpty()) { - return ConsumerRecords.empty(); - } - Map>> remains = new HashMap<>(); - remaining.forEach(rec -> remains.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()), - tp -> new ArrayList<>()).add((ConsumerRecord) rec)); - return new ConsumerRecords<>(remains); } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/FailedBatchProcessorTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/FailedBatchProcessorTests.java index 5cd79076b9..c4da76a1cd 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/FailedBatchProcessorTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/FailedBatchProcessorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2023-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,8 +17,11 @@ package org.springframework.kafka.listener; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.assertj.core.api.Assertions.assertThatIllegalStateException; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.BDDMockito.willReturn; import static org.mockito.BDDMockito.willThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -34,16 +37,19 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.RebalanceInProgressException; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.springframework.core.log.LogAccessor; import org.springframework.data.util.DirectFieldAccessFallbackBeanWrapper; +import org.springframework.kafka.KafkaException; import org.springframework.util.backoff.BackOff; import org.springframework.util.backoff.FixedBackOff; /** * @author Gary Russell + * @author Francois Rosiere * @since 3.0.3 * */ @@ -52,15 +58,6 @@ public class FailedBatchProcessorTests { @SuppressWarnings({ "rawtypes", "unchecked" }) @Test void indexOutOfBounds() { - class TestFBP extends FailedBatchProcessor { - - TestFBP(BiConsumer, Exception> recoverer, BackOff backOff, - CommonErrorHandler fallbackHandler) { - - super(recoverer, backOff, fallbackHandler); - } - - } CommonErrorHandler mockEH = mock(CommonErrorHandler.class); willThrow(new IllegalStateException("fallback")).given(mockEH).handleBatch(any(), any(), any(), any(), any()); @@ -83,15 +80,6 @@ records, mock(Consumer.class), mock(MessageListenerContainer.class), mock(Runnab @SuppressWarnings({ "rawtypes", "unchecked" }) @Test void recordNotPresent() { - class TestFBP extends FailedBatchProcessor { - - TestFBP(BiConsumer, Exception> recoverer, BackOff backOff, - CommonErrorHandler fallbackHandler) { - - super(recoverer, backOff, fallbackHandler); - } - - } CommonErrorHandler mockEH = mock(CommonErrorHandler.class); willThrow(new IllegalStateException("fallback")).given(mockEH).handleBatch(any(), any(), any(), any(), any()); @@ -114,4 +102,34 @@ records, mock(Consumer.class), mock(MessageListenerContainer.class), mock(Runnab assertThat(output).contains("Record not found in batch: topic-42@123;"); } + @Test + void testExceptionDuringCommit() { + CommonErrorHandler mockEH = mock(CommonErrorHandler.class); + willThrow(new IllegalStateException("ise")).given(mockEH).handleBatch(any(), any(), any(), any(), any()); + + ConsumerRecord rec1 = new ConsumerRecord("topic", 0, 0L, null, null); + ConsumerRecord rec2 = new ConsumerRecord("topic", 0, 1L, null, null); + ConsumerRecord rec3 = new ConsumerRecord("topic", 0, 2L, null, null); + + ConsumerRecords records = new ConsumerRecords(Map.of(new TopicPartition("topic", 0), List.of(rec1, rec2, rec3))); + TestFBP testFBP = new TestFBP((rec, ex) -> { }, new FixedBackOff(2L, 2L), mockEH); + final Consumer consumer = mock(Consumer.class); + willThrow(new RebalanceInProgressException("rebalance in progress")).given(consumer).commitSync(anyMap(), any()); + final MessageListenerContainer mockMLC = mock(MessageListenerContainer.class); + willReturn(new ContainerProperties("topic")).given(mockMLC).getContainerProperties(); + assertThatExceptionOfType(KafkaException.class).isThrownBy(() -> + testFBP.handle(new BatchListenerFailedException("topic", rec2), + records, consumer, mockMLC, mock(Runnable.class)) + ).withMessage("Seek to current after exception"); + } + + static class TestFBP extends FailedBatchProcessor { + + TestFBP(BiConsumer, Exception> recoverer, BackOff backOff, + CommonErrorHandler fallbackHandler) { + + super(recoverer, backOff, fallbackHandler); + } + + } }