diff --git a/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/binder/ErrorBindingTests.java b/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/binder/ErrorBindingTests.java index 679aadb19..6ee9ddd9d 100644 --- a/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/binder/ErrorBindingTests.java +++ b/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/binder/ErrorBindingTests.java @@ -19,6 +19,7 @@ import java.util.function.Consumer; import java.util.function.Function; +import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.Test; import org.springframework.boot.WebApplicationType; @@ -29,6 +30,9 @@ import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; +import org.springframework.integration.channel.AbstractMessageChannel; +import org.springframework.integration.context.IntegrationContextUtils; +import org.springframework.integration.handler.BridgeHandler; import org.springframework.messaging.support.GenericMessage; import static org.assertj.core.api.Assertions.assertThat; @@ -99,7 +103,7 @@ void configurationWithoutBinderSpecificErrorHandler() { @Test void errorBindingWithMultipleDestinationPerBinding() { - new SpringApplicationBuilder( + ApplicationContext context = new SpringApplicationBuilder( TestChannelBinderConfiguration.getCompleteConfiguration(NoErrorHandler.class)) .web(WebApplicationType.NONE) .run("--spring.cloud.stream.bindings.process-in-0.consumer.max-attempts=1", @@ -108,6 +112,27 @@ void errorBindingWithMultipleDestinationPerBinding() { "--spring.jmx.enabled=false"); // must not fail GH-2599 + InputDestination source = context.getBean(InputDestination.class); + source.send(new GenericMessage("Hello".getBytes())); + // We validate that error is logged only once : BridgeHandler to bean 'errorChannel' subscribed only once + BinderErrorChannel binderErrorChannel = context.getBean(BinderErrorChannel.class); + assertThat(binderErrorChannel.getSubscriberCount()) + .isEqualTo(2); // binderProvidedErrorHandler and BridgeHandler to bean 'errorChannel' + // The BridgeHandler bean associated with this binding bridges to 'errorChannel' bean + assertThat( + context.getBeansOfType(BridgeHandler.class) + .entrySet() + .stream() + .filter(entry -> entry.getKey().endsWith("process-in-0.errors.bridge")) + .findAny()) + .isPresent() + .get() + .satisfies(bridgeHandler -> assertThat(bridgeHandler.getValue().getOutputChannel()) + .isNotNull() + .asInstanceOf(InstanceOfAssertFactories.type(AbstractMessageChannel.class)) + .extracting(AbstractMessageChannel::getBeanName) + .isEqualTo(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) + ); } @EnableAutoConfiguration diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinder.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinder.java index d00b95a9c..6bd7c5d48 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinder.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinder.java @@ -886,10 +886,14 @@ protected final ErrorInfrastructure registerErrorInfrastructure( // Setup a bridge to global errorChannel to ensure logging of errors could be controlled via standard SI way if (this.getApplicationContext().containsBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) && this.isSubscribable(binderErrorChannel)) { - SubscribableChannel globalErrorChannel = this.getApplicationContext().getBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME, SubscribableChannel.class); - BridgeHandler bridge = new BridgeHandler(); - bridge.setOutputChannel(globalErrorChannel); - binderErrorChannel.subscribe(bridge); + String errorBridgeHandlerName = getErrorBridgeName(destination, group, consumerProperties); + if (!getApplicationContext().containsBean(errorBridgeHandlerName)) { + SubscribableChannel globalErrorChannel = this.getApplicationContext().getBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME, SubscribableChannel.class); + BridgeHandler bridge = new BridgeHandler(); + bridge.setOutputChannel(globalErrorChannel); + binderErrorChannel.subscribe(bridge); + ((GenericApplicationContext) getApplicationContext()).registerBean(errorBridgeHandlerName, BridgeHandler.class, () -> bridge); + } } return new ErrorInfrastructure(binderErrorChannel, recoverer, binderProvidedErrorHandler); }