Skip to content

Commit

Permalink
GH-2852: support fixed transaction id suffix
Browse files Browse the repository at this point in the history
add properties `maxCache` at `DefaultKafkaProducerFactory`, setting `maxCache` greater than zero can reuse `transactional.id`.

Resolves #2852
  • Loading branch information
Zhiyang.Wang1 committed Nov 23, 2023
1 parent 1a18d28 commit d30de4e
Show file tree
Hide file tree
Showing 9 changed files with 472 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,40 @@ NOTE: If there is a `KafkaTransactionManager` (or synchronized) transaction in p
Instead, a new "nested" transaction is used.

[[transaction-id-prefix]]
== `transactionIdPrefix`
== `TransactionIdPrefix`

With `EOSMode.V2` (aka `BETA`), the only supported mode, it is no longer necessary to use the same `transactional.id`, even for consumer-initiated transactions; in fact, it must be unique on each instance the same as for producer-initiated transactions.
This property must have a different value on each application instance.

[[transaction-id-suffix-fixed]]
== `TransactionIdSuffix Fixed`

Since 3.1, when setting `maxCache` greater than zero can reuse `transactional.id` within a specific range.
When a transaction producer is requested and `transactional.id` all in use, throw a `NoProducerAvailableException`.
User can use then use a RetryTemplate configured to retry that exception, with a suitably configured back off.

[source, java]
----
public static class Config {
@Bean
public ProducerFactory<String, String> myProducerFactory() {
Map<String, Object> configs = producerConfigs();
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
...
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
...
pf.setTransactionIdPrefix("my.txid.");
pf.setMaxCache(5);
return pf;
}
}
----
When setting `maxCache` to 5, `transactional.id` is `my.txid.`++`{0-4}`+.

IMPORTANT: When use `KafkaTransactionManager` in the `ConcurrentMessageListenerContainer`, `maxCache` must be greater than `concurrency`, also be careful nested transaction.

[[tx-template-mixed]]
== `KafkaTemplate` Transactional and non-Transactional Publishing

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,9 @@ See xref:retrytopic/topic-naming.adoc[Topic Naming] for more information.

When manually assigning partitions, with a `null` consumer `group.id`, the `AckMode` is now automatically coerced to `MANUAL`.
See xref:tips.adoc#tip-assign-all-parts[Manually Assigning All Partitions] for more information.

[[x31-dkpf]]
=== DefaultKafkaProducerFactory

When setting `maxCache` with `transactionIdPrefix`, can restrict `transaction.id` in range.
See xref:kafka/transactions.adoc#transaction-id-suffix-fixed[Fixed TransactionIdSuffix] for more information.
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory

private final Map<String, BlockingQueue<CloseSafeProducer<K, V>>> cache = new ConcurrentHashMap<>();

private final Map<String, BlockingQueue<String>> suffixCache = new ConcurrentHashMap<>();

private final Map<Thread, CloseSafeProducer<K, V>> threadBoundProducers = new ConcurrentHashMap<>();

private final AtomicInteger epoch = new AtomicInteger();
Expand Down Expand Up @@ -153,6 +155,8 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory

private boolean producerPerThread;

private int maxCache;

private long maxAge;

