Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

minor improvement error handler related #2970

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,13 @@ private void checkConfig() {
"A KafkaOperations is required when 'commitRecovered' is true");
}

@SuppressWarnings({ "unchecked", "rawtypes", "deprecation" })
@SuppressWarnings({ "unchecked", "rawtypes"})
@Override
public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer,
@Nullable MessageListenerContainer container, Exception exception, boolean recoverable, EOSMode eosMode) {

if (SeekUtils.doSeeks((List) records, consumer, exception, recoverable,
getFailureTracker()::recovered, container, this.logger)
getFailureTracker(), container, this.logger)
&& isCommitRecovered() && this.kafkaTemplate.isTransactional()) {
ConsumerRecord<K, V> skipped = records.get(0);
this.kafkaTemplate.sendOffsetsToTransaction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>
Consumer<?, ?> consumer, MessageListenerContainer container) {

SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(), // NOSONAR
getFailureTracker()::recovered, this.logger, getLogLevel());
getFailureTracker(), this.logger, getLogLevel());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,8 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
listen(retryListeners, records, thrownException, attempt++);
ConsumerRecord<?, ?> first = records.iterator().next();
MessageListenerContainer childOrSingle = container.getContainerFor(first.topic(), first.partition());
if (childOrSingle instanceof ConsumerPauseResumeEventPublisher) {
((ConsumerPauseResumeEventPublisher) childOrSingle)
.publishConsumerPausedEvent(assignment, "For batch retry");
if (childOrSingle instanceof ConsumerPauseResumeEventPublisher consumerPauseResumeEventPublisher) {
consumerPauseResumeEventPublisher.publishConsumerPausedEvent(assignment, "For batch retry");
}
try {
Exception recoveryException = thrownException;
Expand Down Expand Up @@ -165,16 +164,16 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
retryListeners.forEach(listener -> listener.recovered(records, finalRecoveryException));
}
catch (Exception ex) {
logger.error(ex, () -> "Recoverer threw an exception; re-seeking batch");
logger.error(ex, "Recoverer threw an exception; re-seeking batch");
retryListeners.forEach(listener -> listener.recoveryFailed(records, thrownException, ex));
seeker.handleBatch(thrownException, records, consumer, container, NO_OP);
}
}
finally {
Set<TopicPartition> assignment2 = consumer.assignment();
consumer.resume(assignment2);
if (childOrSingle instanceof ConsumerPauseResumeEventPublisher) {
((ConsumerPauseResumeEventPublisher) childOrSingle).publishConsumerResumedEvent(assignment2);
if (childOrSingle instanceof ConsumerPauseResumeEventPublisher consumerPauseResumeEventPublisher) {
consumerPauseResumeEventPublisher.publishConsumerResumedEvent(assignment2);
}
}
} // NOSONAR NCSS line count
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2022 the original author or authors.
* Copyright 2021-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -219,7 +219,6 @@ public Boolean removeClassification(Class<? extends Exception> exceptionType) {
* @author Gary Russell
*
*/
@SuppressWarnings("serial")
private static final class ExtendedBinaryExceptionClassifier extends BinaryExceptionClassifier {

ExtendedBinaryExceptionClassifier(Map<Class<? extends Throwable>, Boolean> typeMap, boolean defaultValue) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2022 the original author or authors.
* Copyright 2019-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,8 +16,6 @@

package org.springframework.kafka.listener;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -56,8 +54,6 @@ public abstract class FailedRecordProcessor extends ExceptionClassifier implemen

private final FailedRecordTracker failureTracker;

private final List<RetryListener> retryListeners = new ArrayList<>();

private boolean commitRecovered;

private BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> userBackOffFunction = (rec, ex) -> null;
Expand Down Expand Up @@ -136,12 +132,10 @@ public void setResetStateOnExceptionChange(boolean resetStateOnExceptionChange)
public void setRetryListeners(RetryListener... listeners) {
Assert.noNullElements(listeners, "'listeners' cannot have null elements");
this.failureTracker.setRetryListeners(listeners);
this.retryListeners.clear();
this.retryListeners.addAll(Arrays.asList(listeners));
}

protected List<RetryListener> getRetryListeners() {
return this.retryListeners;
return this.failureTracker.getRetryListeners();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ class FailedRecordTracker implements RecoveryStrategy {
};
}
else {
if (recoverer instanceof ConsumerAwareRecordRecoverer) {
this.recoverer = (ConsumerAwareRecordRecoverer) recoverer;
if (recoverer instanceof ConsumerAwareRecordRecoverer carr) {
this.recoverer = carr;
}
else {
this.recoverer = (rec, consumer, ex) -> recoverer.accept(rec, ex);
Expand Down Expand Up @@ -150,7 +150,7 @@ public void setRetryListeners(RetryListener... listeners) {
this.retryListeners.addAll(Arrays.asList(listeners));
}

List<RetryListener> getRetryListeners() {
protected List<RetryListener> getRetryListeners() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The class itself is package-protected. Why did you make this method protected.?

Copy link
Contributor Author

@Wzy19930507 Wzy19930507 Dec 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for you review, package-protected is much better.

return this.retryListeners;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, C
.stream()
.collect(
Collectors.toMap(tp -> tp,
tp -> data.records(tp).get(0).offset(), (u, v) -> (long) v, LinkedHashMap::new))
tp -> data.records(tp).get(0).offset(), (u, v) -> v, LinkedHashMap::new))
.forEach(consumer::seek);

throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1458,9 +1458,8 @@ private void doProcessCommits() {
ConsumerRecords<K, V> pending = this.remainingRecords;
this.remainingRecords = null;
List<ConsumerRecord<?, ?>> records = new ArrayList<>();
Iterator<ConsumerRecord<K, V>> iterator = pending.iterator();
while (iterator.hasNext()) {
records.add(iterator.next());
for (ConsumerRecord<K, V> kvConsumerRecord : pending) {
records.add(kvConsumerRecord);
}
this.commonErrorHandler.handleRemaining(cfe, records, this.consumer,
KafkaMessageListenerContainer.this.thisOrParentContainer);
Expand Down Expand Up @@ -2403,7 +2402,7 @@ private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords<K, V> r
ConsumerRecords<K, V> records = recordsArg;
List<ConsumerRecord<K, V>> recordList = recordListArg;
if (this.listenerinfo != null) {
records.iterator().forEachRemaining(rec -> listenerInfo(rec));
records.iterator().forEachRemaining(this::listenerInfo);
}
if (this.batchInterceptor != null) {
records = this.batchInterceptor.intercept(recordsArg, this.consumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public static void unrecoverableBackOff(BackOff backOff, Map<Thread, BackOffExec
if (interval == BackOffExecution.STOP) {
interval = lastIntervals.get(currentThread);
if (interval == null) {
interval = Long.valueOf(0);
interval = 0L;
}
}
lastIntervals.put(currentThread, interval);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2022 the original author or authors.
* Copyright 2018-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -204,7 +204,7 @@ public static void seekOrRecover(Exception thrownException, @Nullable List<Consu
}
}

if (records == null || !doSeeks(records, consumer, thrownException, true, recovery, container, logger)) { // NOSONAR
if (!doSeeks(records, consumer, thrownException, true, recovery, container, logger)) { // NOSONAR
throw new KafkaException("Seek to current after exception", level, thrownException);
}
if (commitRecovered) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2022 the original author or authors.
* Copyright 2019-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -93,8 +93,6 @@ public void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exce

});
ConsumerRecord<String, String> record1 = new ConsumerRecord<>("foo", 0, 0L, "foo", "bar");
ConsumerRecord<String, String> record2 = new ConsumerRecord<>("foo", 1, 1L, "foo", "bar");
List<ConsumerRecord<?, ?>> records = Arrays.asList(record1, record2);
IllegalStateException illegalState = new IllegalStateException();
Consumer<?, ?> consumer = mock(Consumer.class);
assertThat(handler.handleOne(illegalState, record1, consumer, mock(MessageListenerContainer.class))).isFalse();
Expand All @@ -116,7 +114,7 @@ public void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exce
assertThat(failedDeliveryAttempt.get()).isEqualTo(1);
assertThat(recoveryFailureEx.get())
.isInstanceOf(RuntimeException.class)
.extracting(ex -> ex.getMessage())
.extracting(Throwable::getMessage)
.isEqualTo("test recoverer failure");
assertThat(isRecovered.get()).isTrue();
}
Expand Down Expand Up @@ -183,7 +181,7 @@ consumer, mock(MessageListenerContainer.class)))
assertThat(failedDeliveryAttempt.get()).isEqualTo(1);
assertThat(recoveryFailureEx.get())
.isInstanceOf(RuntimeException.class)
.extracting(ex -> ex.getMessage())
.extracting(Throwable::getMessage)
.isEqualTo("test recoverer failure");
assertThat(isRecovered.get()).isTrue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,13 +279,6 @@ public void testDelegateType() throws Exception {
.isEqualTo(ListenerType.SIMPLE);
template.sendDefault(0, 0, "foo");
assertThat(latch2.await(10, TimeUnit.SECONDS)).isTrue();
// verify that the container called the right method - avoiding the creation of an Acknowledgment
// assertThat(trace.get()[1].getMethodName()).contains("onMessage"); // onMessage(d, a, c) (inner)
// assertThat(trace.get()[2].getMethodName()).contains("onMessage"); // bridge
// assertThat(trace.get()[3].getMethodName()).contains("onMessage"); // onMessage(d, a, c) (outer)
// assertThat(trace.get()[4].getMethodName()).contains("onMessage"); // onMessage(d)
// assertThat(trace.get()[5].getMethodName()).contains("onMessage"); // bridge
// assertThat(trace.get()[6].getMethodName()).contains("invokeRecordListener");
container.stop();
final CountDownLatch latch3 = new CountDownLatch(1);
filtering = new FilteringMessageListenerAdapter<>(
Expand All @@ -299,15 +292,6 @@ public void testDelegateType() throws Exception {
.isEqualTo(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE);
template.sendDefault(0, 0, "foo");
assertThat(latch3.await(10, TimeUnit.SECONDS)).isTrue();
// verify that the container called the 3 arg method directly
// int i = 0;
// if (trace.get()[1].getClassName().endsWith("AcknowledgingConsumerAwareMessageListener")) {
// // this frame does not appear in eclise, but does in gradle.\
// i++;
// }
// assertThat(trace.get()[i + 1].getMethodName()).contains("onMessage"); // onMessage(d, a, c)
// assertThat(trace.get()[i + 2].getMethodName()).contains("onMessage"); // bridge
// assertThat(trace.get()[i + 3].getMethodName()).contains("invokeRecordListener");
container.stop();
long t = System.currentTimeMillis();
container.stop();
Expand Down