Skip to content

Commit

Permalink
GH-2968: Fix DEH#handleBatchAndReturnRemaining
Browse files Browse the repository at this point in the history
DefaultErrorHandler#handleBatchAndReturnRemaining recovered invalid and infinite loop when kafka listener threw BatchListenerFailedException and error record is first one in remaining list

* address empty catch
* add unit test

Co-authored-by: Zhiyang.Wang1 <[email protected]>
  • Loading branch information
Wzy19930507 and Zhiyang.Wang1 authored Jan 17, 2024
1 parent cf63cab commit c188304
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@

package org.springframework.kafka.listener;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -37,6 +35,7 @@

import org.springframework.kafka.KafkaException;
import org.springframework.kafka.KafkaException.Level;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.lang.Nullable;
import org.springframework.util.backoff.BackOff;

Expand All @@ -50,6 +49,7 @@
*
* @author Gary Russell
* @author Francois Rosiere
* @author Wang Zhiyang
* @since 2.8
*
*/
Expand Down Expand Up @@ -120,7 +120,7 @@ public void setReclassifyOnExceptionChange(boolean reclassifyOnExceptionChange)
@Override
protected void notRetryable(Stream<Class<? extends Exception>> notRetryable) {
if (this.fallbackBatchHandler instanceof ExceptionClassifier handler) {
notRetryable.forEach(ex -> handler.addNotRetryableExceptions(ex));
notRetryable.forEach(handler::addNotRetryableExceptions);
}
}

Expand Down Expand Up @@ -178,7 +178,6 @@ protected <K, V> ConsumerRecords<K, V> handle(Exception thrownException, Consume
else {
return String.format("Record not found in batch, index %d out of bounds (0, %d); "
+ "re-seeking batch", index, data.count() - 1);

}
});
fallback(thrownException, data, consumer, container, invokeListener);
Expand All @@ -201,11 +200,9 @@ private int findIndex(ConsumerRecords<?, ?> data, ConsumerRecord<?, ?> record) {
return -1;
}
int i = 0;
Iterator<?> iterator = data.iterator();
while (iterator.hasNext()) {
ConsumerRecord<?, ?> candidate = (ConsumerRecord<?, ?>) iterator.next();
if (candidate.topic().equals(record.topic()) && candidate.partition() == record.partition()
&& candidate.offset() == record.offset()) {
for (ConsumerRecord<?, ?> datum : data) {
if (datum.topic().equals(record.topic()) && datum.partition() == record.partition()
&& datum.offset() == record.offset()) {
break;
}
i++;
Expand All @@ -220,29 +217,25 @@ private <K, V> ConsumerRecords<K, V> seekOrRecover(Exception thrownException, @N
if (data == null) {
return ConsumerRecords.empty();
}
Iterator<?> iterator = data.iterator();
List<ConsumerRecord<?, ?>> toCommit = new ArrayList<>();
List<ConsumerRecord<?, ?>> remaining = new ArrayList<>();
int index = indexArg;
while (iterator.hasNext()) {
ConsumerRecord<?, ?> record = (ConsumerRecord<?, ?>) iterator.next();
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (ConsumerRecord<?, ?> datum : data) {
if (index-- > 0) {
toCommit.add(record);
offsets.compute(new TopicPartition(datum.topic(), datum.partition()),
(key, val) -> ListenerUtils.createOffsetAndMetadata(container, datum.offset() + 1));
}
else {
remaining.add(record);
remaining.add(datum);
}
}
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
toCommit.forEach(rec -> offsets.compute(new TopicPartition(rec.topic(), rec.partition()),
(key, val) -> ListenerUtils.createOffsetAndMetadata(container, rec.offset() + 1)));
if (offsets.size() > 0) {
commit(consumer, container, offsets);
}
if (isSeekAfterError()) {
if (remaining.size() > 0) {
SeekUtils.seekOrRecover(thrownException, remaining, consumer, container, false,
getFailureTracker()::recovered, this.logger, getLogLevel());
getFailureTracker(), this.logger, getLogLevel());
ConsumerRecord<?, ?> recovered = remaining.get(0);
commit(consumer, container,
Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()),
Expand All @@ -254,35 +247,43 @@ private <K, V> ConsumerRecords<K, V> seekOrRecover(Exception thrownException, @N
return ConsumerRecords.empty();
}
else {
if (indexArg == 0) {
return (ConsumerRecords<K, V>) data; // first record just rerun the whole thing
}
else {
if (remaining.size() > 0) {
try {
if (getFailureTracker().recovered(remaining.get(0), thrownException, container,
consumer)) {
remaining.remove(0);
}
}
catch (Exception e) {
if (SeekUtils.isBackoffException(thrownException)) {
this.logger.debug(e, () -> KafkaUtils.format(remaining.get(0))
+ " included in remaining due to retry back off " + thrownException);
}
else {
this.logger.error(e, KafkaUtils.format(remaining.get(0))
+ " included in remaining due to " + thrownException);
}
}
Map<TopicPartition, List<ConsumerRecord<K, V>>> remains = new HashMap<>();
remaining.forEach(rec -> remains.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()),
tp -> new ArrayList<ConsumerRecord<K, V>>()).add((ConsumerRecord<K, V>) rec));
return new ConsumerRecords<>(remains);
}
if (remaining.isEmpty()) {
return ConsumerRecords.empty();
}
Map<TopicPartition, List<ConsumerRecord<K, V>>> remains = new HashMap<>();
remaining.forEach(rec -> remains.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()),
tp -> new ArrayList<>()).add((ConsumerRecord<K, V>) rec));
return new ConsumerRecords<>(remains);
}
}

private void commit(Consumer<?, ?> consumer, MessageListenerContainer container, Map<TopicPartition, OffsetAndMetadata> offsets) {
private void commit(Consumer<?, ?> consumer, MessageListenerContainer container,
Map<TopicPartition, OffsetAndMetadata> offsets) {

boolean syncCommits = container.getContainerProperties().isSyncCommits();
Duration timeout = container.getContainerProperties().getSyncCommitTimeout();
if (syncCommits) {
consumer.commitSync(offsets, timeout);
ContainerProperties properties = container.getContainerProperties();
if (properties.isSyncCommits()) {
consumer.commitSync(offsets, properties.getSyncCommitTimeout());
}
else {
OffsetCommitCallback commitCallback = container.getContainerProperties().getCommitCallback();
OffsetCommitCallback commitCallback = properties.getCommitCallback();
if (commitCallback == null) {
commitCallback = LOGGING_COMMIT_CALLBACK;
}
Expand All @@ -304,8 +305,8 @@ private BatchListenerFailedException getBatchListenerFailedException(Throwable t
throwable = throwable.getCause();
checked.add(throwable);

if (throwable instanceof BatchListenerFailedException) {
target = (BatchListenerFailedException) throwable;
if (throwable instanceof BatchListenerFailedException batchListenerFailedException) {
target = batchListenerFailedException;
break;
}
}
Expand Down
Loading

0 comments on commit c188304

Please sign in to comment.