Skip to content

Commit

Permalink
Replace synchronized blocks with ReentrantLocks for virtual thread su…
Browse files Browse the repository at this point in the history
…pport

Fixes: #3652

Replace synchronized methods and blocks with ReentrantLocks in a few classes in Spring Kafka
to improve compatibility with virtual threads. This changes the synchronization mechanism in:

- KafkaListenerAnnotationBeanPostProcessor
- KafkaListenerEndpointRegistrar
- KafkaAdmin
- DefaultDestinationTopicResolver
- JsonDeserializer/JsonSerializer

The change helps avoid blocking virtual threads when using Spring Kafka in Project Loom
environments while maintaining thread safety.
  • Loading branch information
omercelikceng authored and sobychacko committed Dec 13, 2024
1 parent 71b571c commit b4b0f7b
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.regex.Pattern;
import java.util.stream.Stream;
Expand Down Expand Up @@ -144,6 +146,7 @@
* @author Wang Zhiyang
* @author Sanghyeok An
* @author Soby Chacko
* @author Omer Celik
*
* @see KafkaListener
* @see KafkaListenerErrorHandler
Expand Down Expand Up @@ -207,6 +210,8 @@ public class KafkaListenerAnnotationBeanPostProcessor<K, V>
@Nullable
private RetryTopicConfigurer retryTopicConfigurer;

private final Lock globalLock = new ReentrantLock();

