Skip to content

Commit

Permalink
GH-2852: Provide a way to customize a transactionIdSuffix
Browse files Browse the repository at this point in the history
  • Loading branch information
stillya committed Dec 10, 2023
1 parent 90b6ea9 commit 4e8237d
Show file tree
Hide file tree
Showing 9 changed files with 329 additions and 144 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,12 @@ This property must have a different value on each application instance.
[[transaction-id-suffix-fixed]]
== `TransactionIdSuffix Fixed`

Since 3.1, when setting `maxCache` greater than zero can reuse `transactional.id` within a specific range.
Since 3.1, introduced a new interface `TransactionSuffixManager` to manage `transactional.id` suffix.
The default implementation is `DefaultTransactionSuffixManager` when setting `maxCache` greater than zero can reuse `transactional.id` within a specific range, otherwise suffixes will be generated on the fly by incrementing a counter.
When a transaction producer is requested and `transactional.id` all in use, throw a `NoProducerAvailableException`.
User can use then use a RetryTemplate configured to retry that exception, with a suitably configured back off.
User can then use a RetryTemplate configured to retry that exception, with a suitably configured back off.

[source, java]
[source,java]
----
public static class Config {
Expand All @@ -125,16 +126,20 @@ public static class Config {
...
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
...
pf.setTransactionIdPrefix("my.txid.");
pf.setMaxCache(5);
DefaultTransactionSuffixManager sm = new DefaultTransactionSuffixManager();
sm.setMaxCache(5);
pf.setTransactionSuffixManager(sm);
return pf;
}
}
----
When setting `maxCache` to 5, `transactional.id` is `my.txid.`++`{0-4}`+.

IMPORTANT: When use `KafkaTransactionManager` in the `ConcurrentMessageListenerContainer`, `maxCache` must be greater than `concurrency`, also be careful nested transaction.
IMPORTANT: When use `KafkaTransactionManager` in the `ConcurrentMessageListenerContainer` and enable `maxCache`, `maxCache` must be greater than or equal to `concurrency`.
If some `MessageListenerContainer` cannot get the transaction, will throw `NoProducerAvailableException`.
When use nested transactions in `ConcurrentMessageListenerContainer`, `maxCache` needs to increase the number of nested transactions.