private boolean configureSerializers = true;
Expand Down Expand Up @@ -404,7 +408,7 @@ public Duration getPhysicalCloseTimeout() {

/**
* Set a prefix for the {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} config. By
* default a {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} value from configs is used
* default, a {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} value from configs is used
* as a prefix in the target producer configs.
* @param transactionIdPrefix the prefix.
* @since 1.3
Expand Down Expand Up @@ -488,6 +492,16 @@ public List<ProducerPostProcessor<K, V>> getPostProcessors() {
return Collections.unmodifiableList(this.postProcessors);
}

/**
* Set the maximum size for transaction producer cache.
* @param maxCache the maxCache to set
* @since 3.1
*/
public void setMaxCache(int maxCache) {
Assert.isTrue(maxCache >= 0, "max cache must greater than or equal 0");
this.maxCache = maxCache;
}

/**
* Set the maximum age for a producer; useful when using transactions and the broker
* might expire a {@code transactional.id} due to inactivity.
Expand Down Expand Up @@ -709,6 +723,7 @@ public void destroy() {
}
});
this.cache.clear();
this.suffixCache.clear();
this.threadBoundProducers.values().forEach(prod -> {
try {
prod.closeDelegate(this.physicalCloseTimeout, this.listeners);
Expand Down Expand Up @@ -846,7 +861,20 @@ protected Producer<K, V> createTransactionalProducer(String txIdPrefix) {
}
}
if (cachedProducer == null) {
return doCreateTxProducer(txIdPrefix, "" + this.transactionIdSuffix.getAndIncrement(), this::cacheReturner);
String suffix;
if (this.maxCache > 0) {
BlockingQueue<String> suffixQueue = getSuffixCache(txIdPrefix, this.maxCache);
Assert.notNull(suffixQueue,
() -> "No suffix cache found for " + txIdPrefix + ", max cache" + this.maxCache);
suffix = suffixQueue.poll();
if (suffix == null) {
throw new NoProducerAvailableException("No transaction producer available for " + txIdPrefix);
}
}
else {
suffix = String.valueOf(this.transactionIdSuffix.getAndIncrement());
}
return doCreateTxProducer(txIdPrefix, suffix, this::cacheReturner);
}
else {
return cachedProducer;
Expand All @@ -856,26 +884,30 @@ protected Producer<K, V> createTransactionalProducer(String txIdPrefix) {
private boolean expire(CloseSafeProducer<K, V> producer) {
boolean expired = this.maxAge > 0 && System.currentTimeMillis() - producer.created > this.maxAge;
if (expired) {
producer.closeDelegate(this.physicalCloseTimeout, this.listeners);
closeTransactionProducer(producer, this.physicalCloseTimeout, this.listeners);
}
return expired;
}

boolean cacheReturner(CloseSafeProducer<K, V> producerToRemove, Duration timeout) {
if (producerToRemove.closed) {
producerToRemove.closeDelegate(timeout, this.listeners);
closeTransactionProducer(producerToRemove, timeout, this.listeners);
return true;
}
else {
this.globalLock.lock();
try {
BlockingQueue<CloseSafeProducer<K, V>> txIdCache = getCache(producerToRemove.txIdPrefix);
if (producerToRemove.epoch != this.epoch.get()
|| (txIdCache != null && !txIdCache.contains(producerToRemove)
&& !txIdCache.offer(producerToRemove))) {
if (producerToRemove.epoch != this.epoch.get()) {
producerToRemove.closeDelegate(timeout, this.listeners);
return true;
}

BlockingQueue<CloseSafeProducer<K, V>> txIdCache = getCache(producerToRemove.txIdPrefix);
if (txIdCache != null && !txIdCache.contains(producerToRemove)
&& !txIdCache.offer(producerToRemove)) {
closeTransactionProducer(producerToRemove, timeout, this.listeners);
return true;
}
}
finally {
this.globalLock.unlock();
Expand All @@ -884,6 +916,38 @@ boolean cacheReturner(CloseSafeProducer<K, V> producerToRemove, Duration timeout
}
}

private void closeTransactionProducer(CloseSafeProducer<K, V> producer, Duration timeout,
List<Listener<K, V>> listeners) {
try {
producer.closeDelegate(timeout, listeners);
}
finally {
reuseTransactionIdSuffix(producer);
}
}

private void reuseTransactionIdSuffix(CloseSafeProducer<K, V> producerToRemove) {
reuseTransactionIdSuffix(producerToRemove.txIdPrefix, producerToRemove.txIdSuffix, producerToRemove.epoch);
}

private void reuseTransactionIdSuffix(@Nullable String txIdPrefix, @Nullable String suffix, int epoch) {
if (txIdPrefix == null || suffix == null) {
return;
}
this.globalLock.lock();
try {
if (this.maxCache > 0) {
BlockingQueue<String> queue = getSuffixCache(txIdPrefix, this.maxCache);
if (epoch == this.epoch.get() && queue != null && !queue.contains(suffix)) {
queue.add(suffix);
}
}
}
finally {
this.globalLock.unlock();
}
}

private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
BiPredicate<CloseSafeProducer<K, V>, Duration> remover) {
Producer<K, V> newProducer = createRawProducer(getTxProducerConfigs(prefix + suffix));
Expand All @@ -899,10 +963,13 @@ private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
newEx.addSuppressed(ex2);
throw newEx; // NOSONAR - lost stack trace
}
finally {
reuseTransactionIdSuffix(prefix, suffix, this.epoch.get());
}
throw new KafkaException("initTransactions() failed", ex);
}
CloseSafeProducer<K, V> closeSafeProducer =
new CloseSafeProducer<>(newProducer, remover, prefix, this.physicalCloseTimeout, this.beanName,
new CloseSafeProducer<>(newProducer, remover, prefix, suffix, this.physicalCloseTimeout, this.beanName,
this.epoch.get());
this.listeners.forEach(listener -> listener.producerAdded(closeSafeProducer.clientId, closeSafeProducer));
return closeSafeProducer;
Expand All @@ -923,13 +990,38 @@ protected BlockingQueue<CloseSafeProducer<K, V>> getCache() {
}

@Nullable
protected BlockingQueue<CloseSafeProducer<K, V>> getCache(String txIdPrefix) {
protected BlockingQueue<CloseSafeProducer<K, V>> getCache(@Nullable String txIdPrefix) {
if (txIdPrefix == null) {
return null;
}
return this.cache.computeIfAbsent(txIdPrefix, txId -> new LinkedBlockingQueue<>());
}

@Nullable
protected BlockingQueue<String> getSuffixCache() {
return getSuffixCache(this.transactionIdPrefix);
}

@Nullable
protected BlockingQueue<String> getSuffixCache(@Nullable String txIdPrefix) {
return getSuffixCache(txIdPrefix, this.maxCache);
}

@Nullable
protected BlockingQueue<String> getSuffixCache(@Nullable String txIdPrefix, int maxCache) {
if (txIdPrefix == null || maxCache <= 0) {
return null;
}

return this.suffixCache.computeIfAbsent(txIdPrefix, txId -> {
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
for (int suffix = 0; suffix < maxCache; suffix++) {
queue.add(String.valueOf(this.transactionIdSuffix.getAndIncrement()));
}
return queue;
});
}

/**
* When using {@link #setProducerPerThread(boolean)} (true), call this method to close
* and release this thread's producer. Thread bound producers are <b>not</b> closed by
Expand Down Expand Up @@ -991,6 +1083,8 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {

final String txIdPrefix; // NOSONAR

final String txIdSuffix; // NOSONAR

final long created; // NOSONAR

private final Duration closeTimeout;
Expand All @@ -1010,14 +1104,21 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
this(delegate, removeConsumerProducer, null, closeTimeout, factoryName, epoch);
}

CloseSafeProducer(Producer<K, V> delegate, BiPredicate<CloseSafeProducer<K, V>, Duration> removeProducer,
@Nullable String txIdPrefix, Duration closeTimeout, String factoryName, int epoch) {

this(delegate, removeProducer, txIdPrefix, null, closeTimeout, factoryName, epoch);
}

CloseSafeProducer(Producer<K, V> delegate,
BiPredicate<CloseSafeProducer<K, V>, Duration> removeProducer, @Nullable String txIdPrefix,
Duration closeTimeout, String factoryName, int epoch) {
@Nullable String txIdSuffix, Duration closeTimeout, String factoryName, int epoch) {

Assert.isTrue(!(delegate instanceof CloseSafeProducer), "Cannot double-wrap a producer");
this.delegate = delegate;
this.removeProducer = removeProducer;
this.txIdPrefix = txIdPrefix;
this.txIdSuffix = txIdSuffix;
this.closeTimeout = closeTimeout;
Map<MetricName, ? extends Metric> metrics = delegate.metrics();
Iterator<MetricName> metricIterator = metrics.keySet().iterator();
Expand Down Expand Up @@ -1047,17 +1148,12 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
LOGGER.trace(() -> toString() + " send(" + record + ")");
return this.delegate.send(record, new Callback() {

@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception instanceof OutOfOrderSequenceException) {
CloseSafeProducer.this.producerFailed = exception;
close(CloseSafeProducer.this.closeTimeout);
}
callback.onCompletion(metadata, exception);
return this.delegate.send(record, (metadata, exception) -> {
if (exception instanceof OutOfOrderSequenceException) {
CloseSafeProducer.this.producerFailed = exception;
close(CloseSafeProducer.this.closeTimeout);
}

callback.onCompletion(metadata, exception);
});
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2023-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.core;

import org.springframework.lang.Nullable;

/**
* An exception thrown by no transaction producer available exception, when set
* {@link DefaultKafkaProducerFactory} maxCache greater than 0.
*
* @author Wang Zhiyang
* @since 3.1.1
*
*/
public class NoProducerAvailableException extends RuntimeException {

private static final long serialVersionUID = 1L;

/**
* Constructs a new no producer available exception with the specified detail message and cause.
* @param message the message.
*/
public NoProducerAvailableException(String message) {
this(message, null);
}

/**
* Constructs a new no producer available exception with the specified detail message and cause.
* @param message the message.
* @param cause the cause.
*/
public NoProducerAvailableException(String message, @Nullable Throwable cause) {
super(message, cause);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,6 @@ else if (listener instanceof MessageListener) {
if (this.containerProperties.isLogContainerConfig()) {
this.logger.info(toString());
}
Map<String, Object> props = KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties();
ApplicationContext applicationContext = getApplicationContext();
this.checkNullKeyForExceptions = this.containerProperties.isCheckDeserExWhenKeyNull()
|| ErrorHandlingUtils.checkDeserializer(KafkaMessageListenerContainer.this.consumerFactory,
Expand Down
Loading

0 comments on commit d30de4e

Please sign in to comment.