From cbfd3aa9953ea0dbef98272c2ab53380857ffd77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96mer=20=C3=87elik?= Date: Fri, 4 Oct 2024 23:03:56 +0300 Subject: [PATCH] Change "synchronized" to reentrant lock for virtual-threads Fix checkstyles before merge Code cleanup Double-Checked Locking Optimization was used to avoid unnecessary locking overhead. --- .../streams/KafkaStreamsBinderMetrics.java | 12 +++- .../binder/kafka/KafkaBinderMetrics.java | 47 ++++++++----- .../RabbitExchangeQueueProvisioner.java | 68 ++++++++++++------- .../stream/binding/BindableProxyFactory.java | 39 ++++++++--- .../cloud/stream/binding/BindingService.java | 46 +++++++++---- .../binding/DynamicDestinationsBindable.java | 40 ++++++++--- .../controllers/ServerController.java | 56 ++++++++------- 7 files changed, 207 insertions(+), 101 deletions(-) diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderMetrics.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderMetrics.java index 57cc367012..51686802ef 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderMetrics.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderMetrics.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2019 the original author or authors. + * Copyright 2019-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.ToDoubleFunction; import io.micrometer.core.instrument.FunctionCounter; @@ -52,6 +53,7 @@ * We will keep this class, as long as we support Boot 2.2.x. * * @author Soby Chacko + * @author Omer Celik * @since 3.0.0 */ public class KafkaStreamsBinderMetrics { @@ -84,6 +86,8 @@ public class KafkaStreamsBinderMetrics { private volatile Set currentMeters = new HashSet<>(); + private static final ReentrantLock metricsLock = new ReentrantLock(); + public KafkaStreamsBinderMetrics(MeterRegistry meterRegistry) { this.meterRegistry = meterRegistry; } @@ -108,9 +112,13 @@ public void bindTo(Set streamsBuilderFactoryBeans) { } public void addMetrics(Set streamsBuilderFactoryBeans) { - synchronized (KafkaStreamsBinderMetrics.this) { + try { + metricsLock.lock(); this.bindTo(streamsBuilderFactoryBeans); } + finally { + metricsLock.unlock(); + } } void prepareToBindMetrics(MeterRegistry registry, Map metrics) { diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetrics.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetrics.java index 1ccb2bcf15..c565634929 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetrics.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetrics.java @@ -30,6 +30,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.ToDoubleFunction; import io.micrometer.core.instrument.Gauge; @@ -68,6 +69,7 @@ * @author Tomek Szmytka * @author Nico Heller * @author Kurt Hong + * @author Omer Celik */ public class KafkaBinderMetrics implements MeterBinder, ApplicationListener, AutoCloseable { @@ -97,6 +99,8 @@ public class KafkaBinderMetrics ScheduledExecutorService scheduler; + private final ReentrantLock consumerFactoryLock = new ReentrantLock(); + public KafkaBinderMetrics(KafkaMessageChannelBinder binder, KafkaBinderConfigurationProperties binderConfigurationProperties, ConsumerFactory defaultConsumerFactory, @@ -231,25 +235,36 @@ else if (beginningOffset != null) { return lag; } - private synchronized ConsumerFactory createConsumerFactory() { + /** + * Double-Checked Locking Optimization was used to avoid unnecessary locking overhead. + */ + private ConsumerFactory createConsumerFactory() { if (this.defaultConsumerFactory == null) { - Map props = new HashMap<>(); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - ByteArrayDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - ByteArrayDeserializer.class); - Map mergedConfig = this.binderConfigurationProperties - .mergedConsumerConfiguration(); - if (!ObjectUtils.isEmpty(mergedConfig)) { - props.putAll(mergedConfig); - } - if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) { - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, - this.binderConfigurationProperties + try { + this.consumerFactoryLock.lock(); + if (this.defaultConsumerFactory == null) { + Map props = new HashMap<>(); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + ByteArrayDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + ByteArrayDeserializer.class); + Map mergedConfig = this.binderConfigurationProperties + .mergedConsumerConfiguration(); + if (!ObjectUtils.isEmpty(mergedConfig)) { + props.putAll(mergedConfig); + } + if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) { + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + this.binderConfigurationProperties .getKafkaConnectionString()); + } + this.defaultConsumerFactory = new DefaultKafkaConsumerFactory<>( + props); + } + } + finally { + this.consumerFactoryLock.unlock(); } - this.defaultConsumerFactory = new DefaultKafkaConsumerFactory<>( - props); } return this.defaultConsumerFactory; } diff --git a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit-core/src/main/java/org/springframework/cloud/stream/binder/rabbit/provisioning/RabbitExchangeQueueProvisioner.java b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit-core/src/main/java/org/springframework/cloud/stream/binder/rabbit/provisioning/RabbitExchangeQueueProvisioner.java index 359a75d6a3..ba771ce88e 100644 --- a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit-core/src/main/java/org/springframework/cloud/stream/binder/rabbit/provisioning/RabbitExchangeQueueProvisioner.java +++ b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit-core/src/main/java/org/springframework/cloud/stream/binder/rabbit/provisioning/RabbitExchangeQueueProvisioner.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -79,6 +80,7 @@ * @author Oleg Zhurakousky * @author Michael Michailidis * @author Byungjun You + * @author Omer Celik */ // @checkstyle:off public class RabbitExchangeQueueProvisioner @@ -105,6 +107,8 @@ public class RabbitExchangeQueueProvisioner private final AtomicInteger producerExchangeBeanNameQualifier = new AtomicInteger(); + private final ReentrantLock autoDeclareContextLock = new ReentrantLock(); + public RabbitExchangeQueueProvisioner(ConnectionFactory connectionFactory) { this(connectionFactory, Collections.emptyList()); } @@ -320,11 +324,15 @@ private void provisionSuperStream(ExtendedConsumerProperties IntStream.range(0, i) .mapToObj(j -> rk + "-" + j) .collect(Collectors.toList())); - synchronized (this.autoDeclareContext) { + try { + this.autoDeclareContextLock.lock(); if (!this.autoDeclareContext.containsBean(name + ".superStream")) { this.autoDeclareContext.getBeanFactory().registerSingleton(name + ".superStream", ss); } } + finally { + this.autoDeclareContextLock.unlock(); + } try { ss.getDeclarables().forEach(dec -> { if (dec instanceof Exchange exch) { @@ -716,7 +724,8 @@ private Exchange customizeAndDeclare(Exchange exchange) { } private void addToAutoDeclareContext(String name, Declarable bean) { - synchronized (this.autoDeclareContext) { + try { + this.autoDeclareContextLock.lock(); if (!this.autoDeclareContext.containsBean(name)) { this.autoDeclareContext.getBeanFactory().registerSingleton(name, new Declarables(bean)); } @@ -724,6 +733,9 @@ private void addToAutoDeclareContext(String name, Declarable bean) { this.autoDeclareContext.getBean(name, Declarables.class).getDeclarables().add(bean); } } + finally { + this.autoDeclareContextLock.unlock(); + } } private void declareBinding(String rootName, org.springframework.amqp.core.Binding bindingArg) { @@ -756,31 +768,36 @@ private void declareBinding(String rootName, org.springframework.amqp.core.Bindi public void cleanAutoDeclareContext(ConsumerDestination destination, ExtendedConsumerProperties consumerProperties) { - synchronized (this.autoDeclareContext) { + try { + this.autoDeclareContextLock.lock(); Stream.of(StringUtils.tokenizeToStringArray(destination.getName(), ",", true, - true)).forEach(name -> { - String group = null; - String bindingName = null; - if (destination instanceof RabbitConsumerDestination rabbitConsumerDestination) { - group = rabbitConsumerDestination.getGroup(); - bindingName = rabbitConsumerDestination.getBindingName(); - } - RabbitConsumerProperties properties = consumerProperties.getExtension(); - String toRemove = properties.isQueueNameGroupOnly() ? bindingName + "." + group : name.trim(); - boolean partitioned = consumerProperties.isPartitioned(); - if (partitioned) { - toRemove = removePartitionPart(toRemove); - } - removeSingleton(toRemove + ".exchange"); - removeQueueAndBindingBeans(properties, name.trim(), "", group, partitioned); - }); + true)).forEach(name -> { + String group = null; + String bindingName = null; + if (destination instanceof RabbitConsumerDestination rabbitConsumerDestination) { + group = rabbitConsumerDestination.getGroup(); + bindingName = rabbitConsumerDestination.getBindingName(); + } + RabbitConsumerProperties properties = consumerProperties.getExtension(); + String toRemove = properties.isQueueNameGroupOnly() ? bindingName + "." + group : name.trim(); + boolean partitioned = consumerProperties.isPartitioned(); + if (partitioned) { + toRemove = removePartitionPart(toRemove); + } + removeSingleton(toRemove + ".exchange"); + removeQueueAndBindingBeans(properties, name.trim(), "", group, partitioned); + }); + } + finally { + this.autoDeclareContextLock.unlock(); } } public void cleanAutoDeclareContext(ProducerDestination dest, ExtendedProducerProperties properties) { - synchronized (this.autoDeclareContext) { + try { + this.autoDeclareContextLock.lock(); if (dest instanceof RabbitProducerDestination rabbitProducerDestination) { String qual = rabbitProducerDestination.getBeanNameQualifier(); removeSingleton(dest.getName() + "." + qual + ".exchange"); @@ -790,13 +807,13 @@ public void cleanAutoDeclareContext(ProducerDestination dest, if (properties.isPartitioned()) { for (int i = 0; i < properties.getPartitionCount(); i++) { removeQueueAndBindingBeans(properties.getExtension(), - properties.getExtension().isQueueNameGroupOnly() ? "" : dest.getName(), - group + "-" + i, group, true); + properties.getExtension().isQueueNameGroupOnly() ? "" : dest.getName(), + group + "-" + i, group, true); } } else { removeQueueAndBindingBeans(properties.getExtension(), dest.getName() + "." + group, "", - group, false); + group, false); } } } @@ -811,6 +828,9 @@ public void cleanAutoDeclareContext(ProducerDestination dest, } } } + finally { + this.autoDeclareContextLock.unlock(); + } } private void removeQueueAndBindingBeans(RabbitCommonProperties properties, String name, String suffix, diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BindableProxyFactory.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BindableProxyFactory.java index d6b6958e3a..8284bb54ed 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BindableProxyFactory.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BindableProxyFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ import java.lang.reflect.Method; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.locks.ReentrantLock; import org.aopalliance.intercept.MethodInterceptor; import org.aopalliance.intercept.MethodInvocation; @@ -43,6 +44,7 @@ * @author Ilayaperumal Gopinathan * @author Oleg Zhurakousky * @author Soby Chacko + * @author Omer Celik */ public class BindableProxyFactory extends AbstractBindableProxyFactory implements MethodInterceptor, FactoryBean, InitializingBean, BeanFactoryAware { @@ -55,21 +57,25 @@ public class BindableProxyFactory extends AbstractBindableProxyFactory protected BeanFactory beanFactory; + private final ReentrantLock lock = new ReentrantLock(); + public BindableProxyFactory(Class type) { super(type); this.type = type; } @Override - public synchronized Object invoke(MethodInvocation invocation) { - Method method = invocation.getMethod(); + public Object invoke(MethodInvocation invocation) { + try { + this.lock.lock(); + Method method = invocation.getMethod(); - // try to use cached target - Object boundTarget = this.targetCache.get(method); - if (boundTarget != null) { - return boundTarget; + // try to use cached target + return this.targetCache.get(method); + } + finally { + this.lock.unlock(); } - return null; } public void replaceInputChannel(String originalChannelName, String newChannelName, SubscribableChannel messageChannel) { @@ -97,11 +103,22 @@ public void afterPropertiesSet() { "'bindingTargetFactories' cannot be empty"); } + /** + * Double-Checked Locking Optimization was used to avoid unnecessary locking overhead. + */ @Override - public synchronized Object getObject() { + public Object getObject() { if (this.proxy == null && this.type != null) { - ProxyFactory factory = new ProxyFactory(this.type, this); - this.proxy = factory.getProxy(); + try { + this.lock.lock(); + if (this.proxy == null && this.type != null) { + ProxyFactory factory = new ProxyFactory(this.type, this); + this.proxy = factory.getProxy(); + } + } + finally { + this.lock.unlock(); + } } return this.proxy; } diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BindingService.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BindingService.java index 246ebd1631..1db90cde57 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BindingService.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BindingService.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2023 the original author or authors. + * Copyright 2015-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Stream; import com.fasterxml.jackson.databind.ObjectMapper; @@ -63,6 +64,7 @@ * @author Chris Bono * @author Artem Bilan * @author Byungjun You + * @author Omer Celik */ public class BindingService { @@ -451,35 +453,49 @@ public static class LateBinding implements Binding { private final String bindingName; - private final Object consumerOrProducerproperties; + private final Object consumerOrProducerProperties; private final boolean isInput; final ObjectMapper objectMapper; - LateBinding(String bindingName, String error, Object consumerOrProducerproperties, boolean isInput, ObjectMapper objectMapper) { + private final ReentrantLock lock = new ReentrantLock(); + + LateBinding(String bindingName, String error, Object consumerOrProducerProperties, boolean isInput, ObjectMapper objectMapper) { super(); this.error = error; this.bindingName = bindingName; - this.consumerOrProducerproperties = consumerOrProducerproperties; + this.consumerOrProducerProperties = consumerOrProducerProperties; this.isInput = isInput; this.objectMapper = objectMapper; } - public synchronized void setDelegate(Binding delegate) { - if (this.unbound) { - delegate.unbind(); + public void setDelegate(Binding delegate) { + try { + this.lock.lock(); + if (this.unbound) { + delegate.unbind(); + } + else { + this.delegate = delegate; + } } - else { - this.delegate = delegate; + finally { + this.lock.unlock(); } } @Override - public synchronized void unbind() { - this.unbound = true; - if (this.delegate != null) { - this.delegate.unbind(); + public void unbind() { + try { + this.lock.lock(); + this.unbound = true; + if (this.delegate != null) { + this.delegate.unbind(); + } + } + finally { + this.lock.unlock(); } } @@ -507,8 +523,8 @@ public String toString() { public Map getExtendedInfo() { Map extendedInfo = new LinkedHashMap<>(); extendedInfo.put("bindingDestination", this.getBindingName()); - extendedInfo.put(consumerOrProducerproperties.getClass().getSimpleName(), - this.objectMapper.convertValue(consumerOrProducerproperties, Map.class)); + extendedInfo.put(consumerOrProducerProperties.getClass().getSimpleName(), + this.objectMapper.convertValue(consumerOrProducerProperties, Map.class)); return extendedInfo; } diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/DynamicDestinationsBindable.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/DynamicDestinationsBindable.java index b0658ab464..159ad22704 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/DynamicDestinationsBindable.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/DynamicDestinationsBindable.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; import org.springframework.cloud.stream.binder.Binding; @@ -29,6 +30,7 @@ * * @author Ilayaperumal Gopinathan * @author Oleg Zhurakousky + * @author Omer Celik */ public final class DynamicDestinationsBindable implements Bindable { @@ -37,21 +39,41 @@ public final class DynamicDestinationsBindable implements Bindable { */ private final Map> outputBindings = new HashMap<>(); - public synchronized void addOutputBinding(String name, Binding binding) { - this.outputBindings.put(name, binding); + private static final ReentrantLock outputBindingsLock = new ReentrantLock(); + + public void addOutputBinding(String name, Binding binding) { + try { + outputBindingsLock.lock(); + this.outputBindings.put(name, binding); + } + finally { + outputBindingsLock.unlock(); + } } @Override - public synchronized Set getOutputs() { - return Collections.unmodifiableSet(this.outputBindings.keySet()); + public Set getOutputs() { + try { + outputBindingsLock.lock(); + return Collections.unmodifiableSet(this.outputBindings.keySet()); + } + finally { + outputBindingsLock.unlock(); + } } @Override - public synchronized void unbindOutputs(BindingService adapter) { - for (Map.Entry> entry : this.outputBindings.entrySet()) { - entry.getValue().unbind(); + public void unbindOutputs(BindingService adapter) { + try { + outputBindingsLock.lock(); + for (Map.Entry> entry : this.outputBindings.entrySet()) { + entry.getValue().unbind(); + } + this.outputBindings.clear(); + } + finally { + outputBindingsLock.unlock(); } - this.outputBindings.clear(); } } diff --git a/schema-registry/spring-cloud-stream-schema-registry-core/src/main/java/org/springframework/cloud/stream/schema/registry/controllers/ServerController.java b/schema-registry/spring-cloud-stream-schema-registry-core/src/main/java/org/springframework/cloud/stream/schema/registry/controllers/ServerController.java index 579e2f106b..83963b2e2b 100644 --- a/schema-registry/spring-cloud-stream-schema-registry-core/src/main/java/org/springframework/cloud/stream/schema/registry/controllers/ServerController.java +++ b/schema-registry/spring-cloud-stream-schema-registry-core/src/main/java/org/springframework/cloud/stream/schema/registry/controllers/ServerController.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.locks.ReentrantLock; import org.springframework.cloud.stream.schema.registry.config.SchemaServerProperties; import org.springframework.cloud.stream.schema.registry.model.Schema; @@ -52,6 +53,7 @@ * @author Ilayaperumal Gopinathan * @author Jeff Maxwell * @author Christian Tzolov + * @author Omer Celik */ @RestController @RequestMapping(path = "${spring.cloud.stream.schema.server.path:}") @@ -63,6 +65,8 @@ public class ServerController { private final SchemaServerProperties schemaServerProperties; + private static final ReentrantLock lock = new ReentrantLock(); + public ServerController(SchemaRepository repository, Map validators, SchemaServerProperties schemaServerProperties) { Assert.notNull(repository, "cannot be null"); @@ -73,41 +77,45 @@ public ServerController(SchemaRepository repository, Map register(@RequestBody Schema schema, UriComponentsBuilder builder) { - - SchemaValidator validator = this.validators.get(schema.getFormat()); + public ResponseEntity register(@RequestBody Schema schema, UriComponentsBuilder builder) { + try { + lock.lock(); + SchemaValidator validator = this.validators.get(schema.getFormat()); - if (validator == null) { - throw new UnsupportedFormatException(String.format("Invalid format, supported types are: %s", + if (validator == null) { + throw new UnsupportedFormatException(String.format("Invalid format, supported types are: %s", StringUtils.collectionToCommaDelimitedString(this.validators.keySet()))); - } + } - validator.validate(schema.getDefinition()); + validator.validate(schema.getDefinition()); - Schema result; - List registeredEntities = + Schema result; + List registeredEntities = this.repository.findBySubjectAndFormatOrderByVersion(schema.getSubject(), schema.getFormat()); - if (registeredEntities.isEmpty()) { - schema.setVersion(1); - result = this.repository.save(schema); - } - else { - result = validator.match(registeredEntities, schema.getDefinition()); - if (result == null) { - schema.setVersion(registeredEntities.get(registeredEntities.size() - 1).getVersion() + 1); + if (registeredEntities.isEmpty()) { + schema.setVersion(1); result = this.repository.save(schema); } + else { + result = validator.match(registeredEntities, schema.getDefinition()); + if (result == null) { + schema.setVersion(registeredEntities.get(registeredEntities.size() - 1).getVersion() + 1); + result = this.repository.save(schema); + } - } + } - HttpHeaders headers = new HttpHeaders(); - headers.add(HttpHeaders.LOCATION, builder.path("/{subject}/{format}/v{version}") + HttpHeaders headers = new HttpHeaders(); + headers.add(HttpHeaders.LOCATION, builder.path("/{subject}/{format}/v{version}") .buildAndExpand(result.getSubject(), result.getFormat(), result.getVersion()) .toString()); - ResponseEntity response = new ResponseEntity<>(result, headers, HttpStatus.CREATED); - - return response; + ResponseEntity response = new ResponseEntity<>(result, headers, HttpStatus.CREATED); + return response; + } + finally { + lock.unlock(); + } } @RequestMapping(method = RequestMethod.GET, produces = "application/json", path = "/{subject}/{format}/v{version}")