Skip to content

Commit

Permalink
Change "synchronized" to reentrant lock for virtual-threads
Browse files Browse the repository at this point in the history
Fix checkstyles before merge

Code cleanup

Double-Checked Locking Optimization was used to avoid unnecessary locking overhead.
  • Loading branch information
omercelikceng authored and olegz committed Oct 7, 2024
1 parent 7a5e5d0 commit cbfd3aa
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 101 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2019 the original author or authors.
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,7 @@
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.ToDoubleFunction;

import io.micrometer.core.instrument.FunctionCounter;
Expand Down Expand Up @@ -52,6 +53,7 @@
* We will keep this class, as long as we support Boot 2.2.x.
*
* @author Soby Chacko
* @author Omer Celik
* @since 3.0.0
*/
public class KafkaStreamsBinderMetrics {
Expand Down Expand Up @@ -84,6 +86,8 @@ public class KafkaStreamsBinderMetrics {

private volatile Set<MetricName> currentMeters = new HashSet<>();

private static final ReentrantLock metricsLock = new ReentrantLock();

public KafkaStreamsBinderMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}
Expand All @@ -108,9 +112,13 @@ public void bindTo(Set<StreamsBuilderFactoryBean> streamsBuilderFactoryBeans) {
}

public void addMetrics(Set<StreamsBuilderFactoryBean> streamsBuilderFactoryBeans) {
synchronized (KafkaStreamsBinderMetrics.this) {
try {
metricsLock.lock();
this.bindTo(streamsBuilderFactoryBeans);
}
finally {
metricsLock.unlock();
}
}

void prepareToBindMetrics(MeterRegistry registry, Map<MetricName, ? extends Metric> metrics) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.ToDoubleFunction;

import io.micrometer.core.instrument.Gauge;
Expand Down Expand Up @@ -68,6 +69,7 @@
* @author Tomek Szmytka
* @author Nico Heller
* @author Kurt Hong
* @author Omer Celik
*/
public class KafkaBinderMetrics
implements MeterBinder, ApplicationListener<BindingCreatedEvent>, AutoCloseable {
Expand Down Expand Up @@ -97,6 +99,8 @@ public class KafkaBinderMetrics

ScheduledExecutorService scheduler;

private final ReentrantLock consumerFactoryLock = new ReentrantLock();

public KafkaBinderMetrics(KafkaMessageChannelBinder binder,
KafkaBinderConfigurationProperties binderConfigurationProperties,
ConsumerFactory<?, ?> defaultConsumerFactory,
Expand Down Expand Up @@ -231,25 +235,36 @@ else if (beginningOffset != null) {
return lag;
}

private synchronized ConsumerFactory<?, ?> createConsumerFactory() {
/**
* Double-Checked Locking Optimization was used to avoid unnecessary locking overhead.
*/
private ConsumerFactory<?, ?> createConsumerFactory() {
if (this.defaultConsumerFactory == null) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class);
Map<String, Object> mergedConfig = this.binderConfigurationProperties
.mergedConsumerConfiguration();
if (!ObjectUtils.isEmpty(mergedConfig)) {
props.putAll(mergedConfig);
}
if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
this.binderConfigurationProperties
try {
this.consumerFactoryLock.lock();
if (this.defaultConsumerFactory == null) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class);
Map<String, Object> mergedConfig = this.binderConfigurationProperties
.mergedConsumerConfiguration();
if (!ObjectUtils.isEmpty(mergedConfig)) {
props.putAll(mergedConfig);
}
if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
this.binderConfigurationProperties
.getKafkaConnectionString());
}
this.defaultConsumerFactory = new DefaultKafkaConsumerFactory<>(
props);
}
}
finally {
this.consumerFactoryLock.unlock();
}
this.defaultConsumerFactory = new DefaultKafkaConsumerFactory<>(
props);
}
return this.defaultConsumerFactory;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
Expand Down Expand Up @@ -79,6 +80,7 @@
* @author Oleg Zhurakousky
* @author Michael Michailidis
* @author Byungjun You
* @author Omer Celik
*/
// @checkstyle:off
public class RabbitExchangeQueueProvisioner
Expand All @@ -105,6 +107,8 @@ public class RabbitExchangeQueueProvisioner

private final AtomicInteger producerExchangeBeanNameQualifier = new AtomicInteger();

private final ReentrantLock autoDeclareContextLock = new ReentrantLock();

