Skip to content

Commit

Permalink
GH-2968 DefaultErrorHandler#handleBatchAndReturnRemaining recovered i…
Browse files Browse the repository at this point in the history
…nvalid

* address empty catch
  • Loading branch information
Zhiyang.Wang1 committed Dec 22, 2023
1 parent 6186e47 commit cf1277a
Showing 1 changed file with 17 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import org.springframework.kafka.KafkaException;
import org.springframework.kafka.KafkaException.Level;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.lang.Nullable;
import org.springframework.util.backoff.BackOff;

Expand Down Expand Up @@ -246,13 +247,23 @@ private <K, V> ConsumerRecords<K, V> seekOrRecover(Exception thrownException, @N
return ConsumerRecords.empty();
}
else {
try {
if (getFailureTracker().recovered(remaining.get(0), thrownException, container,
consumer)) {
remaining.remove(0);
if (remaining.size() > 0) {
try {
if (getFailureTracker().recovered(remaining.get(0), thrownException, container,
consumer)) {
remaining.remove(0);
}
}
catch (Exception e) {
if (SeekUtils.isBackoffException(thrownException)) {
this.logger.debug(e, () -> KafkaUtils.format(remaining.get(0))
+ " included in remaining due to retry back off " + thrownException);
}
else {
this.logger.error(e, KafkaUtils.format(remaining.get(0))
+ " included in remaining due to " + thrownException);
}
}
}
catch (Exception e) {
}
if (remaining.isEmpty()) {
return ConsumerRecords.empty();
Expand Down

0 comments on commit cf1277a

Please sign in to comment.