Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-2852: Provide a way to customize a transactionIdSuffix #2930

Merged
merged 2 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,43 @@ NOTE: If there is a `KafkaTransactionManager` (or synchronized) transaction in p
Instead, a new "nested" transaction is used.

[[transaction-id-prefix]]
== `transactionIdPrefix`
== `TransactionIdPrefix`

With `EOSMode.V2` (aka `BETA`), the only supported mode, it is no longer necessary to use the same `transactional.id`, even for consumer-initiated transactions; in fact, it must be unique on each instance the same as for producer-initiated transactions.
This property must have a different value on each application instance.

[[transaction-id-suffix-fixed]]
== `TransactionIdSuffix Fixed`

Since 3.2, a new `TransactionIdSuffixStrategy` interface was introduced to manage `transactional.id` suffix.
The default implementation is `DefaultTransactionIdSuffixStrategy` when setting `maxCache` greater than zero can reuse `transactional.id` within a specific range, otherwise suffixes will be generated on the fly by incrementing a counter.
When a transaction producer is requested and `transactional.id` all in use, throw a `NoProducerAvailableException`.
User can then use a `RetryTemplate` configured to retry that exception, with a suitably configured back off.

[source,java]
----
public static class Config {

@Bean
public ProducerFactory<String, String> myProducerFactory() {
Map<String, Object> configs = producerConfigs();
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
...
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
...
TransactionIdSuffixStrategy ss = new DefaultTransactionIdSuffixStrategy(5);
pf.setTransactionIdSuffixStrategy(ss);
return pf;
}

}
----
When setting `maxCache` to 5, `transactional.id` is `my.txid.`++`{0-4}`+.

IMPORTANT: When using `KafkaTransactionManager` with the `ConcurrentMessageListenerContainer` and enabling `maxCache`, it is necessary to set `maxCache` to a value greater than or equal to `concurrency`.
If a `MessageListenerContainer` is unable to acquire a `transactional.id` suffix, it will throw a `NoProducerAvailableException`.
When using nested transactions in the `ConcurrentMessageListenerContainer`, it is necessary to adjust the maxCache setting to handle the increased number of nested transactions.

[[tx-template-mixed]]
== `KafkaTemplate` Transactional and non-Transactional Publishing

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,9 @@
This section covers the changes made from version 3.1 to version 3.2.
For changes in earlier version, see xref:appendix/change-history.adoc[Change History].

[[x32-tiss]]
=== TransactionIdSuffixStrategy

A new `TransactionIdSuffixStrategy` interface was introduced to manage `transactional.id` suffix.
The default implementation is `DefaultTransactionIdSuffixStrategy` when setting `maxCache` greater than zero can reuse `transactional.id` within a specific range, otherwise suffixes will be generated on the fly by incrementing a counter.
See xref:kafka/transactions.adoc#transaction-id-suffix-fixed[Fixed TransactionIdSuffix] for more information.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -121,8 +121,6 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory

private final Map<String, Object> configs;

private final AtomicInteger transactionIdSuffix = new AtomicInteger();

private final Map<String, BlockingQueue<CloseSafeProducer<K, V>>> cache = new ConcurrentHashMap<>();

private final Map<Thread, CloseSafeProducer<K, V>> threadBoundProducers = new ConcurrentHashMap<>();
Expand All @@ -137,6 +135,8 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory

private final AtomicBoolean running = new AtomicBoolean();

private TransactionIdSuffixStrategy transactionIdSuffixStrategy = new DefaultTransactionIdSuffixStrategy(0);

private Supplier<Serializer<K>> keySerializerSupplier;

private Supplier<Serializer<V>> valueSerializerSupplier;
Expand Down Expand Up @@ -350,6 +350,16 @@ public void setValueSerializerSupplier(Supplier<Serializer<V>> valueSerializerSu
this.valueSerializerSupplier = valueSerializerSupplier(valueSerializerSupplier);
}

