Skip to content

Commit

Permalink
GH-2970: minor improvement error handler related
Browse files Browse the repository at this point in the history
Fixes: #2970 

* minor improvement error handler related
* remove `FailedRecordProcessor.retryListeners`, reuse FailureTracker's retryListeners
* cleanup related to error handlers
  • Loading branch information
Wzy19930507 authored Dec 22, 2023
1 parent 4962670 commit 48258df
Show file tree
Hide file tree
Showing 12 changed files with 23 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,13 @@ private void checkConfig() {
"A KafkaOperations is required when 'commitRecovered' is true");
}

@SuppressWarnings({ "unchecked", "rawtypes", "deprecation" })
@SuppressWarnings({ "unchecked", "rawtypes"})
@Override
public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer,
@Nullable MessageListenerContainer container, Exception exception, boolean recoverable, EOSMode eosMode) {

if (SeekUtils.doSeeks((List) records, consumer, exception, recoverable,
getFailureTracker()::recovered, container, this.logger)
getFailureTracker(), container, this.logger)
&& isCommitRecovered() && this.kafkaTemplate.isTransactional()) {
ConsumerRecord<K, V> skipped = records.get(0);
this.kafkaTemplate.sendOffsetsToTransaction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>
Consumer<?, ?> consumer, MessageListenerContainer container) {

SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(), // NOSONAR
getFailureTracker()::recovered, this.logger, getLogLevel());
getFailureTracker(), this.logger, getLogLevel());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,8 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
listen(retryListeners, records, thrownException, attempt++);
ConsumerRecord<?, ?> first = records.iterator().next();
MessageListenerContainer childOrSingle = container.getContainerFor(first.topic(), first.partition());
if (childOrSingle instanceof ConsumerPauseResumeEventPublisher) {
((ConsumerPauseResumeEventPublisher) childOrSingle)
.publishConsumerPausedEvent(assignment, "For batch retry");
if (childOrSingle instanceof ConsumerPauseResumeEventPublisher consumerPauseResumeEventPublisher) {
consumerPauseResumeEventPublisher.publishConsumerPausedEvent(assignment, "For batch retry");
}
try {
Exception recoveryException = thrownException;
Expand Down Expand Up @@ -165,16 +164,16 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
retryListeners.forEach(listener -> listener.recovered(records, finalRecoveryException));
}
catch (Exception ex) {
logger.error(ex, () -> "Recoverer threw an exception; re-seeking batch");
logger.error(ex, "Recoverer threw an exception; re-seeking batch");
retryListeners.forEach(listener -> listener.recoveryFailed(records, thrownException, ex));
seeker.handleBatch(thrownException, records, consumer, container, NO_OP);
}
}
finally {
Set<TopicPartition> assignment2 = consumer.assignment();
consumer.resume(assignment2);
if (childOrSingle instanceof ConsumerPauseResumeEventPublisher) {
((ConsumerPauseResumeEventPublisher) childOrSingle).publishConsumerResumedEvent(assignment2);
if (childOrSingle instanceof ConsumerPauseResumeEventPublisher consumerPauseResumeEventPublisher) {
consumerPauseResumeEventPublisher.publishConsumerResumedEvent(assignment2);
}
}
} // NOSONAR NCSS line count
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2022 the original author or authors.
* Copyright 2021-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -219,7 +219,6 @@ public Boolean removeClassification(Class<? extends Exception> exceptionType) {
* @author Gary Russell
*
*/
@SuppressWarnings("serial")
private static final class ExtendedBinaryExceptionClassifier extends BinaryExceptionClassifier {

ExtendedBinaryExceptionClassifier(Map<Class<? extends Throwable>, Boolean> typeMap, boolean defaultValue) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2022 the original author or authors.
* Copyright 2019-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,8 +16,6 @@

package org.springframework.kafka.listener;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -56,8 +54,6 @@ public abstract class FailedRecordProcessor extends ExceptionClassifier implemen

private final FailedRecordTracker failureTracker;

private final List<RetryListener> retryListeners = new ArrayList<>();

private boolean commitRecovered;

private BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> userBackOffFunction = (rec, ex) -> null;
Expand Down Expand Up @@ -136,12 +132,10 @@ public void setResetStateOnExceptionChange(boolean resetStateOnExceptionChange)
public void setRetryListeners(RetryListener... listeners) {
Assert.noNullElements(listeners, "'listeners' cannot have null elements");
this.failureTracker.setRetryListeners(listeners);
this.retryListeners.clear();
this.retryListeners.addAll(Arrays.asList(listeners));
}

protected List<RetryListener> getRetryListeners() {
return this.retryListeners;
return this.failureTracker.getRetryListeners();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ class FailedRecordTracker implements RecoveryStrategy {
};
}
else {
if (recoverer instanceof ConsumerAwareRecordRecoverer) {
this.recoverer = (ConsumerAwareRecordRecoverer) recoverer;
if (recoverer instanceof ConsumerAwareRecordRecoverer carr) {
this.recoverer = carr;
}
else {
this.recoverer = (rec, consumer, ex) -> recoverer.accept(rec, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, C
.stream()
.collect(
Collectors.toMap(tp -> tp,
tp -> data.records(tp).get(0).offset(), (u, v) -> (long) v, LinkedHashMap::new))
tp -> data.records(tp).get(0).offset(), (u, v) -> v, LinkedHashMap::new))
.forEach(consumer::seek);

throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1458,9 +1458,8 @@ private void doProcessCommits() {
ConsumerRecords<K, V> pending = this.remainingRecords;
this.remainingRecords = null;
List<ConsumerRecord<?, ?>> records = new ArrayList<>();
Iterator<ConsumerRecord<K, V>> iterator = pending.iterator();
while (iterator.hasNext()) {
records.add(iterator.next());
for (ConsumerRecord<K, V> kvConsumerRecord : pending) {
records.add(kvConsumerRecord);
}
this.commonErrorHandler.handleRemaining(cfe, records, this.consumer,
KafkaMessageListenerContainer.this.thisOrParentContainer);
Expand Down Expand Up @@ -2403,7 +2402,7 @@ private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords<K, V> r
ConsumerRecords<K, V> records = recordsArg;
List<ConsumerRecord<K, V>> recordList = recordListArg;
if (this.listenerinfo != null) {
records.iterator().forEachRemaining(rec -> listenerInfo(rec));
records.iterator().forEachRemaining(this::listenerInfo);
}
if (this.batchInterceptor != null) {
records = this.batchInterceptor.intercept(recordsArg, this.consumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public static void unrecoverableBackOff(BackOff backOff, Map<Thread, BackOffExec
if (interval == BackOffExecution.STOP) {
interval = lastIntervals.get(currentThread);
if (interval == null) {
interval = Long.valueOf(0);
interval = 0L;
}
}
lastIntervals.put(currentThread, interval);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2022 the original author or authors.
* Copyright 2018-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -204,7 +204,7 @@ public static void seekOrRecover(Exception thrownException, @Nullable List<Consu
}
}

if (records == null || !doSeeks(records, consumer, thrownException, true, recovery, container, logger)) { // NOSONAR
if (!doSeeks(records, consumer, thrownException, true, recovery, container, logger)) { // NOSONAR
throw new KafkaException("Seek to current after exception", level, thrownException);
}
if (commitRecovered) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2022 the original author or authors.
* Copyright 2019-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -93,8 +93,6 @@ public void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exce

});
ConsumerRecord<String, String> record1 = new ConsumerRecord<>("foo", 0, 0L, "foo", "bar");
ConsumerRecord<String, String> record2 = new ConsumerRecord<>("foo", 1, 1L, "foo", "bar");
List<ConsumerRecord<?, ?>> records = Arrays.asList(record1, record2);
IllegalStateException illegalState = new IllegalStateException();
Consumer<?, ?> consumer = mock(Consumer.class);
assertThat(handler.handleOne(illegalState, record1, consumer, mock(MessageListenerContainer.class))).isFalse();
Expand All @@ -116,7 +114,7 @@ public void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exce
assertThat(failedDeliveryAttempt.get()).isEqualTo(1);
assertThat(recoveryFailureEx.get())
.isInstanceOf(RuntimeException.class)
.extracting(ex -> ex.getMessage())
.extracting(Throwable::getMessage)
.isEqualTo("test recoverer failure");
assertThat(isRecovered.get()).isTrue();
}
Expand Down Expand Up @@ -183,7 +181,7 @@ consumer, mock(MessageListenerContainer.class)))
assertThat(failedDeliveryAttempt.get()).isEqualTo(1);
assertThat(recoveryFailureEx.get())
.isInstanceOf(RuntimeException.class)
.extracting(ex -> ex.getMessage())
.extracting(Throwable::getMessage)
.isEqualTo("test recoverer failure");
assertThat(isRecovered.get()).isTrue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,13 +279,6 @@ public void testDelegateType() throws Exception {
.isEqualTo(ListenerType.SIMPLE);
template.sendDefault(0, 0, "foo");
assertThat(latch2.await(10, TimeUnit.SECONDS)).isTrue();
// verify that the container called the right method - avoiding the creation of an Acknowledgment
// assertThat(trace.get()[1].getMethodName()).contains("onMessage"); // onMessage(d, a, c) (inner)
// assertThat(trace.get()[2].getMethodName()).contains("onMessage"); // bridge
// assertThat(trace.get()[3].getMethodName()).contains("onMessage"); // onMessage(d, a, c) (outer)
// assertThat(trace.get()[4].getMethodName()).contains("onMessage"); // onMessage(d)
// assertThat(trace.get()[5].getMethodName()).contains("onMessage"); // bridge
// assertThat(trace.get()[6].getMethodName()).contains("invokeRecordListener");
container.stop();
final CountDownLatch latch3 = new CountDownLatch(1);
filtering = new FilteringMessageListenerAdapter<>(
Expand All @@ -299,15 +292,6 @@ public void testDelegateType() throws Exception {
.isEqualTo(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE);
template.sendDefault(0, 0, "foo");
assertThat(latch3.await(10, TimeUnit.SECONDS)).isTrue();
// verify that the container called the 3 arg method directly
// int i = 0;
// if (trace.get()[1].getClassName().endsWith("AcknowledgingConsumerAwareMessageListener")) {
// // this frame does not appear in eclise, but does in gradle.\
// i++;
// }
// assertThat(trace.get()[i + 1].getMethodName()).contains("onMessage"); // onMessage(d, a, c)
// assertThat(trace.get()[i + 2].getMethodName()).contains("onMessage"); // bridge
// assertThat(trace.get()[i + 3].getMethodName()).contains("invokeRecordListener");
container.stop();
long t = System.currentTimeMillis();
container.stop();
Expand Down

0 comments on commit 48258df

Please sign in to comment.