diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/DefaultConfiguringBeanFactoryPostProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/config/DefaultConfiguringBeanFactoryPostProcessor.java index 136034b038e..dbe3e0c9118 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/DefaultConfiguringBeanFactoryPostProcessor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/config/DefaultConfiguringBeanFactoryPostProcessor.java @@ -44,6 +44,7 @@ import org.springframework.integration.config.xml.IntegrationNamespaceUtils; import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.context.IntegrationProperties; +import org.springframework.integration.endpoint.management.IntegrationKeepAlive; import org.springframework.integration.handler.LoggingHandler; import org.springframework.integration.handler.support.IntegrationMessageHandlerMethodFactory; import org.springframework.integration.json.JsonPathUtils; @@ -129,6 +130,7 @@ public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) t registerListMessageHandlerMethodFactory(); registerIntegrationConfigurationReport(); registerControlBusCommandRegistry(); + registerKeepAlive(); } @Override @@ -460,4 +462,15 @@ private static BeanDefinitionBuilder createMessageHandlerMethodFactoryBeanDefini IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME); } + private void registerKeepAlive() { + if (!this.beanFactory.containsBean(IntegrationContextUtils.INTEGRATION_KEEP_ALIVE_BEAN_NAME)) { + BeanDefinitionBuilder builder = + BeanDefinitionBuilder.genericBeanDefinition(IntegrationKeepAlive.class) + .setRole(BeanDefinition.ROLE_INFRASTRUCTURE); + + this.registry.registerBeanDefinition(IntegrationContextUtils.INTEGRATION_KEEP_ALIVE_BEAN_NAME, + builder.getBeanDefinition()); + } + } + } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationContextUtils.java b/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationContextUtils.java index f7d2f8a857c..b404c060323 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationContextUtils.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationContextUtils.java @@ -100,8 +100,18 @@ public abstract class IntegrationContextUtils { public static final String LIST_MESSAGE_HANDLER_FACTORY_BEAN_NAME = "integrationListMessageHandlerMethodFactory"; + /** + * The bean name for the {@code org.springframework.integration.support.management.ControlBusCommandRegistry}. + * @since 6.4 + */ public static final String CONTROL_BUS_COMMAND_REGISTRY_BEAN_NAME = "controlBusCommandRegistry"; + /** + * The bean name for the {@code org.springframework.integration.endpoint.management.IntegrationKeepAlive}. + * @since 6.4 + */ + public static final String INTEGRATION_KEEP_ALIVE_BEAN_NAME = "integrationKeepAlive"; + /** * The default timeout for blocking operations like send and receive messages. * @since 6.1 diff --git a/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationProperties.java b/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationProperties.java index d31bd8c9303..41c58b376e4 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationProperties.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationProperties.java @@ -25,7 +25,7 @@ /** * Utility class to encapsulate infrastructure Integration properties constants and their default values. - * The default values can be overridden by the {@code META-INF/spring.integration.properties} with this entries + * The default values can be overridden by the {@code META-INF/spring.integration.properties} with these entries * (includes their default values): * * * @author Artem Bilan @@ -117,6 +118,12 @@ public final class IntegrationProperties { */ public static final String ENDPOINTS_DEFAULT_TIMEOUT = INTEGRATION_PROPERTIES_PREFIX + "endpoints.defaultTimeout"; + /** + * Set to {@code false} to fully disable Keep-Alive thread. + * @since 6.4 + */ + public static final String KEEP_ALIVE = INTEGRATION_PROPERTIES_PREFIX + "keepAlive"; + private static final Properties DEFAULTS; private boolean channelsAutoCreate = true; @@ -139,6 +146,8 @@ public final class IntegrationProperties { private long endpointsDefaultTimeout = IntegrationContextUtils.DEFAULT_TIMEOUT; + private boolean keepAlive = true; + private volatile Properties properties; static { @@ -312,11 +321,30 @@ public long getEndpointsDefaultTimeout() { /** * Configure a value for {@link #ENDPOINTS_DEFAULT_TIMEOUT} option. * @param endpointsDefaultTimeout the value for {@link #ENDPOINTS_DEFAULT_TIMEOUT} option. + * @since 6.2 */ public void setEndpointsDefaultTimeout(long endpointsDefaultTimeout) { this.endpointsDefaultTimeout = endpointsDefaultTimeout; } + /** + * Return the value of {@link #KEEP_ALIVE} option. + * @return the value of {@link #KEEP_ALIVE} option. + * @since 6.4 + */ + public boolean isKeepAlive() { + return this.keepAlive; + } + + /** + * Configure a value for {@link #KEEP_ALIVE} option. + * Defaults {@code true} - set to {@code false} disable keep-alive thread. + * @param keepAlive {@code false} to disable keep-alive thread. + */ + public void setKeepAlive(boolean keepAlive) { + this.keepAlive = keepAlive; + } + /** * Represent the current instance as a {@link Properties}. * @return the {@link Properties} representation. diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/management/IntegrationKeepAlive.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/management/IntegrationKeepAlive.java new file mode 100644 index 00000000000..33d7e0efe79 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/management/IntegrationKeepAlive.java @@ -0,0 +1,150 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.endpoint.management; + +import java.time.Instant; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.BeanFactoryAware; +import org.springframework.beans.factory.SmartInitializingSingleton; +import org.springframework.context.SmartLifecycle; +import org.springframework.integration.context.IntegrationContextUtils; +import org.springframework.integration.context.IntegrationProperties; +import org.springframework.integration.endpoint.AbstractPollingEndpoint; +import org.springframework.scheduling.TaskScheduler; + +/** + * The component to keep an application alive when there are no non-daemon threads. + * Some application might just not rely on the loops in specific threads for their logic. + * Or target protocol to integrate with communicates via daemon threads. + *

+ * A bean for this class is registered automatically by Spring Integration infrastructure. + * It is started by application context for a blocked keep-alive dedicated thread + * only if there is no {@link AbstractPollingEndpoint} beans in the application context + * or {@link TaskScheduler} is configured for daemon (or virtual) threads. + *

+ * Can be stopped (or started respectively) manually after injection into some target service if found redundant. + *

+ * The {@link IntegrationProperties#KEEP_ALIVE} integration global + * property can be set to {@code false} to disable this component regardless of the application logic. + * + * @author Artem Bilan + * + * @since 6.4 + */ +public class IntegrationKeepAlive implements SmartLifecycle, SmartInitializingSingleton, BeanFactoryAware { + + private static final Log LOG = LogFactory.getLog(IntegrationKeepAlive.class); + + private final AtomicBoolean running = new AtomicBoolean(); + + private BeanFactory beanFactory; + + private boolean autoStartup; + + private volatile Thread keepAliveThread; + + @Override + public void setBeanFactory(BeanFactory beanFactory) throws BeansException { + this.beanFactory = beanFactory; + } + + @Override + public void afterSingletonsInstantiated() { + IntegrationProperties integrationProperties = IntegrationContextUtils.getIntegrationProperties(this.beanFactory); + this.autoStartup = + integrationProperties.isKeepAlive() + && (isTaskSchedulerDaemon() || !isAbstractPollingEndpointPresent()); + } + + private boolean isTaskSchedulerDaemon() { + TaskScheduler taskScheduler = IntegrationContextUtils.getTaskScheduler(this.beanFactory); + AtomicBoolean isDaemon = new AtomicBoolean(); + CountDownLatch checkDaemonThreadLatch = new CountDownLatch(1); + taskScheduler.schedule(() -> { + isDaemon.set(Thread.currentThread().isDaemon()); + checkDaemonThreadLatch.countDown(); + }, Instant.now()); + + boolean logWarning = false; + try { + if (!checkDaemonThreadLatch.await(10, TimeUnit.SECONDS)) { + logWarning = true; + } + } + catch (InterruptedException ex) { + logWarning = true; + } + if (logWarning) { + LOG.warn("The 'IntegrationKeepAlive' cannot check a 'TaskScheduler' daemon threads status. " + + "Falling back to 'keep-alive'"); + } + return isDaemon.get(); + } + + private boolean isAbstractPollingEndpointPresent() { + return this.beanFactory.getBeanProvider(AbstractPollingEndpoint.class) + .stream() + .findAny() + .isPresent(); + } + + @Override + public boolean isAutoStartup() { + return this.autoStartup; + } + + @Override + public void start() { + if (this.running.compareAndSet(false, true)) { + this.keepAliveThread = + new Thread(() -> { + while (true) { + try { + Thread.sleep(Long.MAX_VALUE); + } + catch (InterruptedException ex) { + break; + } + } + }); + this.keepAliveThread.setDaemon(false); + this.keepAliveThread.setName("spring-integration-keep-alive"); + this.keepAliveThread.start(); + } + } + + @Override + public void stop() { + if (this.running.compareAndSet(true, false)) { + this.keepAliveThread.interrupt(); + } + } + + @Override + public boolean isRunning() { + return this.running.get(); + } + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/management/package-info.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/management/package-info.java index 694a5401751..d92f03293e5 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/management/package-info.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/management/package-info.java @@ -1,4 +1,6 @@ /** * Provides classes related to endpoint management. */ +@org.springframework.lang.NonNullApi +@org.springframework.lang.NonNullFields package org.springframework.integration.endpoint.management; diff --git a/spring-integration-core/src/main/resources/META-INF/spring.integration.default.properties b/spring-integration-core/src/main/resources/META-INF/spring.integration.default.properties index 573f1916e63..7534067d513 100644 --- a/spring-integration-core/src/main/resources/META-INF/spring.integration.default.properties +++ b/spring-integration-core/src/main/resources/META-INF/spring.integration.default.properties @@ -9,3 +9,4 @@ spring.integration.messagingTemplate.throwExceptionOnLateReply=false spring.integration.readOnly.headers= spring.integration.endpoints.noAutoStartup= spring.integration.endpoints.defaultTimeout=30000 +spring.integration.keepAlive=true diff --git a/spring-integration-core/src/test/java/org/springframework/integration/context/IntegrationContextTests.java b/spring-integration-core/src/test/java/org/springframework/integration/context/IntegrationContextTests.java index 46aab1d3446..74ea143e26b 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/context/IntegrationContextTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/context/IntegrationContextTests.java @@ -21,6 +21,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.integration.endpoint.AbstractEndpoint; +import org.springframework.integration.endpoint.management.IntegrationKeepAlive; import org.springframework.integration.test.util.TestUtils; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.test.annotation.DirtiesContext; @@ -52,6 +53,9 @@ public class IntegrationContextTests { @Autowired private ThreadPoolTaskScheduler taskScheduler; + @Autowired + private IntegrationKeepAlive integrationKeepAlive; + @Test public void testIntegrationContextComponents() { assertThat(this.integrationProperties.isMessagingTemplateThrowExceptionOnLateReply()).isTrue(); @@ -62,6 +66,7 @@ public void testIntegrationContextComponents() { assertThat(this.serviceActivator.isRunning()).isFalse(); assertThat(this.serviceActivatorExplicit.isAutoStartup()).isTrue(); assertThat(this.serviceActivatorExplicit.isRunning()).isTrue(); + assertThat(this.integrationKeepAlive.isRunning()).isTrue(); } } diff --git a/spring-integration-core/src/test/java/org/springframework/integration/endpoint/IntegrationKeepAliveTests.java b/spring-integration-core/src/test/java/org/springframework/integration/endpoint/IntegrationKeepAliveTests.java new file mode 100644 index 00000000000..f6b19c04385 --- /dev/null +++ b/spring-integration-core/src/test/java/org/springframework/integration/endpoint/IntegrationKeepAliveTests.java @@ -0,0 +1,135 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.endpoint; + +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.context.IntegrationContextUtils; +import org.springframework.integration.context.IntegrationProperties; +import org.springframework.integration.endpoint.management.IntegrationKeepAlive; +import org.springframework.integration.test.util.TestUtils; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.mockito.Mockito.mock; + +/** + * @author Artem Bilan + * + * @since 6.4 + */ +@SpringJUnitConfig +@DirtiesContext +public class IntegrationKeepAliveTests { + + @Test + void keepAliveIsActive(@Autowired IntegrationKeepAlive integrationKeepAlive) { + assertThat(integrationKeepAlive.isRunning()).isTrue(); + Thread keepAliveThread = TestUtils.getPropertyValue(integrationKeepAlive, "keepAliveThread", Thread.class); + assertThat(keepAliveThread.isAlive()).isTrue(); + integrationKeepAlive.stop(); + await().untilAsserted(() -> assertThat(keepAliveThread.isAlive()).isFalse()); + integrationKeepAlive.start(); + } + + @Configuration + @EnableIntegration + public static class TestConfiguration { + + } + + @Nested + @ContextConfiguration(classes = WithPollingEndpoint.WithPollingEndpointConfig.class) + class WithPollingEndpoint { + + @Test + void keepAliveNotActive(@Autowired IntegrationKeepAlive integrationKeepAlive) { + assertThat(integrationKeepAlive.isRunning()).isFalse(); + } + + @Configuration + static class WithPollingEndpointConfig { + + @Bean + AbstractPollingEndpoint mockPollingEndpoint() { + return mock(); + } + + } + + } + + @Nested + @ContextConfiguration(classes = WithDaemonTaskScheduler.WithDaemonTaskSchedulerConfig.class) + class WithDaemonTaskScheduler { + + @Test + void keepAliveActive(@Autowired IntegrationKeepAlive integrationKeepAlive) { + assertThat(integrationKeepAlive.isRunning()).isTrue(); + } + + @Configuration + static class WithDaemonTaskSchedulerConfig { + + @Bean + AbstractPollingEndpoint mockPollingEndpoint() { + return mock(); + } + + @Bean + String daemonSetter(ThreadPoolTaskScheduler taskScheduler) { + taskScheduler.setDaemon(true); + return null; + } + + } + + } + + @Nested + @ContextConfiguration(classes = WithGlobalProperty.WithGlobalPropertyConfig.class) + class WithGlobalProperty { + + @Test + void keepAliveNotActive(@Autowired IntegrationKeepAlive integrationKeepAlive) { + assertThat(integrationKeepAlive.isRunning()).isFalse(); + } + + @Configuration + static class WithGlobalPropertyConfig { + + @Bean(IntegrationContextUtils.INTEGRATION_GLOBAL_PROPERTIES_BEAN_NAME) + static IntegrationProperties integrationProperties() { + IntegrationProperties integrationProperties = new IntegrationProperties(); + integrationProperties.setKeepAlive(false); + return integrationProperties; + } + + } + + } + +} diff --git a/src/reference/antora/modules/ROOT/nav.adoc b/src/reference/antora/modules/ROOT/nav.adoc index 61e2b1e3279..16fde26c3d6 100644 --- a/src/reference/antora/modules/ROOT/nav.adoc +++ b/src/reference/antora/modules/ROOT/nav.adoc @@ -99,6 +99,7 @@ ** xref:shutdown.adoc[] ** xref:graph.adoc[] ** xref:integration-graph-controller.adoc[] +** xref:keep-alive.adoc[] * xref:reactive-streams.adoc[] * xref:native-aot.adoc[] * xref:endpoint-summary.adoc[] diff --git a/src/reference/antora/modules/ROOT/pages/configuration/global-properties.adoc b/src/reference/antora/modules/ROOT/pages/configuration/global-properties.adoc index e631b390f2b..adb62534efb 100644 --- a/src/reference/antora/modules/ROOT/pages/configuration/global-properties.adoc +++ b/src/reference/antora/modules/ROOT/pages/configuration/global-properties.adoc @@ -19,6 +19,7 @@ spring.integration.endpoints.noAutoStartup= <7> spring.integration.channels.error.requireSubscribers=true <8> spring.integration.channels.error.ignoreFailures=true <9> spring.integration.endpoints.defaultTimeout=30000 <10> +spring.integration.keepAlive=true <11> ---- <1> When true, `input-channel` instances are automatically declared as `DirectChannel` instances when not explicitly found in the application context. @@ -57,6 +58,11 @@ Since version 5.5. Default value is 30 seconds to avoid indefinite blocking. Can be configured to a negative value to restore infinite blocking behavior in endpoints. Since version 6.2. + +<11> Whether to start the `IntegrationKeepAlive`. +Default is `true`, however depends on the beans in the application context. +See xref:keep-alive.adoc[Keep Alive] for more information. +Since version 6.4. ==== These properties can be overridden by adding a `/META-INF/spring.integration.properties` file to the classpath or an `IntegrationContextUtils.INTEGRATION_GLOBAL_PROPERTIES_BEAN_NAME` bean for the `org.springframework.integration.context.IntegrationProperties` instance. @@ -77,5 +83,6 @@ spring.integration.channels.maxBroadcastSubscribers=0x7fffffff spring.integration.readOnly.headers= spring.integration.messagingTemplate.throwExceptionOnLateReply=true spring.integration.endpoints.defaultTimeout=30000 +spring.integration.keepAlive=false ---- diff --git a/src/reference/antora/modules/ROOT/pages/keep-alive.adoc b/src/reference/antora/modules/ROOT/pages/keep-alive.adoc new file mode 100644 index 00000000000..623a47b5c1a --- /dev/null +++ b/src/reference/antora/modules/ROOT/pages/keep-alive.adoc @@ -0,0 +1,17 @@ +[[keep-alive]] += Integration Keep Alive + +Starting with version 6.4, Spring Integration provides an `IntegrationKeepAlive` infrastructure bean. +It manages an `spring-integration-keep-alive` non-daemon forever thread which keeps an application running. +In some use-cases, e.g. `WebSocketInboundChannelAdapter` based on the `StandardWebSocketClient` does not use non-daemon thread for session, therefore an application may exit prematurely. +Or an application logic may have only service activators or outbound channel adapters which rely on some other interaction, but not loops from executors like the one from Web server. +Or all the threads in the application are virtual. + +The `IntegrationKeepAlive` is started automatically only if `TaskScheduler` is configured for non-daemon threads and there is no `AbstractPollingEndpoint` beans in the application context. +In case of the `TaskScheduler` bean configured for daemon or virtual threads, the `IntegrationKeepAlive` is started regardless of the presence for `AbstractPollingEndpoint` beans. + +This component can be disabled by the `spring.integration.keepAlive` global property. +See xref:configuration/global-properties.adoc[Global Properties] for more information. +The `IntegrationKeepAlive` can be injected in some service and stopped manually if there is no need to keep an application alive or such a status is managed somewhere else. + +See also Spring Boot https://docs.spring.io/spring-boot/reference/features/spring-application.html#features.spring-application.virtual-threads[Virtual Threads] documentation for a `spring.main.keep-alive` property. \ No newline at end of file diff --git a/src/reference/antora/modules/ROOT/pages/whats-new.adoc b/src/reference/antora/modules/ROOT/pages/whats-new.adoc index b5e22303c8c..7f1b9d2d81c 100644 --- a/src/reference/antora/modules/ROOT/pages/whats-new.adoc +++ b/src/reference/antora/modules/ROOT/pages/whats-new.adoc @@ -27,6 +27,10 @@ The SpEL evaluation infrastructure now supports configuration for `IndexAccessor Also, an out-of-the-box `JsonIndexAccessor` is provided. See xref:spel.adoc[SpEL Support] for more information. +The `IntegrationKeepAlive` component has been introduced. +See xref:keep-alive.adoc[Integration Keep Alive] for more information. + + [[x6.4-general]] === General Changes