@Override
public int getOrder() {
return LOWEST_PRECEDENCE;
Expand Down Expand Up @@ -278,14 +283,20 @@ public void setApplicationContext(ApplicationContext applicationContext) throws
* {@link #setEndpointRegistry endpoint registry} has to be explicitly configured.
* @param beanFactory the {@link BeanFactory} to be used.
*/
public synchronized void setBeanFactory(BeanFactory beanFactory) {
this.beanFactory = beanFactory;
if (beanFactory instanceof ConfigurableListableBeanFactory clbf) {
BeanExpressionResolver beanExpressionResolver = clbf.getBeanExpressionResolver();
if (beanExpressionResolver != null) {
this.resolver = beanExpressionResolver;
public void setBeanFactory(BeanFactory beanFactory) {
try {
this.globalLock.lock();
this.beanFactory = beanFactory;
if (beanFactory instanceof ConfigurableListableBeanFactory clbf) {
BeanExpressionResolver beanExpressionResolver = clbf.getBeanExpressionResolver();
if (beanExpressionResolver != null) {
this.resolver = beanExpressionResolver;
}
this.expressionContext = new BeanExpressionContext(clbf, this.listenerScope);
}
this.expressionContext = new BeanExpressionContext(clbf, this.listenerScope);
}
finally {
this.globalLock.unlock();
}
}

Expand Down Expand Up @@ -451,36 +462,48 @@ private KafkaListener enhance(AnnotatedElement element, KafkaListener ann) {
}
}

private synchronized void processMultiMethodListeners(Collection<KafkaListener> classLevelListeners,
private void processMultiMethodListeners(Collection<KafkaListener> classLevelListeners,
List<Method> multiMethods, Class<?> clazz, Object bean, String beanName) {

List<Method> checkedMethods = new ArrayList<>();
Method defaultMethod = null;
for (Method method : multiMethods) {
Method checked = checkProxy(method, bean);
KafkaHandler annotation = AnnotationUtils.findAnnotation(method, KafkaHandler.class);
if (annotation != null && annotation.isDefault()) {
Method toAssert = defaultMethod;
Assert.state(toAssert == null, () -> "Only one @KafkaHandler can be marked 'isDefault', found: "
+ toAssert.toString() + " and " + method);
defaultMethod = checked;
try {
this.globalLock.lock();
List<Method> checkedMethods = new ArrayList<>();
Method defaultMethod = null;
for (Method method : multiMethods) {
Method checked = checkProxy(method, bean);
KafkaHandler annotation = AnnotationUtils.findAnnotation(method, KafkaHandler.class);
if (annotation != null && annotation.isDefault()) {
Method toAssert = defaultMethod;
Assert.state(toAssert == null, () -> "Only one @KafkaHandler can be marked 'isDefault', found: "
+ toAssert.toString() + " and " + method);
defaultMethod = checked;
}
checkedMethods.add(checked);
}
for (KafkaListener classLevelListener : classLevelListeners) {
MultiMethodKafkaListenerEndpoint<K, V> endpoint =
new MultiMethodKafkaListenerEndpoint<>(checkedMethods, defaultMethod, bean);
processMainAndRetryListeners(classLevelListener, bean, beanName, endpoint, null, clazz);
}
checkedMethods.add(checked);
}
for (KafkaListener classLevelListener : classLevelListeners) {
MultiMethodKafkaListenerEndpoint<K, V> endpoint =
new MultiMethodKafkaListenerEndpoint<>(checkedMethods, defaultMethod, bean);
processMainAndRetryListeners(classLevelListener, bean, beanName, endpoint, null, clazz);
finally {
this.globalLock.unlock();
}
}

protected synchronized void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean,
protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean,
String beanName) {

Method methodToUse = checkProxy(method, bean);
MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
endpoint.setMethod(methodToUse);
processMainAndRetryListeners(kafkaListener, bean, beanName, endpoint, methodToUse, null);
try {
this.globalLock.lock();
Method methodToUse = checkProxy(method, bean);
MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
endpoint.setMethod(methodToUse);
processMainAndRetryListeners(kafkaListener, bean, beanName, endpoint, methodToUse, null);
}
finally {
this.globalLock.unlock();
}
}

private void processMainAndRetryListeners(KafkaListener kafkaListener, Object bean, String beanName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
Expand All @@ -40,6 +42,7 @@
* @author Gary Russell
* @author Filip Halemba
* @author Wang Zhiyang
* @author Omer Celik
*
* @see org.springframework.kafka.annotation.KafkaListenerConfigurer
*/
Expand All @@ -49,6 +52,8 @@ public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, Initial

private List<HandlerMethodArgumentResolver> customMethodArgumentResolvers = new ArrayList<>();

private final Lock endpointsLock = new ReentrantLock();

private KafkaListenerEndpointRegistry endpointRegistry;

private MessageHandlerMethodFactory messageHandlerMethodFactory;
Expand Down Expand Up @@ -188,7 +193,8 @@ public void afterPropertiesSet() {
}

protected void registerAllEndpoints() {
synchronized (this.endpointDescriptors) {
try {
this.endpointsLock.lock();
for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
if (descriptor.endpoint instanceof MultiMethodKafkaListenerEndpoint<?, ?> mmkle
&& this.validator != null) {
Expand All @@ -199,6 +205,9 @@ protected void registerAllEndpoints() {
}
this.startImmediately = true; // trigger immediate startup
}
finally {
this.endpointsLock.unlock();
}
}

private KafkaListenerContainerFactory<?> resolveContainerFactory(KafkaListenerEndpointDescriptor descriptor) {
Expand Down Expand Up @@ -234,7 +243,8 @@ public void registerEndpoint(KafkaListenerEndpoint endpoint, @Nullable KafkaList
Assert.hasText(endpoint.getId(), "Endpoint id must be set");
// Factory may be null, we defer the resolution right before actually creating the container
KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
synchronized (this.endpointDescriptors) {
try {
this.endpointsLock.lock();
if (this.startImmediately) { // Register and start immediately
this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
resolveContainerFactory(descriptor), true);
Expand All @@ -243,6 +253,9 @@ public void registerEndpoint(KafkaListenerEndpoint endpoint, @Nullable KafkaList
this.endpointDescriptors.add(descriptor);
}
}
finally {
this.endpointsLock.unlock();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,13 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
Expand Down Expand Up @@ -76,6 +79,8 @@
* @author Adrian Gygax
* @author Sanghyeok An
* @author Valentina Armenise
* @author Anders Swanson
* @author Omer Celik
*
* @since 1.3
*/
Expand All @@ -93,6 +98,8 @@ public class KafkaAdmin extends KafkaResourceFactory

private static final AtomicInteger CLIENT_ID_COUNTER = new AtomicInteger();

private final Lock clusterIdLock = new ReentrantLock();

private final Map<String, Object> configs;

private ApplicationContext applicationContext;
Expand Down Expand Up @@ -265,12 +272,7 @@ public final boolean initialize() {
}
if (adminClient != null) {
try {
synchronized (this) {
if (this.clusterId != null) {
this.clusterId = adminClient.describeCluster().clusterId().get(this.operationTimeout,
TimeUnit.SECONDS);
}
}
updateClusterId(adminClient);
addOrModifyTopicsIfNeeded(adminClient, newTopics);
return true;
}
Expand All @@ -295,6 +297,19 @@ public final boolean initialize() {
return false;
}

private void updateClusterId(Admin adminClient) throws InterruptedException, ExecutionException, TimeoutException {
try {
this.clusterIdLock.lock();
if (this.clusterId != null) {
this.clusterId = adminClient.describeCluster().clusterId().get(this.operationTimeout,
TimeUnit.SECONDS);
}
}
finally {
this.clusterIdLock.unlock();
}
}

/**
* Return a collection of {@link NewTopic}s to create or modify. The default
* implementation retrieves all {@link NewTopic} beans in the application context and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand All @@ -49,6 +51,7 @@
* @author Gary Russell
* @author Yvette Quinby
* @author Adrian Chlebosz
* @author Omer Celik
* @since 2.7
*
*/
Expand All @@ -62,6 +65,8 @@ public class DefaultDestinationTopicResolver extends ExceptionClassifier

private final Map<String, Map<String, DestinationTopicHolder>> sourceDestinationsHolderMap;

private final Lock sourceDestinationsHolderLock = new ReentrantLock();

private final Clock clock;

private ApplicationContext applicationContext;
Expand Down Expand Up @@ -210,9 +215,13 @@ private DestinationTopicHolder getDestinationHolderFor(String mainListenerId, St
}

private DestinationTopicHolder getDestinationTopicSynchronized(String mainListenerId, String topic) {
synchronized (this.sourceDestinationsHolderMap) {
try {
this.sourceDestinationsHolderLock.lock();
return doGetDestinationFor(mainListenerId, topic);
}
finally {
this.sourceDestinationsHolderLock.unlock();
}
}

private DestinationTopicHolder doGetDestinationFor(String mainListenerId, String topic) {
Expand All @@ -229,11 +238,15 @@ public void addDestinationTopics(String mainListenerId, List<DestinationTopic> d
+ DefaultDestinationTopicResolver.class.getSimpleName() + " is already refreshed.");
}
validateDestinations(destinationsToAdd);
synchronized (this.sourceDestinationsHolderMap) {
try {
this.sourceDestinationsHolderLock.lock();
Map<String, DestinationTopicHolder> map = this.sourceDestinationsHolderMap.computeIfAbsent(mainListenerId,
id -> new HashMap<>());
map.putAll(correlatePairSourceAndDestinationValues(destinationsToAdd));
}
finally {
this.sourceDestinationsHolderLock.unlock();
}
}

private void validateDestinations(List<DestinationTopic> destinationsToAdd) {
Expand Down
Loading

0 comments on commit b4b0f7b

Please sign in to comment.