From 48258dfeae4e43ebd0d1723c80df5355a675a992 Mon Sep 17 00:00:00 2001 From: Wang Zhiyang <1208931582@qq.com> Date: Sat, 23 Dec 2023 02:19:43 +0800 Subject: [PATCH] GH-2970: minor improvement error handler related Fixes: #2970 * minor improvement error handler related * remove `FailedRecordProcessor.retryListeners`, reuse FailureTracker's retryListeners * cleanup related to error handlers --- .../listener/DefaultAfterRollbackProcessor.java | 4 ++-- .../kafka/listener/DefaultErrorHandler.java | 2 +- .../kafka/listener/ErrorHandlingUtils.java | 11 +++++------ .../kafka/listener/ExceptionClassifier.java | 3 +-- .../kafka/listener/FailedRecordProcessor.java | 10 ++-------- .../kafka/listener/FailedRecordTracker.java | 4 ++-- .../listener/FallbackBatchErrorHandler.java | 2 +- .../listener/KafkaMessageListenerContainer.java | 7 +++---- .../kafka/listener/ListenerUtils.java | 2 +- .../kafka/listener/SeekUtils.java | 4 ++-- .../listener/DefaultErrorHandlerRecordTests.java | 8 +++----- .../KafkaMessageListenerContainerTests.java | 16 ---------------- 12 files changed, 23 insertions(+), 50 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java index 4685e018dc..bef6a1c6ec 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java @@ -150,13 +150,13 @@ private void checkConfig() { "A KafkaOperations is required when 'commitRecovered' is true"); } - @SuppressWarnings({ "unchecked", "rawtypes", "deprecation" }) + @SuppressWarnings({ "unchecked", "rawtypes"}) @Override public void process(List> records, Consumer consumer, @Nullable MessageListenerContainer container, Exception exception, boolean recoverable, EOSMode eosMode) { if (SeekUtils.doSeeks((List) records, consumer, exception, recoverable, - getFailureTracker()::recovered, container, this.logger) + getFailureTracker(), container, this.logger) && isCommitRecovered() && this.kafkaTemplate.isTransactional()) { ConsumerRecord skipped = records.get(0); this.kafkaTemplate.sendOffsetsToTransaction( diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java index 8d3df39088..318569c87b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java @@ -166,7 +166,7 @@ public void handleRemaining(Exception thrownException, List Consumer consumer, MessageListenerContainer container) { SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(), // NOSONAR - getFailureTracker()::recovered, this.logger, getLogLevel()); + getFailureTracker(), this.logger, getLogLevel()); } @Override diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java index 7d7700a967..79d02a1183 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java @@ -94,9 +94,8 @@ public static void retryBatch(Exception thrownException, ConsumerRecords r listen(retryListeners, records, thrownException, attempt++); ConsumerRecord first = records.iterator().next(); MessageListenerContainer childOrSingle = container.getContainerFor(first.topic(), first.partition()); - if (childOrSingle instanceof ConsumerPauseResumeEventPublisher) { - ((ConsumerPauseResumeEventPublisher) childOrSingle) - .publishConsumerPausedEvent(assignment, "For batch retry"); + if (childOrSingle instanceof ConsumerPauseResumeEventPublisher consumerPauseResumeEventPublisher) { + consumerPauseResumeEventPublisher.publishConsumerPausedEvent(assignment, "For batch retry"); } try { Exception recoveryException = thrownException; @@ -165,7 +164,7 @@ public static void retryBatch(Exception thrownException, ConsumerRecords r retryListeners.forEach(listener -> listener.recovered(records, finalRecoveryException)); } catch (Exception ex) { - logger.error(ex, () -> "Recoverer threw an exception; re-seeking batch"); + logger.error(ex, "Recoverer threw an exception; re-seeking batch"); retryListeners.forEach(listener -> listener.recoveryFailed(records, thrownException, ex)); seeker.handleBatch(thrownException, records, consumer, container, NO_OP); } @@ -173,8 +172,8 @@ public static void retryBatch(Exception thrownException, ConsumerRecords r finally { Set assignment2 = consumer.assignment(); consumer.resume(assignment2); - if (childOrSingle instanceof ConsumerPauseResumeEventPublisher) { - ((ConsumerPauseResumeEventPublisher) childOrSingle).publishConsumerResumedEvent(assignment2); + if (childOrSingle instanceof ConsumerPauseResumeEventPublisher consumerPauseResumeEventPublisher) { + consumerPauseResumeEventPublisher.publishConsumerResumedEvent(assignment2); } } } // NOSONAR NCSS line count diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ExceptionClassifier.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ExceptionClassifier.java index 05527b8f60..a746d2aabb 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ExceptionClassifier.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ExceptionClassifier.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2022 the original author or authors. + * Copyright 2021-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -219,7 +219,6 @@ public Boolean removeClassification(Class exceptionType) { * @author Gary Russell * */ - @SuppressWarnings("serial") private static final class ExtendedBinaryExceptionClassifier extends BinaryExceptionClassifier { ExtendedBinaryExceptionClassifier(Map, Boolean> typeMap, boolean defaultValue) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java index 80f9399346..a8bcf45054 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2022 the original author or authors. + * Copyright 2019-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,8 +16,6 @@ package org.springframework.kafka.listener; -import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.function.BiConsumer; import java.util.function.BiFunction; @@ -56,8 +54,6 @@ public abstract class FailedRecordProcessor extends ExceptionClassifier implemen private final FailedRecordTracker failureTracker; - private final List retryListeners = new ArrayList<>(); - private boolean commitRecovered; private BiFunction, Exception, BackOff> userBackOffFunction = (rec, ex) -> null; @@ -136,12 +132,10 @@ public void setResetStateOnExceptionChange(boolean resetStateOnExceptionChange) public void setRetryListeners(RetryListener... listeners) { Assert.noNullElements(listeners, "'listeners' cannot have null elements"); this.failureTracker.setRetryListeners(listeners); - this.retryListeners.clear(); - this.retryListeners.addAll(Arrays.asList(listeners)); } protected List getRetryListeners() { - return this.retryListeners; + return this.failureTracker.getRetryListeners(); } /** diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java index b773136c63..bfbe435b13 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java @@ -90,8 +90,8 @@ class FailedRecordTracker implements RecoveryStrategy { }; } else { - if (recoverer instanceof ConsumerAwareRecordRecoverer) { - this.recoverer = (ConsumerAwareRecordRecoverer) recoverer; + if (recoverer instanceof ConsumerAwareRecordRecoverer carr) { + this.recoverer = carr; } else { this.recoverer = (rec, consumer, ex) -> recoverer.accept(rec, ex); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FallbackBatchErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FallbackBatchErrorHandler.java index 087af22217..2e2af07d6a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FallbackBatchErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FallbackBatchErrorHandler.java @@ -181,7 +181,7 @@ public void handleBatch(Exception thrownException, ConsumerRecords data, C .stream() .collect( Collectors.toMap(tp -> tp, - tp -> data.records(tp).get(0).offset(), (u, v) -> (long) v, LinkedHashMap::new)) + tp -> data.records(tp).get(0).offset(), (u, v) -> v, LinkedHashMap::new)) .forEach(consumer::seek); throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 7e6c93408e..7e7461d9b1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -1458,9 +1458,8 @@ private void doProcessCommits() { ConsumerRecords pending = this.remainingRecords; this.remainingRecords = null; List> records = new ArrayList<>(); - Iterator> iterator = pending.iterator(); - while (iterator.hasNext()) { - records.add(iterator.next()); + for (ConsumerRecord kvConsumerRecord : pending) { + records.add(kvConsumerRecord); } this.commonErrorHandler.handleRemaining(cfe, records, this.consumer, KafkaMessageListenerContainer.this.thisOrParentContainer); @@ -2403,7 +2402,7 @@ private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords r ConsumerRecords records = recordsArg; List> recordList = recordListArg; if (this.listenerinfo != null) { - records.iterator().forEachRemaining(rec -> listenerInfo(rec)); + records.iterator().forEachRemaining(this::listenerInfo); } if (this.batchInterceptor != null) { records = this.batchInterceptor.intercept(recordsArg, this.consumer); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java index b398966daf..b6bdf9b42b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java @@ -135,7 +135,7 @@ public static void unrecoverableBackOff(BackOff backOff, Map record, Exception original, Exce }); ConsumerRecord record1 = new ConsumerRecord<>("foo", 0, 0L, "foo", "bar"); - ConsumerRecord record2 = new ConsumerRecord<>("foo", 1, 1L, "foo", "bar"); - List> records = Arrays.asList(record1, record2); IllegalStateException illegalState = new IllegalStateException(); Consumer consumer = mock(Consumer.class); assertThat(handler.handleOne(illegalState, record1, consumer, mock(MessageListenerContainer.class))).isFalse(); @@ -116,7 +114,7 @@ public void recoveryFailed(ConsumerRecord record, Exception original, Exce assertThat(failedDeliveryAttempt.get()).isEqualTo(1); assertThat(recoveryFailureEx.get()) .isInstanceOf(RuntimeException.class) - .extracting(ex -> ex.getMessage()) + .extracting(Throwable::getMessage) .isEqualTo("test recoverer failure"); assertThat(isRecovered.get()).isTrue(); } @@ -183,7 +181,7 @@ consumer, mock(MessageListenerContainer.class))) assertThat(failedDeliveryAttempt.get()).isEqualTo(1); assertThat(recoveryFailureEx.get()) .isInstanceOf(RuntimeException.class) - .extracting(ex -> ex.getMessage()) + .extracting(Throwable::getMessage) .isEqualTo("test recoverer failure"); assertThat(isRecovered.get()).isTrue(); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java index 88899ca37e..80e7439a33 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java @@ -279,13 +279,6 @@ public void testDelegateType() throws Exception { .isEqualTo(ListenerType.SIMPLE); template.sendDefault(0, 0, "foo"); assertThat(latch2.await(10, TimeUnit.SECONDS)).isTrue(); - // verify that the container called the right method - avoiding the creation of an Acknowledgment - // assertThat(trace.get()[1].getMethodName()).contains("onMessage"); // onMessage(d, a, c) (inner) - // assertThat(trace.get()[2].getMethodName()).contains("onMessage"); // bridge - // assertThat(trace.get()[3].getMethodName()).contains("onMessage"); // onMessage(d, a, c) (outer) - // assertThat(trace.get()[4].getMethodName()).contains("onMessage"); // onMessage(d) - // assertThat(trace.get()[5].getMethodName()).contains("onMessage"); // bridge - // assertThat(trace.get()[6].getMethodName()).contains("invokeRecordListener"); container.stop(); final CountDownLatch latch3 = new CountDownLatch(1); filtering = new FilteringMessageListenerAdapter<>( @@ -299,15 +292,6 @@ public void testDelegateType() throws Exception { .isEqualTo(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE); template.sendDefault(0, 0, "foo"); assertThat(latch3.await(10, TimeUnit.SECONDS)).isTrue(); - // verify that the container called the 3 arg method directly - // int i = 0; - // if (trace.get()[1].getClassName().endsWith("AcknowledgingConsumerAwareMessageListener")) { - // // this frame does not appear in eclise, but does in gradle.\ - // i++; - // } - // assertThat(trace.get()[i + 1].getMethodName()).contains("onMessage"); // onMessage(d, a, c) - // assertThat(trace.get()[i + 2].getMethodName()).contains("onMessage"); // bridge - // assertThat(trace.get()[i + 3].getMethodName()).contains("invokeRecordListener"); container.stop(); long t = System.currentTimeMillis(); container.stop();