diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java index e4bca3a91e..28a087319c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java @@ -37,6 +37,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; import java.util.regex.Pattern; import java.util.stream.Stream; @@ -144,6 +146,7 @@ * @author Wang Zhiyang * @author Sanghyeok An * @author Soby Chacko + * @author Omer Celik * * @see KafkaListener * @see KafkaListenerErrorHandler @@ -207,6 +210,8 @@ public class KafkaListenerAnnotationBeanPostProcessor @Nullable private RetryTopicConfigurer retryTopicConfigurer; + private final Lock globalLock = new ReentrantLock(); + @Override public int getOrder() { return LOWEST_PRECEDENCE; @@ -278,14 +283,20 @@ public void setApplicationContext(ApplicationContext applicationContext) throws * {@link #setEndpointRegistry endpoint registry} has to be explicitly configured. * @param beanFactory the {@link BeanFactory} to be used. */ - public synchronized void setBeanFactory(BeanFactory beanFactory) { - this.beanFactory = beanFactory; - if (beanFactory instanceof ConfigurableListableBeanFactory clbf) { - BeanExpressionResolver beanExpressionResolver = clbf.getBeanExpressionResolver(); - if (beanExpressionResolver != null) { - this.resolver = beanExpressionResolver; + public void setBeanFactory(BeanFactory beanFactory) { + try { + this.globalLock.lock(); + this.beanFactory = beanFactory; + if (beanFactory instanceof ConfigurableListableBeanFactory clbf) { + BeanExpressionResolver beanExpressionResolver = clbf.getBeanExpressionResolver(); + if (beanExpressionResolver != null) { + this.resolver = beanExpressionResolver; + } + this.expressionContext = new BeanExpressionContext(clbf, this.listenerScope); } - this.expressionContext = new BeanExpressionContext(clbf, this.listenerScope); + } + finally { + this.globalLock.unlock(); } } @@ -451,36 +462,48 @@ private KafkaListener enhance(AnnotatedElement element, KafkaListener ann) { } } - private synchronized void processMultiMethodListeners(Collection classLevelListeners, + private void processMultiMethodListeners(Collection classLevelListeners, List multiMethods, Class clazz, Object bean, String beanName) { - List checkedMethods = new ArrayList<>(); - Method defaultMethod = null; - for (Method method : multiMethods) { - Method checked = checkProxy(method, bean); - KafkaHandler annotation = AnnotationUtils.findAnnotation(method, KafkaHandler.class); - if (annotation != null && annotation.isDefault()) { - Method toAssert = defaultMethod; - Assert.state(toAssert == null, () -> "Only one @KafkaHandler can be marked 'isDefault', found: " - + toAssert.toString() + " and " + method); - defaultMethod = checked; + try { + this.globalLock.lock(); + List checkedMethods = new ArrayList<>(); + Method defaultMethod = null; + for (Method method : multiMethods) { + Method checked = checkProxy(method, bean); + KafkaHandler annotation = AnnotationUtils.findAnnotation(method, KafkaHandler.class); + if (annotation != null && annotation.isDefault()) { + Method toAssert = defaultMethod; + Assert.state(toAssert == null, () -> "Only one @KafkaHandler can be marked 'isDefault', found: " + + toAssert.toString() + " and " + method); + defaultMethod = checked; + } + checkedMethods.add(checked); + } + for (KafkaListener classLevelListener : classLevelListeners) { + MultiMethodKafkaListenerEndpoint endpoint = + new MultiMethodKafkaListenerEndpoint<>(checkedMethods, defaultMethod, bean); + processMainAndRetryListeners(classLevelListener, bean, beanName, endpoint, null, clazz); } - checkedMethods.add(checked); } - for (KafkaListener classLevelListener : classLevelListeners) { - MultiMethodKafkaListenerEndpoint endpoint = - new MultiMethodKafkaListenerEndpoint<>(checkedMethods, defaultMethod, bean); - processMainAndRetryListeners(classLevelListener, bean, beanName, endpoint, null, clazz); + finally { + this.globalLock.unlock(); } } - protected synchronized void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, + protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) { - Method methodToUse = checkProxy(method, bean); - MethodKafkaListenerEndpoint endpoint = new MethodKafkaListenerEndpoint<>(); - endpoint.setMethod(methodToUse); - processMainAndRetryListeners(kafkaListener, bean, beanName, endpoint, methodToUse, null); + try { + this.globalLock.lock(); + Method methodToUse = checkProxy(method, bean); + MethodKafkaListenerEndpoint endpoint = new MethodKafkaListenerEndpoint<>(); + endpoint.setMethod(methodToUse); + processMainAndRetryListeners(kafkaListener, bean, beanName, endpoint, methodToUse, null); + } + finally { + this.globalLock.unlock(); + } } private void processMainAndRetryListeners(KafkaListener kafkaListener, Object bean, String beanName, diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java index 7e1e8ac8ab..af3f6a6bbc 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java @@ -20,6 +20,8 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; @@ -40,6 +42,7 @@ * @author Gary Russell * @author Filip Halemba * @author Wang Zhiyang + * @author Omer Celik * * @see org.springframework.kafka.annotation.KafkaListenerConfigurer */ @@ -49,6 +52,8 @@ public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, Initial private List customMethodArgumentResolvers = new ArrayList<>(); + private final Lock endpointsLock = new ReentrantLock(); + private KafkaListenerEndpointRegistry endpointRegistry; private MessageHandlerMethodFactory messageHandlerMethodFactory; @@ -188,7 +193,8 @@ public void afterPropertiesSet() { } protected void registerAllEndpoints() { - synchronized (this.endpointDescriptors) { + try { + this.endpointsLock.lock(); for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) { if (descriptor.endpoint instanceof MultiMethodKafkaListenerEndpoint mmkle && this.validator != null) { @@ -199,6 +205,9 @@ protected void registerAllEndpoints() { } this.startImmediately = true; // trigger immediate startup } + finally { + this.endpointsLock.unlock(); + } } private KafkaListenerContainerFactory resolveContainerFactory(KafkaListenerEndpointDescriptor descriptor) { @@ -234,7 +243,8 @@ public void registerEndpoint(KafkaListenerEndpoint endpoint, @Nullable KafkaList Assert.hasText(endpoint.getId(), "Endpoint id must be set"); // Factory may be null, we defer the resolution right before actually creating the container KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory); - synchronized (this.endpointDescriptors) { + try { + this.endpointsLock.lock(); if (this.startImmediately) { // Register and start immediately this.endpointRegistry.registerListenerContainer(descriptor.endpoint, resolveContainerFactory(descriptor), true); @@ -243,6 +253,9 @@ public void registerEndpoint(KafkaListenerEndpoint endpoint, @Nullable KafkaList this.endpointDescriptors.add(descriptor); } } + finally { + this.endpointsLock.unlock(); + } } /** diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java index 6e1ffc8ec2..44043b49a3 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java @@ -32,10 +32,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Predicate; import java.util.stream.Collectors; import org.apache.commons.logging.LogFactory; +import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.AlterConfigOp; @@ -76,6 +79,8 @@ * @author Adrian Gygax * @author Sanghyeok An * @author Valentina Armenise + * @author Anders Swanson + * @author Omer Celik * * @since 1.3 */ @@ -93,6 +98,8 @@ public class KafkaAdmin extends KafkaResourceFactory private static final AtomicInteger CLIENT_ID_COUNTER = new AtomicInteger(); + private final Lock clusterIdLock = new ReentrantLock(); + private final Map configs; private ApplicationContext applicationContext; @@ -265,12 +272,7 @@ public final boolean initialize() { } if (adminClient != null) { try { - synchronized (this) { - if (this.clusterId != null) { - this.clusterId = adminClient.describeCluster().clusterId().get(this.operationTimeout, - TimeUnit.SECONDS); - } - } + updateClusterId(adminClient); addOrModifyTopicsIfNeeded(adminClient, newTopics); return true; } @@ -295,6 +297,19 @@ public final boolean initialize() { return false; } + private void updateClusterId(Admin adminClient) throws InterruptedException, ExecutionException, TimeoutException { + try { + this.clusterIdLock.lock(); + if (this.clusterId != null) { + this.clusterId = adminClient.describeCluster().clusterId().get(this.operationTimeout, + TimeUnit.SECONDS); + } + } + finally { + this.clusterIdLock.unlock(); + } + } + /** * Return a collection of {@link NewTopic}s to create or modify. The default * implementation retrieves all {@link NewTopic} beans in the application context and diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java index 3f8fc01c2f..e2260613ba 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java @@ -24,6 +24,8 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -49,6 +51,7 @@ * @author Gary Russell * @author Yvette Quinby * @author Adrian Chlebosz + * @author Omer Celik * @since 2.7 * */ @@ -62,6 +65,8 @@ public class DefaultDestinationTopicResolver extends ExceptionClassifier private final Map> sourceDestinationsHolderMap; + private final Lock sourceDestinationsHolderLock = new ReentrantLock(); + private final Clock clock; private ApplicationContext applicationContext; @@ -210,9 +215,13 @@ private DestinationTopicHolder getDestinationHolderFor(String mainListenerId, St } private DestinationTopicHolder getDestinationTopicSynchronized(String mainListenerId, String topic) { - synchronized (this.sourceDestinationsHolderMap) { + try { + this.sourceDestinationsHolderLock.lock(); return doGetDestinationFor(mainListenerId, topic); } + finally { + this.sourceDestinationsHolderLock.unlock(); + } } private DestinationTopicHolder doGetDestinationFor(String mainListenerId, String topic) { @@ -229,11 +238,15 @@ public void addDestinationTopics(String mainListenerId, List d + DefaultDestinationTopicResolver.class.getSimpleName() + " is already refreshed."); } validateDestinations(destinationsToAdd); - synchronized (this.sourceDestinationsHolderMap) { + try { + this.sourceDestinationsHolderLock.lock(); Map map = this.sourceDestinationsHolderMap.computeIfAbsent(mainListenerId, id -> new HashMap<>()); map.putAll(correlatePairSourceAndDestinationValues(destinationsToAdd)); } + finally { + this.sourceDestinationsHolderLock.unlock(); + } } private void validateDestinations(List destinationsToAdd) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonDeserializer.java b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonDeserializer.java index 243c658459..11688f41a5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonDeserializer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonDeserializer.java @@ -25,6 +25,8 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; import com.fasterxml.jackson.core.type.TypeReference; @@ -64,6 +66,7 @@ * @author Elliot Kennedy * @author Torsten Schleede * @author Ivan Ponomarev + * @author Omer Celik */ public class JsonDeserializer implements Deserializer { @@ -144,6 +147,8 @@ public class JsonDeserializer implements Deserializer { private boolean configured; + private final Lock trustedPackagesLock = new ReentrantLock(); + /** * Construct an instance with a default {@link ObjectMapper}. */ @@ -397,29 +402,35 @@ public void setTypeResolver(JsonTypeResolver typeResolver) { } @Override - public synchronized void configure(Map configs, boolean isKey) { - if (this.configured) { - return; - } - Assert.state(!this.setterCalled || !configsHasOurKeys(configs), - "JsonDeserializer must be configured with property setters, or via configuration properties; not both"); - doSetUseTypeMapperForKey(isKey); - setUpTypePrecedence(configs); - setupTarget(configs, isKey); - if (configs.containsKey(TRUSTED_PACKAGES) - && configs.get(TRUSTED_PACKAGES) instanceof String) { - this.typeMapper.addTrustedPackages( - StringUtils.delimitedListToStringArray((String) configs.get(TRUSTED_PACKAGES), ",", " \r\n\f\t")); - } - if (configs.containsKey(TYPE_MAPPINGS) && !this.typeMapperExplicitlySet - && this.typeMapper instanceof AbstractJavaTypeMapper) { - ((AbstractJavaTypeMapper) this.typeMapper).setIdClassMapping(createMappings(configs)); + public void configure(Map configs, boolean isKey) { + try { + this.trustedPackagesLock.lock(); + if (this.configured) { + return; + } + Assert.state(!this.setterCalled || !configsHasOurKeys(configs), + "JsonDeserializer must be configured with property setters, or via configuration properties; not both"); + doSetUseTypeMapperForKey(isKey); + setUpTypePrecedence(configs); + setupTarget(configs, isKey); + if (configs.containsKey(TRUSTED_PACKAGES) + && configs.get(TRUSTED_PACKAGES) instanceof String) { + this.typeMapper.addTrustedPackages( + StringUtils.delimitedListToStringArray((String) configs.get(TRUSTED_PACKAGES), ",", " \r\n\f\t")); + } + if (configs.containsKey(TYPE_MAPPINGS) && !this.typeMapperExplicitlySet + && this.typeMapper instanceof AbstractJavaTypeMapper) { + ((AbstractJavaTypeMapper) this.typeMapper).setIdClassMapping(createMappings(configs)); + } + if (configs.containsKey(REMOVE_TYPE_INFO_HEADERS)) { + this.removeTypeHeaders = Boolean.parseBoolean(configs.get(REMOVE_TYPE_INFO_HEADERS).toString()); + } + setUpTypeMethod(configs, isKey); + this.configured = true; } - if (configs.containsKey(REMOVE_TYPE_INFO_HEADERS)) { - this.removeTypeHeaders = Boolean.parseBoolean(configs.get(REMOVE_TYPE_INFO_HEADERS).toString()); + finally { + this.trustedPackagesLock.unlock(); } - setUpTypeMethod(configs, isKey); - this.configured = true; } private boolean configsHasOurKeys(Map configs) { @@ -522,9 +533,15 @@ else if (configs.get(key) instanceof String) { * @param packages the packages. * @since 2.1 */ - public synchronized void addTrustedPackages(String... packages) { - doAddTrustedPackages(packages); - this.setterCalled = true; + public void addTrustedPackages(String... packages) { + try { + this.trustedPackagesLock.lock(); + doAddTrustedPackages(packages); + this.setterCalled = true; + } + finally { + this.trustedPackagesLock.unlock(); + } } private void addMappingsToTrusted(Map> mappings) { @@ -704,10 +721,16 @@ public JsonDeserializer typeMapper(Jackson2JavaTypeMapper mapper) { * @return the deserializer. * @since 2,5 */ - public synchronized JsonDeserializer trustedPackages(String... packages) { - Assert.isTrue(!this.typeMapperExplicitlySet, "When using a custom type mapper, set the trusted packages there"); - this.typeMapper.addTrustedPackages(packages); - return this; + public JsonDeserializer trustedPackages(String... packages) { + try { + this.trustedPackagesLock.lock(); + Assert.isTrue(!this.typeMapperExplicitlySet, "When using a custom type mapper, set the trusted packages there"); + this.typeMapper.addTrustedPackages(packages); + return this; + } + finally { + this.trustedPackagesLock.unlock(); + } } /** diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonSerializer.java b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonSerializer.java index 83e49be1f3..0787bc9a00 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonSerializer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonSerializer.java @@ -19,6 +19,8 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JavaType; @@ -52,6 +54,7 @@ * @author Gary Russell * @author Elliot Kennedy * @author Wang Zhiyang + * @author Omer Celik */ public class JsonSerializer implements Serializer { @@ -80,6 +83,8 @@ public class JsonSerializer implements Serializer { private boolean configured; + private final Lock globalLock = new ReentrantLock(); + public JsonSerializer() { this((JavaType) null, JacksonUtils.enhancedObjectMapper()); } @@ -146,31 +151,37 @@ public void setUseTypeMapperForKey(boolean isKey) { } @Override - public synchronized void configure(Map configs, boolean isKey) { - if (this.configured) { - return; - } - Assert.state(!this.setterCalled - || (!configs.containsKey(ADD_TYPE_INFO_HEADERS) && !configs.containsKey(TYPE_MAPPINGS)), - "JsonSerializer must be configured with property setters, or via configuration properties; not both"); - setUseTypeMapperForKey(isKey); - if (configs.containsKey(ADD_TYPE_INFO_HEADERS)) { - Object config = configs.get(ADD_TYPE_INFO_HEADERS); - if (config instanceof Boolean configBoolean) { - this.addTypeInfo = configBoolean; + public void configure(Map configs, boolean isKey) { + try { + this.globalLock.lock(); + if (this.configured) { + return; } - else if (config instanceof String configString) { - this.addTypeInfo = Boolean.parseBoolean(configString); + Assert.state(!this.setterCalled + || (!configs.containsKey(ADD_TYPE_INFO_HEADERS) && !configs.containsKey(TYPE_MAPPINGS)), + "JsonSerializer must be configured with property setters, or via configuration properties; not both"); + setUseTypeMapperForKey(isKey); + if (configs.containsKey(ADD_TYPE_INFO_HEADERS)) { + Object config = configs.get(ADD_TYPE_INFO_HEADERS); + if (config instanceof Boolean configBoolean) { + this.addTypeInfo = configBoolean; + } + else if (config instanceof String configString) { + this.addTypeInfo = Boolean.parseBoolean(configString); + } + else { + throw new IllegalStateException(ADD_TYPE_INFO_HEADERS + " must be Boolean or String"); + } } - else { - throw new IllegalStateException(ADD_TYPE_INFO_HEADERS + " must be Boolean or String"); + if (configs.containsKey(TYPE_MAPPINGS) && !this.typeMapperExplicitlySet + && this.typeMapper instanceof AbstractJavaTypeMapper abstractJavaTypeMapper) { + abstractJavaTypeMapper.setIdClassMapping(createMappings((String) configs.get(TYPE_MAPPINGS))); } + this.configured = true; } - if (configs.containsKey(TYPE_MAPPINGS) && !this.typeMapperExplicitlySet - && this.typeMapper instanceof AbstractJavaTypeMapper abstractJavaTypeMapper) { - abstractJavaTypeMapper.setIdClassMapping(createMappings((String) configs.get(TYPE_MAPPINGS))); + finally { + this.globalLock.unlock(); } - this.configured = true; } protected static Map> createMappings(String mappings) {