Skip to content

Commit

Permalink
GH-3227: Implement handleOne() in CDEH
Browse files Browse the repository at this point in the history
Fixes: #3227

* Implement handleOne() in `CommonDelegatingErrorHandler`
* Add tests for handle methods in `CommonDelegatingErrorHandler`
* Add tests for the new `handleOne()` method, as well as a test for `handleOtherException()`
* Checkstyle fixes

(cherry picked from commit 4e06c2c)
  • Loading branch information
Blackney authored and sobychacko committed May 3, 2024
1 parent 4a727e1 commit 18e509e
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
*
* @author Gary Russell
* @author Adrian Chlebosz
* @author Antonin Arquey
* @author Dan Blackney
* @since 2.8
*
*/
Expand Down Expand Up @@ -188,6 +190,19 @@ public void handleOtherException(Exception thrownException, Consumer<?, ?> consu
}
}

@Override
public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
MessageListenerContainer container) {

CommonErrorHandler handler = findDelegate(thrownException);
if (handler != null) {
return handler.handleOne(thrownException, record, consumer, container);
}
else {
return this.defaultErrorHandler.handleOne(thrownException, record, consumer, container);
}
}

@Nullable
private CommonErrorHandler findDelegate(Throwable thrownException) {
Throwable cause = findCause(thrownException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
Expand All @@ -27,6 +28,7 @@
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.junit.jupiter.api.Test;

Expand All @@ -39,13 +41,15 @@
*
* @author Gary Russell
* @author Adrian Chlebosz
* @author Antonin Arquey
* @author Dan Blackney
* @since 2.8
*
*/
public class CommonDelegatingErrorHandlerTests {

@Test
void testRecordDelegates() {
void testHandleRemainingDelegates() {
var def = mock(CommonErrorHandler.class);
var one = mock(CommonErrorHandler.class);
var two = mock(CommonErrorHandler.class);
Expand All @@ -69,7 +73,7 @@ void testRecordDelegates() {
}

@Test
void testBatchDelegates() {
void testHandleBatchDelegates() {
var def = mock(CommonErrorHandler.class);
var one = mock(CommonErrorHandler.class);
var two = mock(CommonErrorHandler.class);
Expand All @@ -92,6 +96,54 @@ void testBatchDelegates() {
verify(one).handleBatch(any(), any(), any(), any(), any());
}

@Test
void testHandleOtherExceptionDelegates() {
var def = mock(CommonErrorHandler.class);
var one = mock(CommonErrorHandler.class);
var two = mock(CommonErrorHandler.class);
var three = mock(CommonErrorHandler.class);
var eh = new CommonDelegatingErrorHandler(def);
eh.setErrorHandlers(Map.of(IllegalStateException.class, one, IllegalArgumentException.class, two));
eh.addDelegate(RuntimeException.class, three);

eh.handleOtherException(wrap(new IOException()), mock(Consumer.class),
mock(MessageListenerContainer.class), true);
verify(def).handleOtherException(any(), any(), any(), anyBoolean());
eh.handleOtherException(wrap(new KafkaException("test")), mock(Consumer.class),
mock(MessageListenerContainer.class), true);
verify(three).handleOtherException(any(), any(), any(), anyBoolean());
eh.handleOtherException(wrap(new IllegalArgumentException()), mock(Consumer.class),
mock(MessageListenerContainer.class), true);
verify(two).handleOtherException(any(), any(), any(), anyBoolean());
eh.handleOtherException(wrap(new IllegalStateException()), mock(Consumer.class),
mock(MessageListenerContainer.class), true);
verify(one).handleOtherException(any(), any(), any(), anyBoolean());
}

@Test
void testHandleOneDelegates() {
var def = mock(CommonErrorHandler.class);
var one = mock(CommonErrorHandler.class);
var two = mock(CommonErrorHandler.class);
var three = mock(CommonErrorHandler.class);
var eh = new CommonDelegatingErrorHandler(def);
eh.setErrorHandlers(Map.of(IllegalStateException.class, one, IllegalArgumentException.class, two));
eh.addDelegate(RuntimeException.class, three);

eh.handleOne(wrap(new IOException()), mock(ConsumerRecord.class), mock(Consumer.class),
mock(MessageListenerContainer.class));
verify(def).handleOne(any(), any(), any(), any());
eh.handleOne(wrap(new KafkaException("test")), mock(ConsumerRecord.class), mock(Consumer.class),
mock(MessageListenerContainer.class));
verify(three).handleOne(any(), any(), any(), any());
eh.handleOne(wrap(new IllegalArgumentException()), mock(ConsumerRecord.class), mock(Consumer.class),
mock(MessageListenerContainer.class));
verify(two).handleOne(any(), any(), any(), any());
eh.handleOne(wrap(new IllegalStateException()), mock(ConsumerRecord.class), mock(Consumer.class),
mock(MessageListenerContainer.class));
verify(one).handleOne(any(), any(), any(), any());
}

@Test
void testDelegateForThrowableIsAppliedWhenCauseTraversingIsEnabled() {
var defaultHandler = mock(CommonErrorHandler.class);
Expand Down

0 comments on commit 18e509e

Please sign in to comment.