Skip to content

Commit

Permalink
GH-9455: Introduce IntegrationKeepAlive (#9493)
Browse files Browse the repository at this point in the history
Fixes: #9455
Issue link: #9455

* Add an `IntegrationKeepAlive` infrastructure bean to initiate a long-lived non-daemon thread
to keep application alive when it cannot be kept like that for various reason, but has to.
* Expose `spring.integration.keepAlive` global property to disable an `IntegrationKeepAlive` auto-startup
* Test and document the feature
  • Loading branch information
artembilan authored Sep 25, 2024
1 parent 52e8174 commit 8f838d0
Show file tree
Hide file tree
Showing 12 changed files with 374 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.springframework.integration.config.xml.IntegrationNamespaceUtils;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.context.IntegrationProperties;
import org.springframework.integration.endpoint.management.IntegrationKeepAlive;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.handler.support.IntegrationMessageHandlerMethodFactory;
import org.springframework.integration.json.JsonPathUtils;
Expand Down Expand Up @@ -129,6 +130,7 @@ public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) t
registerListMessageHandlerMethodFactory();
registerIntegrationConfigurationReport();
registerControlBusCommandRegistry();
registerKeepAlive();
}

@Override
Expand Down Expand Up @@ -460,4 +462,15 @@ private static BeanDefinitionBuilder createMessageHandlerMethodFactoryBeanDefini
IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME);
}

