From 8bcb0529e2f0875b3499ae74aaa46a21200f47e3 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Thu, 24 Jun 2021 14:04:45 -0400 Subject: [PATCH] GH-1842: Add Conditional Delegating Error Handlers Resolves https://github.com/spring-projects/spring-kafka/issues/1842 * Checkstyle fixes and polishing. * Polishing - return after a delegate handles the error. --- .../src/main/asciidoc/kafka.adoc | 8 ++ .../src/main/asciidoc/whats-new.adoc | 6 + ...onditionalDelegatingBatchErrorHandler.java | 106 ++++++++++++++++++ .../ConditionalDelegatingErrorHandler.java | 95 ++++++++++++++++ .../listener/ContainerAwareErrorHandler.java | 8 +- ...onditionalDelegatingErrorHandlerTests.java | 90 +++++++++++++++ 6 files changed, 311 insertions(+), 2 deletions(-) create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/listener/ConditionalDelegatingBatchErrorHandler.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/listener/ConditionalDelegatingErrorHandler.java create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/listener/ConditionalDelegatingErrorHandlerTests.java diff --git a/spring-kafka-docs/src/main/asciidoc/kafka.adoc b/spring-kafka-docs/src/main/asciidoc/kafka.adoc index 86b5d53ff4..549287086a 100644 --- a/spring-kafka-docs/src/main/asciidoc/kafka.adoc +++ b/spring-kafka-docs/src/main/asciidoc/kafka.adoc @@ -5314,6 +5314,14 @@ The `ContainerStoppingBatchErrorHandler` (used with batch listeners) stops the c After the container stops, an exception that wraps the `ListenerExecutionFailedException` is thrown. This is to cause the transaction to roll back (if transactions are enabled). +[[cond-eh]] +===== Conditional Delegating Error Handlers + +Introduced in version 2.7.4, the `ConditionalDelegatingErrorHandler` can delegate to different error handlers, depending on the exception type. +For example, you may wish to invoke a `SeekToCurrentErrorHandler` for most exceptions, or a `ContainerStoppingErrorHandler` for others. + +Similarly, the `ConditionalDelegatingBatchErrorHandler` is provided. + [[after-rollback]] ===== After-rollback Processor diff --git a/spring-kafka-docs/src/main/asciidoc/whats-new.adoc b/spring-kafka-docs/src/main/asciidoc/whats-new.adoc index 401b6f159b..853d2d5667 100644 --- a/spring-kafka-docs/src/main/asciidoc/whats-new.adoc +++ b/spring-kafka-docs/src/main/asciidoc/whats-new.adoc @@ -105,3 +105,9 @@ See <> for more information. A new `BackOff` implementation is provided, making it more convenient to configure the max retries. See <> for more information. + +[[x27-delegating-eh]] +==== Conditional Delegating Error Handlers + +These new error handlers can be configured to delegate to different error handlers, depending on the exception type. +See <> for more information. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConditionalDelegatingBatchErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConditionalDelegatingBatchErrorHandler.java new file mode 100644 index 0000000000..18981252f9 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConditionalDelegatingBatchErrorHandler.java @@ -0,0 +1,106 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecords; + +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; + +/** + * An error handler that delegates to different error handlers, depending on the exception + * type. + * + * @author Gary Russell + * @since 2.7.4 + * + */ +public class ConditionalDelegatingBatchErrorHandler implements ContainerAwareBatchErrorHandler { + + private final ContainerAwareBatchErrorHandler defaultErrorHandler; + + private final Map, ContainerAwareBatchErrorHandler> delegates = new LinkedHashMap<>(); + + /** + * Construct an instance with a default error handler that will be invoked if the + * exception has no matches. + * @param defaultErrorHandler the default error handler. + */ + public ConditionalDelegatingBatchErrorHandler(ContainerAwareBatchErrorHandler defaultErrorHandler) { + Assert.notNull(defaultErrorHandler, "'defaultErrorHandler' cannot be null"); + this.defaultErrorHandler = defaultErrorHandler; + } + + /** + * Set the delegate error handlers; a {@link LinkedHashMap} argument is recommended so + * that the delegates are searched in a known order. + * @param delegates the delegates. + */ + public void setErrorHandlers(Map, ContainerAwareBatchErrorHandler> delegates) { + this.delegates.clear(); + this.delegates.putAll(delegates); + } + + /** + * Add a delegate to the end of the current collection. + * @param throwable the throwable for this handler. + * @param handler the handler. + */ + public void addDelegate(Class throwable, ContainerAwareBatchErrorHandler handler) { + this.delegates.put(throwable, handler); + } + + @Override + public void handle(Exception thrownException, ConsumerRecords records, Consumer consumer, + MessageListenerContainer container) { + + // Never called but, just in case + doHandle(thrownException, records, consumer, container, null); + } + + @Override + public void handle(Exception thrownException, ConsumerRecords records, Consumer consumer, + MessageListenerContainer container, Runnable invokeListener) { + + doHandle(thrownException, records, consumer, container, invokeListener); + } + + protected void doHandle(Exception thrownException, ConsumerRecords records, Consumer consumer, + MessageListenerContainer container, @Nullable Runnable invokeListener) { + + Throwable cause = thrownException; + if (cause instanceof ListenerExecutionFailedException) { + cause = thrownException.getCause(); + } + if (cause != null) { + Class causeClass = cause.getClass(); + for (Entry, ContainerAwareBatchErrorHandler> entry : this.delegates.entrySet()) { + if (entry.getKey().equals(causeClass)) { + entry.getValue().handle(thrownException, records, consumer, container, invokeListener); + return; + } + } + } + this.defaultErrorHandler.handle(thrownException, records, consumer, container, invokeListener); + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConditionalDelegatingErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConditionalDelegatingErrorHandler.java new file mode 100644 index 0000000000..f2f7a01880 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConditionalDelegatingErrorHandler.java @@ -0,0 +1,95 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; + +/** + * An error handler that delegates to different error handlers, depending on the exception + * type. + * + * @author Gary Russell + * @since 2.7.4 + * + */ +public class ConditionalDelegatingErrorHandler implements ContainerAwareErrorHandler { + + private final ContainerAwareErrorHandler defaultErrorHandler; + + private final Map, ContainerAwareErrorHandler> delegates = new LinkedHashMap<>(); + + /** + * Construct an instance with a default error handler that will be invoked if the + * exception has no matches. + * @param defaultErrorHandler the default error handler. + */ + public ConditionalDelegatingErrorHandler(ContainerAwareErrorHandler defaultErrorHandler) { + Assert.notNull(defaultErrorHandler, "'defaultErrorHandler' cannot be null"); + this.defaultErrorHandler = defaultErrorHandler; + } + + /** + * Set the delegate error handlers; a {@link LinkedHashMap} argument is recommended so + * that the delegates are searched in a known order. + * @param delegates the delegates. + */ + public void setErrorHandlers(Map, ContainerAwareErrorHandler> delegates) { + this.delegates.clear(); + this.delegates.putAll(delegates); + } + + /** + * Add a delegate to the end of the current collection. + * @param throwable the throwable for this handler. + * @param handler the handler. + */ + public void addDelegate(Class throwable, ContainerAwareErrorHandler handler) { + this.delegates.put(throwable, handler); + } + + @Override + public void handle(Exception thrownException, @Nullable List> records, Consumer consumer, + MessageListenerContainer container) { + + boolean handled = false; + Throwable cause = thrownException; + if (cause instanceof ListenerExecutionFailedException) { + cause = thrownException.getCause(); + } + if (cause != null) { + Class causeClass = cause.getClass(); + for (Entry, ContainerAwareErrorHandler> entry : this.delegates.entrySet()) { + if (entry.getKey().equals(causeClass)) { + handled = true; + entry.getValue().handle(thrownException, records, consumer, container); + return; + } + } + } + this.defaultErrorHandler.handle(thrownException, records, consumer, container); + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerAwareErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerAwareErrorHandler.java index afc7150ae8..4c473d473a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerAwareErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerAwareErrorHandler.java @@ -21,6 +21,8 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.lang.Nullable; + /** * An error handler that has access to the unprocessed records from the last poll * (including the failed record), the consumer, and the container. @@ -35,12 +37,14 @@ public interface ContainerAwareErrorHandler extends RemainingRecordsErrorHandler { @Override - default void handle(Exception thrownException, List> records, Consumer consumer) { + default void handle(Exception thrownException, @Nullable List> records, + Consumer consumer) { + throw new UnsupportedOperationException("Container should never call this"); } @Override - void handle(Exception thrownException, List> records, Consumer consumer, + void handle(Exception thrownException, @Nullable List> records, Consumer consumer, MessageListenerContainer container); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConditionalDelegatingErrorHandlerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConditionalDelegatingErrorHandlerTests.java new file mode 100644 index 0000000000..cbfbfbc44c --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConditionalDelegatingErrorHandlerTests.java @@ -0,0 +1,90 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.junit.jupiter.api.Test; + +/** + * @author Gary Russell + * @since 2.7.4 + * + */ +public class ConditionalDelegatingErrorHandlerTests { + + @Test + void testRecordDelegates() { + var def = mock(ContainerAwareErrorHandler.class); + var one = mock(ContainerAwareErrorHandler.class); + var two = mock(ContainerAwareErrorHandler.class); + var three = mock(ContainerAwareErrorHandler.class); + var eh = new ConditionalDelegatingErrorHandler(def); + eh.setErrorHandlers(Map.of(IllegalStateException.class, one, IllegalArgumentException.class, two)); + eh.addDelegate(RuntimeException.class, three); + + eh.handle(wrap(new IOException()), Collections.emptyList(), mock(Consumer.class), + mock(MessageListenerContainer.class)); + verify(def).handle(any(), any(), any(), any()); + eh.handle(wrap(new RuntimeException()), Collections.emptyList(), mock(Consumer.class), + mock(MessageListenerContainer.class)); + verify(three).handle(any(), any(), any(), any()); + eh.handle(wrap(new IllegalArgumentException()), Collections.emptyList(), mock(Consumer.class), + mock(MessageListenerContainer.class)); + verify(two).handle(any(), any(), any(), any()); + eh.handle(wrap(new IllegalStateException()), Collections.emptyList(), mock(Consumer.class), + mock(MessageListenerContainer.class)); + verify(one).handle(any(), any(), any(), any()); + } + + @Test + void testBatchDelegates() { + var def = mock(ContainerAwareBatchErrorHandler.class); + var one = mock(ContainerAwareBatchErrorHandler.class); + var two = mock(ContainerAwareBatchErrorHandler.class); + var three = mock(ContainerAwareBatchErrorHandler.class); + var eh = new ConditionalDelegatingBatchErrorHandler(def); + eh.setErrorHandlers(Map.of(IllegalStateException.class, one, IllegalArgumentException.class, two)); + eh.addDelegate(RuntimeException.class, three); + + eh.handle(wrap(new IOException()), mock(ConsumerRecords.class), mock(Consumer.class), + mock(MessageListenerContainer.class), mock(Runnable.class)); + verify(def).handle(any(), any(), any(), any(), any()); + eh.handle(wrap(new RuntimeException()), mock(ConsumerRecords.class), mock(Consumer.class), + mock(MessageListenerContainer.class), mock(Runnable.class)); + verify(three).handle(any(), any(), any(), any(), any()); + eh.handle(wrap(new IllegalArgumentException()), mock(ConsumerRecords.class), mock(Consumer.class), + mock(MessageListenerContainer.class), mock(Runnable.class)); + verify(two).handle(any(), any(), any(), any(), any()); + eh.handle(wrap(new IllegalStateException()), mock(ConsumerRecords.class), mock(Consumer.class), + mock(MessageListenerContainer.class), mock(Runnable.class)); + verify(one).handle(any(), any(), any(), any(), any()); + } + + private Exception wrap(Exception ex) { + return new ListenerExecutionFailedException("test", ex); + } + +}