Skip to content

Commit

Permalink
GH-2997 Fix support for producer's error-handler-definition
Browse files Browse the repository at this point in the history
Resolves #2997
  • Loading branch information
olegz committed Sep 24, 2024
1 parent add6b01 commit 8fc89bc
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
*
* @author Chris Bono
*/
public class RabbitTestContainer {
public final class RabbitTestContainer {

private static final RabbitMQContainer RABBITMQ;
static {
Expand All @@ -42,6 +42,9 @@ public class RabbitTestContainer {
RABBITMQ.start();
}

private RabbitTestContainer() {

}
/**
* Should be called early by test that wants to ensure a shared {@link RabbitMQContainer} is up and running.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void test() throws Exception {
"--spring.cloud.stream.bindings.function-in-0.destination=input")) {
TestChannelBinder binder = context.getBean(TestChannelBinder.class);
Method registerErrorInfrastructure = ReflectionUtils
.findMethod(TestChannelBinder.class, "registerErrorInfrastructure", ProducerDestination.class, String.class);
.findMethod(TestChannelBinder.class, "registerErrorInfrastructure", ProducerDestination.class, String.class, boolean.class);
registerErrorInfrastructure.setAccessible(true);
ProducerDestination destination = new ProducerDestination() {
@Override
Expand All @@ -56,7 +56,7 @@ public String getName() {
return "sample";
}
};
registerErrorInfrastructure.invoke(binder, destination, "function-in-0");
registerErrorInfrastructure.invoke(binder, destination, "function-in-0", false);
destination = new ProducerDestination() {
@Override
public String getNameForPartition(int partition) {
Expand All @@ -68,7 +68,7 @@ public String getName() {
return "sample";
}
};
registerErrorInfrastructure.invoke(binder, destination, "function-in-0");
registerErrorInfrastructure.invoke(binder, destination, "function-in-0", false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,13 @@ public final Binding<MessageChannel> doBindProducer(final String destination,
bp = bsp.getBindingProperties(bindingName);
}

SubscribableChannel errorChannel = (bp != null && StringUtils.hasText(bp.getErrorHandlerDefinition())) || producerProperties.isErrorChannelEnabled()
? registerErrorInfrastructure(producerDestination, producerProperties.getBindingName()) : null;
boolean errorHandlerDefined = bp != null && StringUtils.hasText(bp.getErrorHandlerDefinition());
SubscribableChannel errorChannel = errorHandlerDefined || producerProperties.isErrorChannelEnabled()
? registerErrorInfrastructure(producerDestination, producerProperties.getBindingName(), errorHandlerDefined)
: null;

String errorChannelName = errorsBaseName(producerDestination, producerProperties.getBindingName());
this.subscribeFunctionErrorHandler(errorChannelName, producerProperties.getBindingName());

producerMessageHandler = createProducerMessageHandler(producerDestination,
producerProperties, outputChannel, errorChannel);
Expand Down Expand Up @@ -727,7 +732,7 @@ protected void afterUnbindConsumer(ConsumerDestination destination, String group
* @return the channel.
*/
private SubscribableChannel registerErrorInfrastructure(
ProducerDestination destination, String bindingName) {
ProducerDestination destination, String bindingName, boolean errorHandlerDefinitionAvailable) {

String errorChannelName = errorsBaseName(destination, bindingName);
SubscribableChannel errorChannel = new PublishSubscribeChannel();
Expand All @@ -751,7 +756,7 @@ private SubscribableChannel registerErrorInfrastructure(
}

MessageChannel defaultErrorChannel = null;
if (getApplicationContext()
if (!errorHandlerDefinitionAvailable && getApplicationContext()
.containsBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)) {
defaultErrorChannel = getApplicationContext().getBean(
IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME,
Expand Down

0 comments on commit 8fc89bc

Please sign in to comment.