private void registerKeepAlive() {
if (!this.beanFactory.containsBean(IntegrationContextUtils.INTEGRATION_KEEP_ALIVE_BEAN_NAME)) {
BeanDefinitionBuilder builder =
BeanDefinitionBuilder.genericBeanDefinition(IntegrationKeepAlive.class)
.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);

this.registry.registerBeanDefinition(IntegrationContextUtils.INTEGRATION_KEEP_ALIVE_BEAN_NAME,
builder.getBeanDefinition());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,18 @@ public abstract class IntegrationContextUtils {

public static final String LIST_MESSAGE_HANDLER_FACTORY_BEAN_NAME = "integrationListMessageHandlerMethodFactory";

/**
* The bean name for the {@code org.springframework.integration.support.management.ControlBusCommandRegistry}.
* @since 6.4
*/
public static final String CONTROL_BUS_COMMAND_REGISTRY_BEAN_NAME = "controlBusCommandRegistry";

/**
* The bean name for the {@code org.springframework.integration.endpoint.management.IntegrationKeepAlive}.
* @since 6.4
*/
public static final String INTEGRATION_KEEP_ALIVE_BEAN_NAME = "integrationKeepAlive";

/**
* The default timeout for blocking operations like send and receive messages.
* @since 6.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

/**
* Utility class to encapsulate infrastructure Integration properties constants and their default values.
* The default values can be overridden by the {@code META-INF/spring.integration.properties} with this entries
* The default values can be overridden by the {@code META-INF/spring.integration.properties} with these entries
* (includes their default values):
* <ul>
* <li> {@code spring.integration.channels.autoCreate=true}
Expand All @@ -38,6 +38,7 @@
* <li> {@code spring.integration.channels.error.requireSubscribers=true}
* <li> {@code spring.integration.channels.error.ignoreFailures=true}
* <li> {@code spring.integration.endpoints.defaultTimeout=30000}
* <li> {@code spring.integration.keepAlive=true}
* </ul>
*
* @author Artem Bilan
Expand Down Expand Up @@ -117,6 +118,12 @@ public final class IntegrationProperties {
*/
public static final String ENDPOINTS_DEFAULT_TIMEOUT = INTEGRATION_PROPERTIES_PREFIX + "endpoints.defaultTimeout";

/**
* Set to {@code false} to fully disable Keep-Alive thread.
* @since 6.4
*/
public static final String KEEP_ALIVE = INTEGRATION_PROPERTIES_PREFIX + "keepAlive";

private static final Properties DEFAULTS;

private boolean channelsAutoCreate = true;
Expand All @@ -139,6 +146,8 @@ public final class IntegrationProperties {

private long endpointsDefaultTimeout = IntegrationContextUtils.DEFAULT_TIMEOUT;

private boolean keepAlive = true;

private volatile Properties properties;

static {
Expand Down Expand Up @@ -312,11 +321,30 @@ public long getEndpointsDefaultTimeout() {
/**
* Configure a value for {@link #ENDPOINTS_DEFAULT_TIMEOUT} option.
* @param endpointsDefaultTimeout the value for {@link #ENDPOINTS_DEFAULT_TIMEOUT} option.
* @since 6.2
*/
public void setEndpointsDefaultTimeout(long endpointsDefaultTimeout) {
this.endpointsDefaultTimeout = endpointsDefaultTimeout;
}

/**
* Return the value of {@link #KEEP_ALIVE} option.
* @return the value of {@link #KEEP_ALIVE} option.
* @since 6.4
*/
public boolean isKeepAlive() {
return this.keepAlive;
}

/**
* Configure a value for {@link #KEEP_ALIVE} option.
* Defaults {@code true} - set to {@code false} disable keep-alive thread.
* @param keepAlive {@code false} to disable keep-alive thread.
*/
public void setKeepAlive(boolean keepAlive) {
this.keepAlive = keepAlive;
}

/**
* Represent the current instance as a {@link Properties}.
* @return the {@link Properties} representation.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.endpoint.management;

import java.time.Instant;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.context.SmartLifecycle;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.context.IntegrationProperties;
import org.springframework.integration.endpoint.AbstractPollingEndpoint;
import org.springframework.scheduling.TaskScheduler;

/**
* The component to keep an application alive when there are no non-daemon threads.
* Some application might just not rely on the loops in specific threads for their logic.
* Or target protocol to integrate with communicates via daemon threads.
* <p>
* A bean for this class is registered automatically by Spring Integration infrastructure.
* It is started by application context for a blocked keep-alive dedicated thread
* only if there is no {@link AbstractPollingEndpoint} beans in the application context
* or {@link TaskScheduler} is configured for daemon (or virtual) threads.
* <p>
* Can be stopped (or started respectively) manually after injection into some target service if found redundant.
* <p>
* The {@link IntegrationProperties#KEEP_ALIVE} integration global
* property can be set to {@code false} to disable this component regardless of the application logic.
*
* @author Artem Bilan
*
* @since 6.4
*/
public class IntegrationKeepAlive implements SmartLifecycle, SmartInitializingSingleton, BeanFactoryAware {

private static final Log LOG = LogFactory.getLog(IntegrationKeepAlive.class);

private final AtomicBoolean running = new AtomicBoolean();

private BeanFactory beanFactory;

private boolean autoStartup;

private volatile Thread keepAliveThread;

@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
}

@Override
public void afterSingletonsInstantiated() {
IntegrationProperties integrationProperties = IntegrationContextUtils.getIntegrationProperties(this.beanFactory);
this.autoStartup =
integrationProperties.isKeepAlive()
&& (isTaskSchedulerDaemon() || !isAbstractPollingEndpointPresent());
}

private boolean isTaskSchedulerDaemon() {
TaskScheduler taskScheduler = IntegrationContextUtils.getTaskScheduler(this.beanFactory);
AtomicBoolean isDaemon = new AtomicBoolean();
CountDownLatch checkDaemonThreadLatch = new CountDownLatch(1);
taskScheduler.schedule(() -> {
isDaemon.set(Thread.currentThread().isDaemon());
checkDaemonThreadLatch.countDown();
}, Instant.now());

boolean logWarning = false;
try {
if (!checkDaemonThreadLatch.await(10, TimeUnit.SECONDS)) {
logWarning = true;
}
}
catch (InterruptedException ex) {
logWarning = true;
}
if (logWarning) {
LOG.warn("The 'IntegrationKeepAlive' cannot check a 'TaskScheduler' daemon threads status. " +
"Falling back to 'keep-alive'");
}
return isDaemon.get();
}

private boolean isAbstractPollingEndpointPresent() {
return this.beanFactory.getBeanProvider(AbstractPollingEndpoint.class)
.stream()
.findAny()
.isPresent();
}

@Override
public boolean isAutoStartup() {
return this.autoStartup;
}

@Override
public void start() {
if (this.running.compareAndSet(false, true)) {
this.keepAliveThread =
new Thread(() -> {
while (true) {
try {
Thread.sleep(Long.MAX_VALUE);
}
catch (InterruptedException ex) {
break;
}
}
});
this.keepAliveThread.setDaemon(false);
this.keepAliveThread.setName("spring-integration-keep-alive");
this.keepAliveThread.start();
}
}

@Override
public void stop() {
if (this.running.compareAndSet(true, false)) {
this.keepAliveThread.interrupt();
}
}

@Override
public boolean isRunning() {
return this.running.get();
}

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
/**
* Provides classes related to endpoint management.
*/
@org.springframework.lang.NonNullApi
@org.springframework.lang.NonNullFields
package org.springframework.integration.endpoint.management;
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ spring.integration.messagingTemplate.throwExceptionOnLateReply=false
spring.integration.readOnly.headers=
spring.integration.endpoints.noAutoStartup=
spring.integration.endpoints.defaultTimeout=30000
spring.integration.keepAlive=true
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.endpoint.management.IntegrationKeepAlive;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.test.annotation.DirtiesContext;
Expand Down Expand Up @@ -52,6 +53,9 @@ public class IntegrationContextTests {
@Autowired
private ThreadPoolTaskScheduler taskScheduler;

@Autowired
private IntegrationKeepAlive integrationKeepAlive;

@Test
public void testIntegrationContextComponents() {
assertThat(this.integrationProperties.isMessagingTemplateThrowExceptionOnLateReply()).isTrue();
Expand All @@ -62,6 +66,7 @@ public void testIntegrationContextComponents() {
assertThat(this.serviceActivator.isRunning()).isFalse();
assertThat(this.serviceActivatorExplicit.isAutoStartup()).isTrue();
assertThat(this.serviceActivatorExplicit.isRunning()).isTrue();
assertThat(this.integrationKeepAlive.isRunning()).isTrue();
}

}
Loading

0 comments on commit 8f838d0

Please sign in to comment.