Skip to content

Commit

Permalink
GH-2943: Fix KT.clusterId for concurrency (#2944)
Browse files Browse the repository at this point in the history
Fixes: #2943

The `if (this.kafkaAdmin != null && this.clusterId == null) {` condition
might be always true for concurrent threads (especially virtual).
Therefore, all of those threads are calling `this.kafkaAdmin.clusterId()`
making unnecessary network chats to Kafka broker

* Surround `this.kafkaAdmin.clusterId()` call with `Lock`

**Cherry-pick to `3.0.x`**
  • Loading branch information
artembilan authored Dec 17, 2023
1 parent eb68d6d commit d982c8e
Showing 1 changed file with 13 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -118,6 +120,8 @@ public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationCo

private final Map<String, String> micrometerTags = new HashMap<>();

private final Lock clusterIdLock = new ReentrantLock();

private String beanName = "kafkaTemplate";

private ApplicationContext applicationContext;
Expand Down Expand Up @@ -501,7 +505,15 @@ else if (this.micrometerEnabled) {
@Nullable
private String clusterId() {
if (this.kafkaAdmin != null && this.clusterId == null) {
this.clusterId = this.kafkaAdmin.clusterId();
this.clusterIdLock.lock();
try {
if (this.clusterId == null) {
this.clusterId = this.kafkaAdmin.clusterId();
}
}
finally {
this.clusterIdLock.unlock();
}
}
return this.clusterId;
}
Expand Down

0 comments on commit d982c8e

Please sign in to comment.