Skip to content

Commit

Permalink
GH-2826: Fix CommonDelegatingErrorHandler
Browse files Browse the repository at this point in the history
Resolves #2826

The `addDelegate` method did not update the classifier, so it failed to
work when cause chain traversal is enabled.

**cherry-pick to 3.0.x, 2.9.x**

* Do not mutate the delegates field until after the validity check.
  • Loading branch information
garyrussell authored Oct 9, 2023
1 parent 6815803 commit c98e4cd
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,7 @@ public void setErrorHandlers(Map<Class<? extends Throwable>, CommonErrorHandler>
Assert.notNull(delegates, "'delegates' cannot be null");
this.delegates.clear();
this.delegates.putAll(delegates);
checkDelegates();
updateClassifier(delegates);
}

private void updateClassifier(Map<Class<? extends Throwable>, CommonErrorHandler> delegates) {
Map<Class<? extends Throwable>, Boolean> classifications = delegates.keySet().stream()
.map(commonErrorHandler -> Map.entry(commonErrorHandler, true))
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
this.classifier = new BinaryExceptionClassifier(classifications);
checkDelegatesAndUpdateClassifier(this.delegates);
}

/**
Expand Down Expand Up @@ -119,12 +111,17 @@ public void setAckAfterHandle(boolean ack) {
* @param handler the handler.
*/
public void addDelegate(Class<? extends Throwable> throwable, CommonErrorHandler handler) {
this.delegates.put(throwable, handler);
checkDelegates();
Map<Class<? extends Throwable>, CommonErrorHandler> delegatesToCheck = new LinkedHashMap<>(this.delegates);
delegatesToCheck.put(throwable, handler);
checkDelegatesAndUpdateClassifier(delegatesToCheck);
this.delegates.clear();
this.delegates.putAll(delegatesToCheck);
}

@SuppressWarnings("deprecation")
private void checkDelegates() {
private void checkDelegatesAndUpdateClassifier(Map<Class<? extends Throwable>,
CommonErrorHandler> delegatesToCheck) {

boolean ackAfterHandle = this.defaultErrorHandler.isAckAfterHandle();
boolean seeksAfterHandling = this.defaultErrorHandler.seeksAfterHandling();
this.delegates.values().forEach(handler -> {
Expand All @@ -133,6 +130,14 @@ private void checkDelegates() {
Assert.isTrue(seeksAfterHandling == handler.seeksAfterHandling(),
"All delegates must return the same value when calling 'seeksAfterHandling()'");
});
updateClassifier(delegatesToCheck);
}

private void updateClassifier(Map<Class<? extends Throwable>, CommonErrorHandler> delegates) {
Map<Class<? extends Throwable>, Boolean> classifications = delegates.keySet().stream()
.map(commonErrorHandler -> Map.entry(commonErrorHandler, true))
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
this.classifier = new BinaryExceptionClassifier(classifications);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.springframework.kafka.listener;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand All @@ -31,6 +32,7 @@

import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.KafkaProducerException;
import org.springframework.kafka.test.utils.KafkaTestUtils;

/**
* Tests for {@link CommonDelegatingErrorHandler}.
Expand Down Expand Up @@ -134,7 +136,7 @@ void testDelegateForThrowableCauseIsAppliedWhenCauseTraversingIsEnabled() {
}

@Test
@SuppressWarnings("ConstantConditions")
@SuppressWarnings({ "ConstantConditions", "unchecked" })
void testDelegateForClassifiableThrowableCauseIsAppliedWhenCauseTraversingIsEnabled() {
var defaultHandler = mock(CommonErrorHandler.class);

Expand All @@ -147,6 +149,10 @@ void testDelegateForClassifiableThrowableCauseIsAppliedWhenCauseTraversingIsEnab
delegatingErrorHandler.setErrorHandlers(Map.of(
KafkaException.class, directCauseErrorHandler
));
delegatingErrorHandler.addDelegate(IllegalStateException.class, mock(CommonErrorHandler.class));
assertThat(KafkaTestUtils.getPropertyValue(delegatingErrorHandler, "classifier.classified", Map.class).keySet())
.contains(IllegalStateException.class);


delegatingErrorHandler.handleRemaining(exc, Collections.emptyList(), mock(Consumer.class),
mock(MessageListenerContainer.class));
Expand Down

0 comments on commit c98e4cd

Please sign in to comment.