-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-2852: support fixed transaction id suffix #2913
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -125,6 +125,8 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory | |
|
||
private final Map<String, BlockingQueue<CloseSafeProducer<K, V>>> cache = new ConcurrentHashMap<>(); | ||
|
||
private final Map<String, BlockingQueue<String>> suffixCache = new ConcurrentHashMap<>(); | ||
|
||
private final Map<Thread, CloseSafeProducer<K, V>> threadBoundProducers = new ConcurrentHashMap<>(); | ||
|
||
private final AtomicInteger epoch = new AtomicInteger(); | ||
|
@@ -153,6 +155,8 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory | |
|
||
private boolean producerPerThread; | ||
|
||
private int maxCache; | ||
|
||
private long maxAge; | ||
|
||
private boolean configureSerializers = true; | ||
|
@@ -404,7 +408,7 @@ public Duration getPhysicalCloseTimeout() { | |
|
||
/** | ||
* Set a prefix for the {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} config. By | ||
* default a {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} value from configs is used | ||
* default, a {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} value from configs is used | ||
* as a prefix in the target producer configs. | ||
* @param transactionIdPrefix the prefix. | ||
* @since 1.3 | ||
|
@@ -420,6 +424,15 @@ public final void setTransactionIdPrefix(String transactionIdPrefix) { | |
return this.transactionIdPrefix; | ||
} | ||
|
||
public final boolean initTransactionIdSuffix(int transactionIdSuffix) { | ||
Assert.isTrue(transactionIdSuffix >= 0, "'transactionIdSuffix' initial value must greater than or equal 0"); | ||
return this.transactionIdSuffix.compareAndSet(0, transactionIdSuffix); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi, @sobychacko new interface is here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you, but I didn't quite mean that, so I've decided to lend a hand. Could you please take a look? |
||
|
||
public int getCurrTransactionIdSuffix() { | ||
return this.transactionIdSuffix.get(); | ||
} | ||
|
||
/** | ||
* Set to true to create a producer per thread instead of singleton that is shared by | ||
* all clients. Clients <b>must</b> call {@link #closeThreadBoundProducer()} to | ||
|
@@ -488,6 +501,16 @@ public List<ProducerPostProcessor<K, V>> getPostProcessors() { | |
return Collections.unmodifiableList(this.postProcessors); | ||
} | ||
|
||
/** | ||
* Set the maximum size for transaction producer cache. | ||
* @param maxCache the maxCache to set | ||
* @since 3.1 | ||
*/ | ||
public void setMaxCache(int maxCache) { | ||
Assert.isTrue(maxCache >= 0, "'maxCache' must greater than or equal 0"); | ||
this.maxCache = maxCache; | ||
} | ||
|
||
/** | ||
* Set the maximum age for a producer; useful when using transactions and the broker | ||
* might expire a {@code transactional.id} due to inactivity. | ||
|
@@ -709,6 +732,7 @@ public void destroy() { | |
} | ||
}); | ||
this.cache.clear(); | ||
this.suffixCache.clear(); | ||
this.threadBoundProducers.values().forEach(prod -> { | ||
try { | ||
prod.closeDelegate(this.physicalCloseTimeout, this.listeners); | ||
|
@@ -846,7 +870,20 @@ protected Producer<K, V> createTransactionalProducer(String txIdPrefix) { | |
} | ||
} | ||
if (cachedProducer == null) { | ||
return doCreateTxProducer(txIdPrefix, "" + this.transactionIdSuffix.getAndIncrement(), this::cacheReturner); | ||
String suffix; | ||
if (this.maxCache > 0) { | ||
BlockingQueue<String> suffixQueue = getSuffixCache(txIdPrefix, this.maxCache); | ||
Assert.notNull(suffixQueue, | ||
() -> "No suffix cache found for " + txIdPrefix + ", max cache" + this.maxCache); | ||
suffix = suffixQueue.poll(); | ||
if (suffix == null) { | ||
throw new NoProducerAvailableException("No available transaction producer suffix for " + txIdPrefix); | ||
} | ||
} | ||
else { | ||
suffix = String.valueOf(this.transactionIdSuffix.getAndIncrement()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice work, and thanks for your effort! However, I think just incrementing may not be sufficient. The goal of GH-2852 is to allow users to set how There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Via new interface, the first instance of service increments If I understand correctly, I like it. At the same time we need to monitor the generated As an alternative, write a unique ID in the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
It actually depends on the implementation. It could distribute
Certainly, we need this functionality. Therefore, the interface must include a contract to retrieve the
In my environment, it's not an option because I have a mask for
This comment was marked as abuse.
Sorry, something went wrong. |
||
} | ||
return doCreateTxProducer(txIdPrefix, suffix, this::cacheReturner); | ||
} | ||
else { | ||
return cachedProducer; | ||
|
@@ -856,26 +893,30 @@ protected Producer<K, V> createTransactionalProducer(String txIdPrefix) { | |
private boolean expire(CloseSafeProducer<K, V> producer) { | ||
boolean expired = this.maxAge > 0 && System.currentTimeMillis() - producer.created > this.maxAge; | ||
if (expired) { | ||
producer.closeDelegate(this.physicalCloseTimeout, this.listeners); | ||
closeTransactionProducer(producer, this.physicalCloseTimeout, this.listeners); | ||
} | ||
return expired; | ||
} | ||
|
||
boolean cacheReturner(CloseSafeProducer<K, V> producerToRemove, Duration timeout) { | ||
if (producerToRemove.closed) { | ||
producerToRemove.closeDelegate(timeout, this.listeners); | ||
closeTransactionProducer(producerToRemove, timeout, this.listeners); | ||
return true; | ||
} | ||
else { | ||
this.globalLock.lock(); | ||
try { | ||
BlockingQueue<CloseSafeProducer<K, V>> txIdCache = getCache(producerToRemove.txIdPrefix); | ||
if (producerToRemove.epoch != this.epoch.get() | ||
|| (txIdCache != null && !txIdCache.contains(producerToRemove) | ||
&& !txIdCache.offer(producerToRemove))) { | ||
if (producerToRemove.epoch != this.epoch.get()) { | ||
producerToRemove.closeDelegate(timeout, this.listeners); | ||
return true; | ||
} | ||
|
||
BlockingQueue<CloseSafeProducer<K, V>> txIdCache = getCache(producerToRemove.txIdPrefix); | ||
if (txIdCache != null && !txIdCache.contains(producerToRemove) | ||
&& !txIdCache.offer(producerToRemove)) { | ||
closeTransactionProducer(producerToRemove, timeout, this.listeners); | ||
return true; | ||
} | ||
} | ||
finally { | ||
this.globalLock.unlock(); | ||
|
@@ -884,6 +925,38 @@ boolean cacheReturner(CloseSafeProducer<K, V> producerToRemove, Duration timeout | |
} | ||
} | ||
|
||
private void closeTransactionProducer(CloseSafeProducer<K, V> producer, Duration timeout, | ||
List<Listener<K, V>> listeners) { | ||
try { | ||
producer.closeDelegate(timeout, listeners); | ||
} | ||
finally { | ||
reuseTransactionIdSuffix(producer); | ||
} | ||
} | ||
|
||
private void reuseTransactionIdSuffix(CloseSafeProducer<K, V> producerToRemove) { | ||
reuseTransactionIdSuffix(producerToRemove.txIdPrefix, producerToRemove.txIdSuffix, producerToRemove.epoch); | ||
} | ||
|
||
private void reuseTransactionIdSuffix(@Nullable String txIdPrefix, @Nullable String suffix, int epoch) { | ||
if (txIdPrefix == null || suffix == null) { | ||
return; | ||
} | ||
this.globalLock.lock(); | ||
try { | ||
if (this.maxCache > 0) { | ||
BlockingQueue<String> queue = getSuffixCache(txIdPrefix, this.maxCache); | ||
if (epoch == this.epoch.get() && queue != null && !queue.contains(suffix)) { | ||
queue.add(suffix); | ||
} | ||
} | ||
} | ||
finally { | ||
this.globalLock.unlock(); | ||
} | ||
} | ||
|
||
private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix, | ||
BiPredicate<CloseSafeProducer<K, V>, Duration> remover) { | ||
Producer<K, V> newProducer = createRawProducer(getTxProducerConfigs(prefix + suffix)); | ||
|
@@ -899,10 +972,13 @@ private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix, | |
newEx.addSuppressed(ex2); | ||
throw newEx; // NOSONAR - lost stack trace | ||
} | ||
finally { | ||
reuseTransactionIdSuffix(prefix, suffix, this.epoch.get()); | ||
} | ||
throw new KafkaException("initTransactions() failed", ex); | ||
} | ||
CloseSafeProducer<K, V> closeSafeProducer = | ||
new CloseSafeProducer<>(newProducer, remover, prefix, this.physicalCloseTimeout, this.beanName, | ||
new CloseSafeProducer<>(newProducer, remover, prefix, suffix, this.physicalCloseTimeout, this.beanName, | ||
this.epoch.get()); | ||
this.listeners.forEach(listener -> listener.producerAdded(closeSafeProducer.clientId, closeSafeProducer)); | ||
return closeSafeProducer; | ||
|
@@ -923,13 +999,38 @@ protected BlockingQueue<CloseSafeProducer<K, V>> getCache() { | |
} | ||
|
||
@Nullable | ||
protected BlockingQueue<CloseSafeProducer<K, V>> getCache(String txIdPrefix) { | ||
protected BlockingQueue<CloseSafeProducer<K, V>> getCache(@Nullable String txIdPrefix) { | ||
if (txIdPrefix == null) { | ||
return null; | ||
} | ||
return this.cache.computeIfAbsent(txIdPrefix, txId -> new LinkedBlockingQueue<>()); | ||
} | ||
|
||
@Nullable | ||
protected BlockingQueue<String> getSuffixCache() { | ||
return getSuffixCache(this.transactionIdPrefix); | ||
} | ||
|
||
@Nullable | ||
protected BlockingQueue<String> getSuffixCache(@Nullable String txIdPrefix) { | ||
return getSuffixCache(txIdPrefix, this.maxCache); | ||
} | ||
|
||
@Nullable | ||
protected BlockingQueue<String> getSuffixCache(@Nullable String txIdPrefix, int maxCache) { | ||
if (txIdPrefix == null || maxCache <= 0) { | ||
return null; | ||
} | ||
|
||
return this.suffixCache.computeIfAbsent(txIdPrefix, txId -> { | ||
BlockingQueue<String> queue = new LinkedBlockingQueue<>(); | ||
for (int suffix = 0; suffix < maxCache; suffix++) { | ||
queue.add(String.valueOf(this.transactionIdSuffix.getAndIncrement())); | ||
} | ||
return queue; | ||
}); | ||
} | ||
|
||
/** | ||
* When using {@link #setProducerPerThread(boolean)} (true), call this method to close | ||
* and release this thread's producer. Thread bound producers are <b>not</b> closed by | ||
|
@@ -991,6 +1092,8 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> { | |
|
||
final String txIdPrefix; // NOSONAR | ||
|
||
final String txIdSuffix; // NOSONAR | ||
|
||
final long created; // NOSONAR | ||
|
||
private final Duration closeTimeout; | ||
|
@@ -1010,14 +1113,21 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> { | |
this(delegate, removeConsumerProducer, null, closeTimeout, factoryName, epoch); | ||
} | ||
|
||
CloseSafeProducer(Producer<K, V> delegate, BiPredicate<CloseSafeProducer<K, V>, Duration> removeProducer, | ||
@Nullable String txIdPrefix, Duration closeTimeout, String factoryName, int epoch) { | ||
|
||
this(delegate, removeProducer, txIdPrefix, null, closeTimeout, factoryName, epoch); | ||
} | ||
|
||
CloseSafeProducer(Producer<K, V> delegate, | ||
BiPredicate<CloseSafeProducer<K, V>, Duration> removeProducer, @Nullable String txIdPrefix, | ||
Duration closeTimeout, String factoryName, int epoch) { | ||
@Nullable String txIdSuffix, Duration closeTimeout, String factoryName, int epoch) { | ||
|
||
Assert.isTrue(!(delegate instanceof CloseSafeProducer), "Cannot double-wrap a producer"); | ||
this.delegate = delegate; | ||
this.removeProducer = removeProducer; | ||
this.txIdPrefix = txIdPrefix; | ||
this.txIdSuffix = txIdSuffix; | ||
this.closeTimeout = closeTimeout; | ||
Map<MetricName, ? extends Metric> metrics = delegate.metrics(); | ||
Iterator<MetricName> metricIterator = metrics.keySet().iterator(); | ||
|
@@ -1047,17 +1157,12 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record) { | |
@Override | ||
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { | ||
LOGGER.trace(() -> toString() + " send(" + record + ")"); | ||
return this.delegate.send(record, new Callback() { | ||
|
||
@Override | ||
public void onCompletion(RecordMetadata metadata, Exception exception) { | ||
if (exception instanceof OutOfOrderSequenceException) { | ||
CloseSafeProducer.this.producerFailed = exception; | ||
close(CloseSafeProducer.this.closeTimeout); | ||
} | ||
callback.onCompletion(metadata, exception); | ||
return this.delegate.send(record, (metadata, exception) -> { | ||
if (exception instanceof OutOfOrderSequenceException) { | ||
CloseSafeProducer.this.producerFailed = exception; | ||
close(CloseSafeProducer.this.closeTimeout); | ||
} | ||
|
||
callback.onCompletion(metadata, exception); | ||
}); | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
/* | ||
* Copyright 2023-2023 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.springframework.kafka.core; | ||
|
||
import org.springframework.lang.Nullable; | ||
|
||
/** | ||
* An exception thrown by no transaction producer available exception, when set | ||
* {@link DefaultKafkaProducerFactory} maxCache greater than 0. | ||
* | ||
* @author Wang Zhiyang | ||
* @since 3.1.1 | ||
* | ||
*/ | ||
public class NoProducerAvailableException extends RuntimeException { | ||
|
||
private static final long serialVersionUID = 1L; | ||
|
||
/** | ||
* Constructs a new no producer available exception with the specified detail message and cause. | ||
* @param message the message. | ||
*/ | ||
public NoProducerAvailableException(String message) { | ||
this(message, null); | ||
} | ||
|
||
/** | ||
* Constructs a new no producer available exception with the specified detail message and cause. | ||
* @param message the message. | ||
* @param cause the cause. | ||
*/ | ||
public NoProducerAvailableException(String message, @Nullable Throwable cause) { | ||
super(message, cause); | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"also be careful about".
It would be beneficial if you could add a few sentences detailing why the
maxCache
needs to be greater thanconcurrency
. Same thing about nested transactions.