diff --git a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java index 0751e7dbd3..8b6f62cb79 100644 --- a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java +++ b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2023 the original author or authors. + * Copyright 2021-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -55,6 +55,7 @@ * * @author Gary Russell * @author Christian Tzolov + * @author Ngoc Nhan * @since 2.4 * */ @@ -107,29 +108,29 @@ public RabbitStreamTemplate(Environment environment, String streamName) { private Producer createOrGetProducer() { - this.lock.lock(); - try { - if (this.producer == null) { - ProducerBuilder builder = this.environment.producerBuilder(); - if (this.superStreamRouting == null) { - builder.stream(this.streamName); - } - else { - builder.superStream(this.streamName) - .routing(this.superStreamRouting); - } - this.producerCustomizer.accept(this.beanName, builder); - this.producer = builder.build(); - if (!this.streamConverterSet) { - ((DefaultStreamMessageConverter) this.streamConverter).setBuilderSupplier( - () -> this.producer.messageBuilder()); - } + if (this.producer == null) { + this.lock.lock(); + try { + ProducerBuilder builder = this.environment.producerBuilder(); + if (this.superStreamRouting == null) { + builder.stream(this.streamName); + } + else { + builder.superStream(this.streamName) + .routing(this.superStreamRouting); + } + this.producerCustomizer.accept(this.beanName, builder); + this.producer = builder.build(); + if (!this.streamConverterSet) { + ((DefaultStreamMessageConverter) this.streamConverter).setBuilderSupplier( + () -> this.producer.messageBuilder()); + } + } + finally { + this.lock.unlock(); } - return this.producer; - } - finally { - this.lock.unlock(); } + return this.producer; } @Override @@ -305,24 +306,13 @@ private ConfirmationHandler handleConfirm(CompletableFuture future, Obs } else { int code = confStatus.getCode(); - String errorMessage; - switch (code) { - case Constants.CODE_MESSAGE_ENQUEUEING_FAILED: - errorMessage = "Message Enqueueing Failed"; - break; - case Constants.CODE_PRODUCER_CLOSED: - errorMessage = "Producer Closed"; - break; - case Constants.CODE_PRODUCER_NOT_AVAILABLE: - errorMessage = "Producer Not Available"; - break; - case Constants.CODE_PUBLISH_CONFIRM_TIMEOUT: - errorMessage = "Publish Confirm Timeout"; - break; - default: - errorMessage = "Unknown code: " + code; - break; - } + String errorMessage = switch (code) { + case Constants.CODE_MESSAGE_ENQUEUEING_FAILED -> "Message Enqueueing Failed"; + case Constants.CODE_PRODUCER_CLOSED -> "Producer Closed"; + case Constants.CODE_PRODUCER_NOT_AVAILABLE -> "Producer Not Available"; + case Constants.CODE_PUBLISH_CONFIRM_TIMEOUT -> "Publish Confirm Timeout"; + default -> "Unknown code: " + code; + }; StreamSendException ex = new StreamSendException(errorMessage, code); observation.error(ex); observation.stop(); @@ -339,15 +329,15 @@ private ConfirmationHandler handleConfirm(CompletableFuture future, Obs */ @Override public void close() { - this.lock.lock(); - try { - if (this.producer != null) { - this.producer.close(); - this.producer = null; + if (this.producer != null) { + this.lock.lock(); + try { + this.producer.close(); + this.producer = null; + } + finally { + this.lock.unlock(); } - } - finally { - this.lock.unlock(); } }