Skip to content

Commit

Permalink
GH-3019: seek even in case of commit failure
Browse files Browse the repository at this point in the history
Fixes: #3019

* DefaultErrorHandler is not able to seek in case of an exception during the commit

(cherry picked from commit cfa369b)
  • Loading branch information
frosiere authored and spring-builds committed Feb 22, 2024
1 parent 2c9aacc commit 5b63191
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 55 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2023 the original author or authors.
* Copyright 2021-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -229,49 +229,53 @@ private <K, V> ConsumerRecords<K, V> seekOrRecover(Exception thrownException, @N
remaining.add(datum);
}
}
if (offsets.size() > 0) {
commit(consumer, container, offsets);
}
if (isSeekAfterError()) {
if (remaining.size() > 0) {
SeekUtils.seekOrRecover(thrownException, remaining, consumer, container, false,
getFailureTracker(), this.logger, getLogLevel());
ConsumerRecord<?, ?> recovered = remaining.get(0);
commit(consumer, container,
Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()),
ListenerUtils.createOffsetAndMetadata(container, recovered.offset() + 1)));
if (remaining.size() > 1) {
throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException);
}
try {
if (offsets.size() > 0) {
commit(consumer, container, offsets);
}
return ConsumerRecords.empty();
}
else {
if (remaining.size() > 0) {
try {
if (getFailureTracker().recovered(remaining.get(0), thrownException, container,
consumer)) {
remaining.remove(0);
finally {
if (isSeekAfterError()) {
if (remaining.size() > 0) {
SeekUtils.seekOrRecover(thrownException, remaining, consumer, container, false,
getFailureTracker(), this.logger, getLogLevel());
ConsumerRecord<?, ?> recovered = remaining.get(0);
commit(consumer, container,
Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()),
ListenerUtils.createOffsetAndMetadata(container, recovered.offset() + 1)));
if (remaining.size() > 1) {
throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException);
}
}
catch (Exception e) {
if (SeekUtils.isBackoffException(thrownException)) {
this.logger.debug(e, () -> KafkaUtils.format(remaining.get(0))
+ " included in remaining due to retry back off " + thrownException);
return ConsumerRecords.empty();
}
else {
if (remaining.size() > 0) {
try {
if (getFailureTracker().recovered(remaining.get(0), thrownException, container,
consumer)) {
remaining.remove(0);
}
}
else {
this.logger.error(e, KafkaUtils.format(remaining.get(0))
+ " included in remaining due to " + thrownException);
catch (Exception e) {
if (SeekUtils.isBackoffException(thrownException)) {
this.logger.debug(e, () -> KafkaUtils.format(remaining.get(0))
+ " included in remaining due to retry back off " + thrownException);
}
else {
this.logger.error(e, KafkaUtils.format(remaining.get(0))
+ " included in remaining due to " + thrownException);
}
}
}
if (remaining.isEmpty()) {
return ConsumerRecords.empty();
}
Map<TopicPartition, List<ConsumerRecord<K, V>>> remains = new HashMap<>();
remaining.forEach(rec -> remains.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()),
tp -> new ArrayList<>()).add((ConsumerRecord<K, V>) rec));
return new ConsumerRecords<>(remains);
}
if (remaining.isEmpty()) {
return ConsumerRecords.empty();
}
Map<TopicPartition, List<ConsumerRecord<K, V>>> remains = new HashMap<>();
remaining.forEach(rec -> remains.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()),
tp -> new ArrayList<>()).add((ConsumerRecord<K, V>) rec));
return new ConsumerRecords<>(remains);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 the original author or authors.
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,8 +17,11 @@
package org.springframework.kafka.listener;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.BDDMockito.willThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
Expand All @@ -34,16 +37,19 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;

import org.springframework.core.log.LogAccessor;
import org.springframework.data.util.DirectFieldAccessFallbackBeanWrapper;
import org.springframework.kafka.KafkaException;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;

/**
* @author Gary Russell
* @author Francois Rosiere
* @since 3.0.3
*
*/
Expand All @@ -52,15 +58,6 @@ public class FailedBatchProcessorTests {
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
void indexOutOfBounds() {
class TestFBP extends FailedBatchProcessor {

TestFBP(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
CommonErrorHandler fallbackHandler) {

super(recoverer, backOff, fallbackHandler);
}

}
CommonErrorHandler mockEH = mock(CommonErrorHandler.class);
willThrow(new IllegalStateException("fallback")).given(mockEH).handleBatch(any(), any(), any(), any(), any());

Expand All @@ -83,15 +80,6 @@ records, mock(Consumer.class), mock(MessageListenerContainer.class), mock(Runnab
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
void recordNotPresent() {
class TestFBP extends FailedBatchProcessor {

TestFBP(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
CommonErrorHandler fallbackHandler) {

super(recoverer, backOff, fallbackHandler);
}

}
CommonErrorHandler mockEH = mock(CommonErrorHandler.class);
willThrow(new IllegalStateException("fallback")).given(mockEH).handleBatch(any(), any(), any(), any(), any());

Expand All @@ -114,4 +102,34 @@ records, mock(Consumer.class), mock(MessageListenerContainer.class), mock(Runnab
assertThat(output).contains("Record not found in batch: topic-42@123;");
}

@Test
void testExceptionDuringCommit() {
CommonErrorHandler mockEH = mock(CommonErrorHandler.class);
willThrow(new IllegalStateException("ise")).given(mockEH).handleBatch(any(), any(), any(), any(), any());

ConsumerRecord rec1 = new ConsumerRecord("topic", 0, 0L, null, null);
ConsumerRecord rec2 = new ConsumerRecord("topic", 0, 1L, null, null);
ConsumerRecord rec3 = new ConsumerRecord("topic", 0, 2L, null, null);

ConsumerRecords records = new ConsumerRecords(Map.of(new TopicPartition("topic", 0), List.of(rec1, rec2, rec3)));
TestFBP testFBP = new TestFBP((rec, ex) -> { }, new FixedBackOff(2L, 2L), mockEH);
final Consumer consumer = mock(Consumer.class);
willThrow(new RebalanceInProgressException("rebalance in progress")).given(consumer).commitSync(anyMap(), any());
final MessageListenerContainer mockMLC = mock(MessageListenerContainer.class);
willReturn(new ContainerProperties("topic")).given(mockMLC).getContainerProperties();
assertThatExceptionOfType(KafkaException.class).isThrownBy(() ->
testFBP.handle(new BatchListenerFailedException("topic", rec2),
records, consumer, mockMLC, mock(Runnable.class))
).withMessage("Seek to current after exception");
}

static class TestFBP extends FailedBatchProcessor {

TestFBP(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
CommonErrorHandler fallbackHandler) {

super(recoverer, backOff, fallbackHandler);
}

}
}

0 comments on commit 5b63191

Please sign in to comment.