/**
* Set the transaction suffix strategy.
* @param transactionIdSuffixStrategy the strategy.
* @since 3.2
*/
public void setTransactionIdSuffixStrategy(TransactionIdSuffixStrategy transactionIdSuffixStrategy) {
Assert.notNull(transactionIdSuffixStrategy, "'transactionIdSuffixStrategy' cannot be null");
this.transactionIdSuffixStrategy = transactionIdSuffixStrategy;
artembilan marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* If true (default), programmatically provided serializers (via constructor or
* setters) will be configured using the producer configuration. Set to false if the
Expand Down Expand Up @@ -404,7 +414,7 @@ public Duration getPhysicalCloseTimeout() {

/**
* Set a prefix for the {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} config. By
* default a {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} value from configs is used
* default, a {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} value from configs is used
* as a prefix in the target producer configs.
* @param transactionIdPrefix the prefix.
* @since 1.3
Expand Down Expand Up @@ -536,9 +546,9 @@ public ProducerFactory<K, V> copyWithConfigurationOverride(Map<String, Object> o
producerProperties.putAll(overrideProperties);
producerProperties = ensureExistingTransactionIdPrefixInProperties(producerProperties);
DefaultKafkaProducerFactory<K, V> newFactory = new DefaultKafkaProducerFactory<>(producerProperties,
getKeySerializerSupplier(),
getValueSerializerSupplier(),
isConfigureSerializers());
getKeySerializerSupplier(),
getValueSerializerSupplier(),
isConfigureSerializers());
newFactory.setPhysicalCloseTimeout((int) getPhysicalCloseTimeout().getSeconds());
newFactory.setProducerPerThread(isProducerPerThread());
for (ProducerPostProcessor<K, V> templatePostProcessor : getPostProcessors()) {
Expand Down Expand Up @@ -690,7 +700,7 @@ public void destroy() {
}
if (producerToClose != null) {
try {
producerToClose.closeDelegate(this.physicalCloseTimeout, this.listeners);
producerToClose.closeDelegate(this.physicalCloseTimeout);
}
catch (Exception e) {
LOGGER.error(e, "Exception while closing producer");
Expand All @@ -700,7 +710,7 @@ public void destroy() {
CloseSafeProducer<K, V> next = queue.poll();
while (next != null) {
try {
next.closeDelegate(this.physicalCloseTimeout, this.listeners);
next.closeDelegate(this.physicalCloseTimeout);
}
catch (Exception e) {
LOGGER.error(e, "Exception while closing producer");
Expand All @@ -711,7 +721,7 @@ public void destroy() {
this.cache.clear();
this.threadBoundProducers.values().forEach(prod -> {
try {
prod.closeDelegate(this.physicalCloseTimeout, this.listeners);
prod.closeDelegate(this.physicalCloseTimeout);
}
catch (Exception e) {
LOGGER.error(e, "Exception while closing producer");
Expand Down Expand Up @@ -769,7 +779,7 @@ private Producer<K, V> doCreateProducer(@Nullable String txIdPrefix) {
this.globalLock.lock();
try {
if (this.producer != null && this.producer.closed) {
this.producer.closeDelegate(this.physicalCloseTimeout, this.listeners);
this.producer.closeDelegate(this.physicalCloseTimeout);
this.producer = null;
}
if (this.producer != null && expire(this.producer)) {
Expand Down Expand Up @@ -820,6 +830,9 @@ protected Producer<K, V> createKafkaProducer() {
* @since 2.2.13
*/
protected final boolean removeProducer(CloseSafeProducer<K, V> producerToRemove, Duration timeout) {
if (producerToRemove.closed) {
artembilan marked this conversation as resolved.
Show resolved Hide resolved
this.listeners.forEach(listener -> listener.producerRemoved(producerToRemove.clientId, producerToRemove));
}
return producerToRemove.closed;
}

Expand All @@ -846,7 +859,8 @@ protected Producer<K, V> createTransactionalProducer(String txIdPrefix) {
}
}
if (cachedProducer == null) {
return doCreateTxProducer(txIdPrefix, "" + this.transactionIdSuffix.getAndIncrement(), this::cacheReturner);
String suffix = this.transactionIdSuffixStrategy.acquireSuffix(txIdPrefix);
return doCreateTxProducer(txIdPrefix, suffix, this::cacheReturner);
}
else {
return cachedProducer;
Expand All @@ -856,24 +870,28 @@ protected Producer<K, V> createTransactionalProducer(String txIdPrefix) {
private boolean expire(CloseSafeProducer<K, V> producer) {
boolean expired = this.maxAge > 0 && System.currentTimeMillis() - producer.created > this.maxAge;
if (expired) {
producer.closeDelegate(this.physicalCloseTimeout, this.listeners);
producer.closeDelegate(this.physicalCloseTimeout);
}
return expired;
}

boolean cacheReturner(CloseSafeProducer<K, V> producerToRemove, Duration timeout) {
if (producerToRemove.closed) {
producerToRemove.closeDelegate(timeout, this.listeners);
this.removeTransactionProducer(producerToRemove, timeout, this.listeners);
return true;
}
else {
this.globalLock.lock();
try {
if (producerToRemove.epoch != this.epoch.get()) {
this.removeTransactionProducer(producerToRemove, timeout, this.listeners);
return true;
}
BlockingQueue<CloseSafeProducer<K, V>> txIdCache = getCache(producerToRemove.txIdPrefix);
if (producerToRemove.epoch != this.epoch.get()
|| (txIdCache != null && !txIdCache.contains(producerToRemove)
&& !txIdCache.offer(producerToRemove))) {
producerToRemove.closeDelegate(timeout, this.listeners);
this.removeTransactionProducer(producerToRemove, timeout, this.listeners);
return true;
}
}
Expand All @@ -884,6 +902,12 @@ boolean cacheReturner(CloseSafeProducer<K, V> producerToRemove, Duration timeout
}
}

private void removeTransactionProducer(CloseSafeProducer<K, V> producer, Duration timeout,
List<Listener<K, V>> listeners) {
this.transactionIdSuffixStrategy.releaseSuffix(producer.txIdPrefix, producer.txIdSuffix);
listeners.forEach(listener -> listener.producerRemoved(producer.clientId, producer));
}

private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
BiPredicate<CloseSafeProducer<K, V>, Duration> remover) {
Producer<K, V> newProducer = createRawProducer(getTxProducerConfigs(prefix + suffix));
Expand All @@ -899,10 +923,13 @@ private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
newEx.addSuppressed(ex2);
throw newEx; // NOSONAR - lost stack trace
}
finally {
this.transactionIdSuffixStrategy.releaseSuffix(prefix, suffix);
}
throw new KafkaException("initTransactions() failed", ex);
}
CloseSafeProducer<K, V> closeSafeProducer =
new CloseSafeProducer<>(newProducer, remover, prefix, this.physicalCloseTimeout, this.beanName,
new CloseSafeProducer<>(newProducer, remover, prefix, suffix, this.physicalCloseTimeout, this.beanName,
this.epoch.get());
this.listeners.forEach(listener -> listener.producerAdded(closeSafeProducer.clientId, closeSafeProducer));
return closeSafeProducer;
Expand All @@ -923,7 +950,7 @@ protected BlockingQueue<CloseSafeProducer<K, V>> getCache() {
}

@Nullable
protected BlockingQueue<CloseSafeProducer<K, V>> getCache(String txIdPrefix) {
protected BlockingQueue<CloseSafeProducer<K, V>> getCache(@Nullable String txIdPrefix) {
if (txIdPrefix == null) {
return null;
}
Expand All @@ -941,7 +968,7 @@ protected BlockingQueue<CloseSafeProducer<K, V>> getCache(String txIdPrefix) {
public void closeThreadBoundProducer() {
CloseSafeProducer<K, V> tlProducer = this.threadBoundProducers.remove(Thread.currentThread());
if (tlProducer != null) {
tlProducer.closeDelegate(this.physicalCloseTimeout, this.listeners);
tlProducer.closeDelegate(this.physicalCloseTimeout);
}
}

Expand Down Expand Up @@ -991,6 +1018,8 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {

final String txIdPrefix; // NOSONAR

final String txIdSuffix; // NOSONAR

final long created; // NOSONAR

private final Duration closeTimeout;
Expand All @@ -1010,14 +1039,21 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
this(delegate, removeConsumerProducer, null, closeTimeout, factoryName, epoch);
}

CloseSafeProducer(Producer<K, V> delegate, BiPredicate<CloseSafeProducer<K, V>, Duration> removeProducer,
@Nullable String txIdPrefix, Duration closeTimeout, String factoryName, int epoch) {

this(delegate, removeProducer, txIdPrefix, null, closeTimeout, factoryName, epoch);
}

CloseSafeProducer(Producer<K, V> delegate,
BiPredicate<CloseSafeProducer<K, V>, Duration> removeProducer, @Nullable String txIdPrefix,
Duration closeTimeout, String factoryName, int epoch) {
@Nullable String txIdSuffix, Duration closeTimeout, String factoryName, int epoch) {

Assert.isTrue(!(delegate instanceof CloseSafeProducer), "Cannot double-wrap a producer");
this.delegate = delegate;
this.removeProducer = removeProducer;
this.txIdPrefix = txIdPrefix;
this.txIdSuffix = txIdSuffix;
this.closeTimeout = closeTimeout;
Map<MetricName, ? extends Metric> metrics = delegate.metrics();
Iterator<MetricName> metricIterator = metrics.keySet().iterator();
Expand Down Expand Up @@ -1057,7 +1093,6 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
}
callback.onCompletion(metadata, exception);
}

});
}

Expand Down Expand Up @@ -1159,22 +1194,30 @@ public void close(@Nullable Duration timeout) {
this.removeProducer.test(this, this.producerFailed instanceof TimeoutException
? CLOSE_TIMEOUT_AFTER_TX_TIMEOUT
: timeout);
this.delegate.close(timeout == null ? this.closeTimeout : this.producerFailed instanceof TimeoutException
? CLOSE_TIMEOUT_AFTER_TX_TIMEOUT
: timeout);
}
else {
this.closed = this.removeProducer.test(this, timeout);
if (this.closed) {
this.delegate.close(timeout == null ? this.closeTimeout : timeout);
}
}
}
}

void closeDelegate(Duration timeout, List<Listener<K, V>> listeners) {
void closeDelegate(Duration timeout) {
artembilan marked this conversation as resolved.
Show resolved Hide resolved
try {
this.delegate.close(timeout == null ? this.closeTimeout : timeout);
if (!this.closed) {
this.delegate.close(timeout == null ? this.closeTimeout : timeout);
this.closed = true;
this.removeProducer.test(this, timeout == null ? this.closeTimeout : timeout);
}
}
catch (Exception ex) {
LOGGER.warn(ex, () -> "Failed to close " + this.delegate);
}
listeners.forEach(listener -> listener.producerRemoved(this.clientId, this));
this.closed = true;
}

@Override
Expand Down
Loading