From d982c8e82218484e801d2111cf48684dc921c58c Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Sat, 16 Dec 2023 20:25:42 -0500 Subject: [PATCH] GH-2943: Fix KT.clusterId for concurrency (#2944) Fixes: #2943 The `if (this.kafkaAdmin != null && this.clusterId == null) {` condition might be always true for concurrent threads (especially virtual). Therefore, all of those threads are calling `this.kafkaAdmin.clusterId()` making unnecessary network chats to Kafka broker * Surround `this.kafkaAdmin.clusterId()` call with `Lock` **Cherry-pick to `3.0.x`** --- .../springframework/kafka/core/KafkaTemplate.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index a059808950..9a4166d3d1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -29,6 +29,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import org.apache.commons.logging.LogFactory; @@ -118,6 +120,8 @@ public class KafkaTemplate implements KafkaOperations, ApplicationCo private final Map micrometerTags = new HashMap<>(); + private final Lock clusterIdLock = new ReentrantLock(); + private String beanName = "kafkaTemplate"; private ApplicationContext applicationContext; @@ -501,7 +505,15 @@ else if (this.micrometerEnabled) { @Nullable private String clusterId() { if (this.kafkaAdmin != null && this.clusterId == null) { - this.clusterId = this.kafkaAdmin.clusterId(); + this.clusterIdLock.lock(); + try { + if (this.clusterId == null) { + this.clusterId = this.kafkaAdmin.clusterId(); + } + } + finally { + this.clusterIdLock.unlock(); + } } return this.clusterId; }