[[tx-template-mixed]]
== `KafkaTemplate` Transactional and non-Transactional Publishing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ When manually assigning partitions, with a `null` consumer `group.id`, the `AckM
See xref:tips.adoc#tip-assign-all-parts[Manually Assigning All Partitions] for more information.

[[x31-dkpf]]
=== DefaultKafkaProducerFactory
=== TransactionSuffixManager

When setting `maxCache` with `transactionIdPrefix`, can restrict `transaction.id` in range.
A new `TransactionSuffixManager` interface with `DefaultTransactionSuffixManager` implementation is provided to restrict `transaction.id` in range.
See xref:kafka/transactions.adoc#transaction-id-suffix-fixed[Fixed TransactionIdSuffix] for more information.
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,8 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory

private final Map<String, Object> configs;

private final AtomicInteger transactionIdSuffix = new AtomicInteger();

private final Map<String, BlockingQueue<CloseSafeProducer<K, V>>> cache = new ConcurrentHashMap<>();

private final Map<String, BlockingQueue<String>> suffixCache = new ConcurrentHashMap<>();

private final Map<Thread, CloseSafeProducer<K, V>> threadBoundProducers = new ConcurrentHashMap<>();

private final AtomicInteger epoch = new AtomicInteger();
Expand All @@ -139,6 +135,8 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory

private final AtomicBoolean running = new AtomicBoolean();

private TransactionSuffixManager transactionSuffixManager = new DefaultTransactionSuffixManager();

private Supplier<Serializer<K>> keySerializerSupplier;

private Supplier<Serializer<V>> valueSerializerSupplier;
Expand All @@ -155,8 +153,6 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory

private boolean producerPerThread;

private int maxCache;

private long maxAge;

private boolean configureSerializers = true;
Expand Down Expand Up @@ -353,6 +349,15 @@ public void setKeySerializerSupplier(Supplier<Serializer<K>> keySerializerSuppli
public void setValueSerializerSupplier(Supplier<Serializer<V>> valueSerializerSupplier) {
this.valueSerializerSupplier = valueSerializerSupplier(valueSerializerSupplier);
}

Check failure on line 352 in spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

View workflow job for this annotation

GitHub Actions / build-pull-request / build

[Task :spring-kafka:checkstyleMain] [Regexp] Line matches the illegal pattern 'Trailing whitespace'.
/**
* Set the transaction suffix manager.
* @param transactionSuffixManager the manager.
* @since 3.1.1
*/
public void setTransactionSuffixManager(TransactionSuffixManager transactionSuffixManager) {
this.transactionSuffixManager = transactionSuffixManager;
}

/**
* If true (default), programmatically provided serializers (via constructor or
Expand Down Expand Up @@ -424,15 +429,6 @@ public final void setTransactionIdPrefix(String transactionIdPrefix) {
return this.transactionIdPrefix;
}

public final boolean initTransactionIdSuffix(int transactionIdSuffix) {
Assert.isTrue(transactionIdSuffix >= 0, "'transactionIdSuffix' initial value must greater than or equal 0");
return this.transactionIdSuffix.compareAndSet(0, transactionIdSuffix);
}

public int getCurrTransactionIdSuffix() {
return this.transactionIdSuffix.get();
}

/**
* Set to true to create a producer per thread instead of singleton that is shared by
* all clients. Clients <b>must</b> call {@link #closeThreadBoundProducer()} to
Expand Down Expand Up @@ -501,16 +497,6 @@ public List<ProducerPostProcessor<K, V>> getPostProcessors() {
return Collections.unmodifiableList(this.postProcessors);
}

/**
* Set the maximum size for transaction producer cache.
* @param maxCache the maxCache to set
* @since 3.1
*/
public void setMaxCache(int maxCache) {
Assert.isTrue(maxCache >= 0, "'maxCache' must greater than or equal 0");
this.maxCache = maxCache;
}

/**
* Set the maximum age for a producer; useful when using transactions and the broker
* might expire a {@code transactional.id} due to inactivity.
Expand Down Expand Up @@ -732,7 +718,7 @@ public void destroy() {
}
});
this.cache.clear();
this.suffixCache.clear();
this.transactionSuffixManager.reset();
this.threadBoundProducers.values().forEach(prod -> {
try {
prod.closeDelegate(this.physicalCloseTimeout, this.listeners);
Expand Down Expand Up @@ -870,19 +856,7 @@ protected Producer<K, V> createTransactionalProducer(String txIdPrefix) {
}
}
if (cachedProducer == null) {
String suffix;
if (this.maxCache > 0) {
BlockingQueue<String> suffixQueue = getSuffixCache(txIdPrefix, this.maxCache);
Assert.notNull(suffixQueue,
() -> "No suffix cache found for " + txIdPrefix + ", max cache" + this.maxCache);
suffix = suffixQueue.poll();
if (suffix == null) {
throw new NoProducerAvailableException("No available transaction producer suffix for " + txIdPrefix);
}
}
else {
suffix = String.valueOf(this.transactionIdSuffix.getAndIncrement());
}
String suffix = this.transactionSuffixManager.retrieveSuffix(txIdPrefix);
return doCreateTxProducer(txIdPrefix, suffix, this::cacheReturner);
}
else {
Expand Down Expand Up @@ -931,29 +905,7 @@ private void closeTransactionProducer(CloseSafeProducer<K, V> producer, Duration
producer.closeDelegate(timeout, listeners);
}
finally {
reuseTransactionIdSuffix(producer);
}
}

private void reuseTransactionIdSuffix(CloseSafeProducer<K, V> producerToRemove) {
reuseTransactionIdSuffix(producerToRemove.txIdPrefix, producerToRemove.txIdSuffix, producerToRemove.epoch);
}

private void reuseTransactionIdSuffix(@Nullable String txIdPrefix, @Nullable String suffix, int epoch) {
if (txIdPrefix == null || suffix == null) {
return;
}
this.globalLock.lock();
try {
if (this.maxCache > 0) {
BlockingQueue<String> queue = getSuffixCache(txIdPrefix, this.maxCache);
if (epoch == this.epoch.get() && queue != null && !queue.contains(suffix)) {
queue.add(suffix);
}
}
}
finally {
this.globalLock.unlock();
this.transactionSuffixManager.returnSuffix(producer.txIdPrefix, producer.txIdSuffix);
}
}

Expand All @@ -973,7 +925,7 @@ private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
throw newEx; // NOSONAR - lost stack trace
}
finally {
reuseTransactionIdSuffix(prefix, suffix, this.epoch.get());
this.transactionSuffixManager.returnSuffix(prefix, suffix);
}
throw new KafkaException("initTransactions() failed", ex);
}
Expand Down Expand Up @@ -1006,31 +958,6 @@ protected BlockingQueue<CloseSafeProducer<K, V>> getCache(@Nullable String txIdP
return this.cache.computeIfAbsent(txIdPrefix, txId -> new LinkedBlockingQueue<>());
}

@Nullable
protected BlockingQueue<String> getSuffixCache() {
return getSuffixCache(this.transactionIdPrefix);
}

@Nullable
protected BlockingQueue<String> getSuffixCache(@Nullable String txIdPrefix) {
return getSuffixCache(txIdPrefix, this.maxCache);
}

@Nullable
protected BlockingQueue<String> getSuffixCache(@Nullable String txIdPrefix, int maxCache) {
if (txIdPrefix == null || maxCache <= 0) {
return null;
}

return this.suffixCache.computeIfAbsent(txIdPrefix, txId -> {
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
for (int suffix = 0; suffix < maxCache; suffix++) {
queue.add(String.valueOf(this.transactionIdSuffix.getAndIncrement()));
}
return queue;
});
}

/**
* When using {@link #setProducerPerThread(boolean)} (true), call this method to close
* and release this thread's producer. Thread bound producers are <b>not</b> closed by
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright 2016-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.core;

import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

/**
* The {@link TransactionSuffixManager} implementation for managing transactional producer suffixes.
* If the {@link #maxCache} is greater than 0, the suffixes will be cached and reused.
* Otherwise, the suffixes will be generated on the fly.
*
* <p>
* Setting the {@link #setMaxCache(int)} enables caching to restrict the number of `transactional.id`.
* The cache is per `transactional.id` prefix. The cache size is limited by the {@link #maxCache}.
* Default is 0, which means no caching and restriction, so the `transactional.id` will be generated on the fly.
*
* @author Ilya Starchenko
*
* @since 3.1.1
*/
public class DefaultTransactionSuffixManager implements TransactionSuffixManager {

private final AtomicInteger transactionIdSuffix = new AtomicInteger();

private final Map<String, BlockingQueue<String>> suffixCache = new ConcurrentHashMap<>();

private int maxCache;

/**
* Retrieve the suffix for the transactional producer from the cache or generate a new one

Check failure on line 51 in spring-kafka/src/main/java/org/springframework/kafka/core/DefaultTransactionSuffixManager.java

View workflow job for this annotation

GitHub Actions / build-pull-request / build

[Task :spring-kafka:checkstyleMain] [Regexp] Line matches the illegal pattern 'Trailing whitespace'.
* if caching is disabled.
* @param txIdPrefix the transaction id prefix.
* @return the suffix.
* @throws NoProducerAvailableException if caching is enabled and no suffixes are available.
*/
@Override
public String retrieveSuffix(String txIdPrefix) {
BlockingQueue<String> cache = getSuffixCache(txIdPrefix);
if (cache == null) {
return String.valueOf(this.transactionIdSuffix.getAndIncrement());
}

String suffix = cache.poll();
if (suffix == null) {
throw new NoProducerAvailableException("No available transaction producer suffix for " + txIdPrefix);
}
return suffix;
}

@Override
public void returnSuffix(@Nullable String txIdPrefix, @Nullable String suffix) {
if (this.maxCache <= 0 || suffix == null) {
return;
}
BlockingQueue<String> queue = getSuffixCache(txIdPrefix);
if (queue != null && !queue.contains(suffix)) {
queue.add(suffix);
}
}

@Override
public void reset() {
this.suffixCache.clear();
}

/**
* Set the maximum size for transaction producer cache.
* @param maxCache the maxCache to set
*/
public void setMaxCache(int maxCache) {
Assert.isTrue(maxCache >= 0, "'maxCache' must greater than or equal 0");
this.maxCache = maxCache;
}

@Nullable
private BlockingQueue<String> getSuffixCache(@Nullable String txIdPrefix) {
if (txIdPrefix == null || this.maxCache <= 0) {
return null;
}

return this.suffixCache.computeIfAbsent(txIdPrefix, txId -> {
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
for (int suffix = 0; suffix < this.maxCache; suffix++) {
queue.add(String.valueOf(this.transactionIdSuffix.getAndIncrement()));
}
return queue;
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2016-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.core;

/**
* The strategy for managing transactional producer suffixes.
*
* @author Ilya Starchenko
*
* @since 3.1.1
*/
public interface TransactionSuffixManager {

/**
* Retrieve the suffix for the transactional producer.
*
* @param txIdPrefix the transaction id prefix.
* @return the suffix.
*/
String retrieveSuffix(String txIdPrefix);

/**
* Return the suffix for the transactional producer back for reuse.
*
* @param txIdPrefix the transaction id prefix.
* @param suffix the suffix.
*/
void returnSuffix(String txIdPrefix, String suffix);

/**
* Clear all suffixes for the transactional producer.
*/
void reset();
}
Loading

0 comments on commit 4e8237d

Please sign in to comment.