From 6af320626e5fb637e536b0ac11c407cf821bbd30 Mon Sep 17 00:00:00 2001 From: kurt Date: Fri, 20 Oct 2023 14:39:33 +0900 Subject: [PATCH] StreamBridge's dynamic destinations should be closed on shutdown. - if not, we might lose some messages on shutdown especially in case of kafka async mode producing. Resolves #2835 --- .../stream/function/StreamBridgeTests.java | 19 +++++++++++++++++++ .../cloud/stream/function/StreamBridge.java | 10 +++++++++- 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java b/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java index 8a7eb786c..766f39626 100644 --- a/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java +++ b/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java @@ -639,6 +639,25 @@ void testDynamicDestination() { } } + @Test + void testDynamicDestinationDestroy() { + BindingService bindingService; + try (ConfigurableApplicationContext context = new SpringApplicationBuilder(TestChannelBinderConfiguration + .getCompleteConfiguration(InterceptorConfiguration.class)) + .web(WebApplicationType.NONE).run( + "--spring.jmx.enabled=false", + "--spring.cloud.stream.dynamic-destination-cache-size=1" + )) { + StreamBridge bridge = context.getBean(StreamBridge.class); + bridge.send("a", "hello foo"); + + bindingService = context.getBean(BindingService.class); + assertThat(bindingService.getProducerBindingNames().length).isEqualTo(1); + assertThat(bindingService.getProducerBindingNames()[0]).isEqualTo("a"); + } + assertThat(bindingService.getProducerBindingNames().length).isEqualTo(0); + } + @Test void testWithIntegrationFlowBecauseMarcinSaidSo() { try (ConfigurableApplicationContext context = new SpringApplicationBuilder(TestChannelBinderConfiguration diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java index c27f68849..df85dc557 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java @@ -26,6 +26,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.FunctionRegistration; @@ -77,7 +78,7 @@ * */ @SuppressWarnings("rawtypes") -public final class StreamBridge implements StreamOperations, SmartInitializingSingleton { +public final class StreamBridge implements StreamOperations, SmartInitializingSingleton, DisposableBean { private static final String STREAM_BRIDGE_FUNC_NAME = "streamBridge"; @@ -297,4 +298,11 @@ private void addInterceptors(AbstractMessageChannel messageChannel, String desti this.applicationContext.getBean(GlobalChannelInterceptorProcessor.class); globalChannelInterceptorProcessor.postProcessAfterInitialization(messageChannel, destinationName); } + + @Override + public void destroy() throws Exception { + channelCache.keySet().forEach(bindingService::unbindProducers); + channelCache.clear(); + } + }