From 26135e3c8ce15d0c66ba3768638d348cc7bc5bde Mon Sep 17 00:00:00 2001 From: Antonin ARQUEY Date: Tue, 20 Feb 2024 23:14:23 +0100 Subject: [PATCH] GH-3050: Delegating EH delegates compatibility Fixes: #3050 Correct CommonDelegatingErrorHandler validation for delegates compatibility. Add documentation stating that delegates must be compatible with default error handler. --- .../kafka/annotation-error-handling.adoc | 2 + .../CommonDelegatingErrorHandler.java | 8 ++-- .../CommonDelegatingErrorHandlerTests.java | 47 ++++++++++++++++++- 3 files changed, 53 insertions(+), 4 deletions(-) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc index 52f9ec06cc..d163358646 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc @@ -351,6 +351,8 @@ This is to cause the transaction to roll back (if transactions are enabled). The `CommonDelegatingErrorHandler` can delegate to different error handlers, depending on the exception type. For example, you may wish to invoke a `DefaultErrorHandler` for most exceptions, or a `CommonContainerStoppingErrorHandler` for others. +All delegates must share the same compatible properties (`ackAfterHandle`, `seekAfterError` ...). + [[log-eh]] == Logging Error Handler diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonDelegatingErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonDelegatingErrorHandler.java index d510701f35..af937e78cc 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonDelegatingErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonDelegatingErrorHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2023 the original author or authors. + * Copyright 2021-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,6 +38,7 @@ * * @author Gary Russell * @author Adrian Chlebosz + * @author Antonin Arquey * @since 2.8 * */ @@ -65,6 +66,7 @@ public CommonDelegatingErrorHandler(CommonErrorHandler defaultErrorHandler) { * Set the delegate error handlers; a {@link LinkedHashMap} argument is recommended so * that the delegates are searched in a known order. * @param delegates the delegates. + * @throws IllegalArgumentException if any of the delegates is not compatible with the default error handler. */ public void setErrorHandlers(Map, CommonErrorHandler> delegates) { Assert.notNull(delegates, "'delegates' cannot be null"); @@ -109,6 +111,7 @@ public void setAckAfterHandle(boolean ack) { * Add a delegate to the end of the current collection. * @param throwable the throwable for this handler. * @param handler the handler. + * @throws IllegalArgumentException if the handler is not compatible with the default error handler. */ public void addDelegate(Class throwable, CommonErrorHandler handler) { Map, CommonErrorHandler> delegatesToCheck = new LinkedHashMap<>(this.delegates); @@ -118,13 +121,12 @@ public void addDelegate(Class throwable, CommonErrorHandler this.delegates.putAll(delegatesToCheck); } - @SuppressWarnings("deprecation") private void checkDelegatesAndUpdateClassifier(Map, CommonErrorHandler> delegatesToCheck) { boolean ackAfterHandle = this.defaultErrorHandler.isAckAfterHandle(); boolean seeksAfterHandling = this.defaultErrorHandler.seeksAfterHandling(); - this.delegates.values().forEach(handler -> { + delegatesToCheck.values().forEach(handler -> { Assert.isTrue(ackAfterHandle == handler.isAckAfterHandle(), "All delegates must return the same value when calling 'isAckAfterHandle()'"); Assert.isTrue(seeksAfterHandling == handler.seeksAfterHandling(), diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/CommonDelegatingErrorHandlerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/CommonDelegatingErrorHandlerTests.java index 1a25253a41..c58b25da18 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/CommonDelegatingErrorHandlerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/CommonDelegatingErrorHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2022 the original author or authors. + * Copyright 2021-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,9 @@ package org.springframework.kafka.listener; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -39,6 +41,7 @@ * * @author Gary Russell * @author Adrian Chlebosz + * @author Antonin Arquey * @since 2.8 * */ @@ -173,6 +176,48 @@ void testDefaultDelegateIsApplied() { verify(defaultHandler).handleRemaining(any(), any(), any(), any()); } + @Test + void testAddIncompatibleAckAfterHandleDelegate() { + var defaultHandler = mock(CommonErrorHandler.class); + given(defaultHandler.isAckAfterHandle()).willReturn(true); + var delegatingErrorHandler = new CommonDelegatingErrorHandler(defaultHandler); + var delegate = mock(CommonErrorHandler.class); + given(delegate.isAckAfterHandle()).willReturn(false); + + assertThatThrownBy(() -> delegatingErrorHandler.addDelegate(IllegalStateException.class, delegate)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("All delegates must return the same value when calling 'isAckAfterHandle()'"); + } + + @Test + void testAddIncompatibleSeeksAfterHandlingDelegate() { + var defaultHandler = mock(CommonErrorHandler.class); + given(defaultHandler.seeksAfterHandling()).willReturn(true); + var delegatingErrorHandler = new CommonDelegatingErrorHandler(defaultHandler); + var delegate = mock(CommonErrorHandler.class); + given(delegate.seeksAfterHandling()).willReturn(false); + + assertThatThrownBy(() -> delegatingErrorHandler.addDelegate(IllegalStateException.class, delegate)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("All delegates must return the same value when calling 'seeksAfterHandling()'"); + } + + @Test + void testAddMultipleDelegatesWithOneIncompatible() { + var defaultHandler = mock(CommonErrorHandler.class); + given(defaultHandler.seeksAfterHandling()).willReturn(true); + var delegatingErrorHandler = new CommonDelegatingErrorHandler(defaultHandler); + var one = mock(CommonErrorHandler.class); + given(one.seeksAfterHandling()).willReturn(true); + var two = mock(CommonErrorHandler.class); + given(one.seeksAfterHandling()).willReturn(false); + Map, CommonErrorHandler> delegates = Map.of(IllegalStateException.class, one, IOException.class, two); + + assertThatThrownBy(() -> delegatingErrorHandler.setErrorHandlers(delegates)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("All delegates must return the same value when calling 'seeksAfterHandling()'"); + } + private Exception wrap(Exception ex) { return new ListenerExecutionFailedException("test", ex); }