public RabbitExchangeQueueProvisioner(ConnectionFactory connectionFactory) {
this(connectionFactory, Collections.emptyList());
}
Expand Down Expand Up @@ -320,11 +324,15 @@ private void provisionSuperStream(ExtendedConsumerProperties<RabbitConsumerPrope
(q, i) -> IntStream.range(0, i)
.mapToObj(j -> rk + "-" + j)
.collect(Collectors.toList()));
synchronized (this.autoDeclareContext) {
try {
this.autoDeclareContextLock.lock();
if (!this.autoDeclareContext.containsBean(name + ".superStream")) {
this.autoDeclareContext.getBeanFactory().registerSingleton(name + ".superStream", ss);
}
}
finally {
this.autoDeclareContextLock.unlock();
}
try {
ss.getDeclarables().forEach(dec -> {
if (dec instanceof Exchange exch) {
Expand Down Expand Up @@ -716,14 +724,18 @@ private Exchange customizeAndDeclare(Exchange exchange) {
}

private void addToAutoDeclareContext(String name, Declarable bean) {
synchronized (this.autoDeclareContext) {
try {
this.autoDeclareContextLock.lock();
if (!this.autoDeclareContext.containsBean(name)) {
this.autoDeclareContext.getBeanFactory().registerSingleton(name, new Declarables(bean));
}
else {
this.autoDeclareContext.getBean(name, Declarables.class).getDeclarables().add(bean);
}
}
finally {
this.autoDeclareContextLock.unlock();
}
}

private void declareBinding(String rootName, org.springframework.amqp.core.Binding bindingArg) {
Expand Down Expand Up @@ -756,31 +768,36 @@ private void declareBinding(String rootName, org.springframework.amqp.core.Bindi
public void cleanAutoDeclareContext(ConsumerDestination destination,
ExtendedConsumerProperties<RabbitConsumerProperties> consumerProperties) {

synchronized (this.autoDeclareContext) {
try {
this.autoDeclareContextLock.lock();
Stream.of(StringUtils.tokenizeToStringArray(destination.getName(), ",", true,
true)).forEach(name -> {
String group = null;
String bindingName = null;
if (destination instanceof RabbitConsumerDestination rabbitConsumerDestination) {
group = rabbitConsumerDestination.getGroup();
bindingName = rabbitConsumerDestination.getBindingName();
}
RabbitConsumerProperties properties = consumerProperties.getExtension();
String toRemove = properties.isQueueNameGroupOnly() ? bindingName + "." + group : name.trim();
boolean partitioned = consumerProperties.isPartitioned();
if (partitioned) {
toRemove = removePartitionPart(toRemove);
}
removeSingleton(toRemove + ".exchange");
removeQueueAndBindingBeans(properties, name.trim(), "", group, partitioned);
});
true)).forEach(name -> {
String group = null;
String bindingName = null;
if (destination instanceof RabbitConsumerDestination rabbitConsumerDestination) {
group = rabbitConsumerDestination.getGroup();
bindingName = rabbitConsumerDestination.getBindingName();
}
RabbitConsumerProperties properties = consumerProperties.getExtension();
String toRemove = properties.isQueueNameGroupOnly() ? bindingName + "." + group : name.trim();
boolean partitioned = consumerProperties.isPartitioned();
if (partitioned) {
toRemove = removePartitionPart(toRemove);
}
removeSingleton(toRemove + ".exchange");
removeQueueAndBindingBeans(properties, name.trim(), "", group, partitioned);
});
}
finally {
this.autoDeclareContextLock.unlock();
}
}

public void cleanAutoDeclareContext(ProducerDestination dest,
ExtendedProducerProperties<RabbitProducerProperties> properties) {

synchronized (this.autoDeclareContext) {
try {
this.autoDeclareContextLock.lock();
if (dest instanceof RabbitProducerDestination rabbitProducerDestination) {
String qual = rabbitProducerDestination.getBeanNameQualifier();
removeSingleton(dest.getName() + "." + qual + ".exchange");
Expand All @@ -790,13 +807,13 @@ public void cleanAutoDeclareContext(ProducerDestination dest,
if (properties.isPartitioned()) {
for (int i = 0; i < properties.getPartitionCount(); i++) {
removeQueueAndBindingBeans(properties.getExtension(),
properties.getExtension().isQueueNameGroupOnly() ? "" : dest.getName(),
group + "-" + i, group, true);
properties.getExtension().isQueueNameGroupOnly() ? "" : dest.getName(),
group + "-" + i, group, true);
}
}
else {
removeQueueAndBindingBeans(properties.getExtension(), dest.getName() + "." + group, "",
group, false);
group, false);
}
}
}
Expand All @@ -811,6 +828,9 @@ public void cleanAutoDeclareContext(ProducerDestination dest,
}
}
}
finally {
this.autoDeclareContextLock.unlock();
}
}

private void removeQueueAndBindingBeans(RabbitCommonProperties properties, String name, String suffix,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
Expand All @@ -43,6 +44,7 @@
* @author Ilayaperumal Gopinathan
* @author Oleg Zhurakousky
* @author Soby Chacko
* @author Omer Celik
*/
public class BindableProxyFactory extends AbstractBindableProxyFactory
implements MethodInterceptor, FactoryBean<Object>, InitializingBean, BeanFactoryAware {
Expand All @@ -55,21 +57,25 @@ public class BindableProxyFactory extends AbstractBindableProxyFactory

protected BeanFactory beanFactory;

private final ReentrantLock lock = new ReentrantLock();

public BindableProxyFactory(Class<?> type) {
super(type);
this.type = type;
}

@Override
public synchronized Object invoke(MethodInvocation invocation) {
Method method = invocation.getMethod();
public Object invoke(MethodInvocation invocation) {
try {
this.lock.lock();
Method method = invocation.getMethod();

// try to use cached target
Object boundTarget = this.targetCache.get(method);
if (boundTarget != null) {
return boundTarget;
// try to use cached target
return this.targetCache.get(method);
}
finally {
this.lock.unlock();
}
return null;
}

public void replaceInputChannel(String originalChannelName, String newChannelName, SubscribableChannel messageChannel) {
Expand Down Expand Up @@ -97,11 +103,22 @@ public void afterPropertiesSet() {
"'bindingTargetFactories' cannot be empty");
}

/**
* Double-Checked Locking Optimization was used to avoid unnecessary locking overhead.
*/
@Override
public synchronized Object getObject() {
public Object getObject() {
if (this.proxy == null && this.type != null) {
ProxyFactory factory = new ProxyFactory(this.type, this);
this.proxy = factory.getProxy();
try {
this.lock.lock();
if (this.proxy == null && this.type != null) {
ProxyFactory factory = new ProxyFactory(this.type, this);
this.proxy = factory.getProxy();
}
}
finally {
this.lock.unlock();
}
}
return this.proxy;
}
Expand Down
Loading

0 comments on commit cbfd3aa

Please sign in to comment.