Skip to content

Commit

Permalink
StreamBridge's dynamic destinations should be closed on shutdown.
Browse files Browse the repository at this point in the history
- if not, we might lose some messages on shutdown especially in case of kafka async mode producing.

Resolves #2835
  • Loading branch information
kurthong77 authored and olegz committed Oct 20, 2023
1 parent b867311 commit 6af3206
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,25 @@ void testDynamicDestination() {
}
}

@Test
void testDynamicDestinationDestroy() {
BindingService bindingService;
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(TestChannelBinderConfiguration
.getCompleteConfiguration(InterceptorConfiguration.class))
.web(WebApplicationType.NONE).run(
"--spring.jmx.enabled=false",
"--spring.cloud.stream.dynamic-destination-cache-size=1"
)) {
StreamBridge bridge = context.getBean(StreamBridge.class);
bridge.send("a", "hello foo");

bindingService = context.getBean(BindingService.class);
assertThat(bindingService.getProducerBindingNames().length).isEqualTo(1);
assertThat(bindingService.getProducerBindingNames()[0]).isEqualTo("a");
}
assertThat(bindingService.getProducerBindingNames().length).isEqualTo(0);
}

@Test
void testWithIntegrationFlowBecauseMarcinSaidSo() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(TestChannelBinderConfiguration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionRegistration;
Expand Down Expand Up @@ -77,7 +78,7 @@
*
*/
@SuppressWarnings("rawtypes")
public final class StreamBridge implements StreamOperations, SmartInitializingSingleton {
public final class StreamBridge implements StreamOperations, SmartInitializingSingleton, DisposableBean {

private static final String STREAM_BRIDGE_FUNC_NAME = "streamBridge";

Expand Down Expand Up @@ -297,4 +298,11 @@ private void addInterceptors(AbstractMessageChannel messageChannel, String desti
this.applicationContext.getBean(GlobalChannelInterceptorProcessor.class);
globalChannelInterceptorProcessor.postProcessAfterInitialization(messageChannel, destinationName);
}

@Override
public void destroy() throws Exception {
channelCache.keySet().forEach(bindingService::unbindProducers);
channelCache.clear();
}

}

0 comments on commit 6af3206

Please sign in to comment.