GH-2852: support fixed transaction id suffix #2913

Closed
wants to merge 3 commits into from
@@ -102,11 +102,40 @@ NOTE: If there is a `KafkaTransactionManager` (or synchronized) transaction in p
Instead, a new "nested" transaction is used.

[[transaction-id-prefix]]
== `transactionIdPrefix`
== `TransactionIdPrefix`

With `EOSMode.V2` (aka `BETA`), the only supported mode, it is no longer necessary to use the same `transactional.id`, even for consumer-initiated transactions; in fact, it must be unique on each instance the same as for producer-initiated transactions.
This property must have a different value on each application instance.

[[transaction-id-suffix-fixed]]
== Fixed `TransactionIdSuffix`

Since 3.1, setting `maxCache` to a value greater than zero causes `transactional.id` values to be reused within a fixed range.
When a transactional producer is requested and all `transactional.id` values are in use, a `NoProducerAvailableException` is thrown.
We can then use a `RetryTemplate` configured to retry that exception, with a suitably configured back off.
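
The retry idea can be sketched with the JDK alone, so that it runs standalone. This is only an illustration under stated assumptions: `RetrySketch`, `withRetry` and `PoolExhaustedException` are hypothetical names, with `PoolExhaustedException` standing in for `NoProducerAvailableException` and the loop standing in for what a `RetryTemplate` with a back off policy would do in a real application.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

class RetrySketch {

    // Hypothetical stand-in for NoProducerAvailableException.
    static class PoolExhaustedException extends RuntimeException {
        PoolExhaustedException(String message) {
            super(message);
        }
    }

    // Calls the action up to maxAttempts times, retrying only on
    // PoolExhaustedException and sleeping backOffMillis between attempts.
    static <T> T withRetry(Callable<T> action, int maxAttempts, long backOffMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        PoolExhaustedException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            }
            catch (PoolExhaustedException ex) {
                last = ex;
                if (attempt < maxAttempts) {
                    Thread.sleep(backOffMillis);
                }
            }
        }
        throw last; // every attempt found the pool exhausted
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger calls = new AtomicInteger();
        // The first two calls simulate "all transactional.id values in use".
        String result = withRetry(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new PoolExhaustedException("no transactional producer available");
            }
            return "sent";
        }, 5, 10L);
        System.out.println(result + " after " + calls.get() + " attempts");
    }
}
```

Other exceptions are deliberately not retried, so only the "all ids in use" condition triggers the back off.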
@sobychacko (Contributor), Dec 6, 2023:
Change the wording to - "We can then use a"


[source, java]
----
public static class Config {

@Bean
public ProducerFactory<String, String> myProducerFactory() {
Map<String, Object> configs = producerConfigs();
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
...
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
...
pf.setTransactionIdPrefix("my.txid.");
pf.setMaxCache(5);
return pf;
}

}
----
With `maxCache` set to 5, the `transactional.id` is `my.txid.` followed by a suffix in the range 0 to 4, that is, `my.txid.0` through `my.txid.4`.

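IMPORTANT: When using `KafkaTransactionManager` with the `ConcurrentMessageListenerContainer`, `maxCache` must be greater than `concurrency`; otherwise the container threads can exhaust the available `transactional.id` values. Also be careful about nested transactions, which need an additional producer.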

Contributor:
"also be careful about".
It would be beneficial if you could add a few sentences detailing why the maxCache needs to be greater than concurrency. Same thing about nested transactions.
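
To illustrate why the pool size matters, here is a minimal sketch that imitates the queue logic in this diff (`poll()` to take a suffix, `contains`/`add` to give it back). It is not the factory's code: `SuffixPoolSketch`, `acquire` and `release` are hypothetical names, and a `null` return stands in for the point where the factory would throw `NoProducerAvailableException`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class SuffixPoolSketch {

    private final BlockingQueue<String> suffixes = new LinkedBlockingQueue<>();

    // Pre-fills the pool with the suffixes "0" .. "maxCache - 1".
    SuffixPoolSketch(int maxCache) {
        for (int i = 0; i < maxCache; i++) {
            this.suffixes.add(String.valueOf(i));
        }
    }

    // Returns a free suffix, or null when every suffix is held by an in-flight transaction.
    String acquire() {
        return this.suffixes.poll();
    }

    // Gives a suffix back to the pool, ignoring duplicates.
    void release(String suffix) {
        if (!this.suffixes.contains(suffix)) {
            this.suffixes.add(suffix);
        }
    }

    public static void main(String[] args) {
        SuffixPoolSketch pool = new SuffixPoolSketch(2);
        String first = pool.acquire();  // held by listener thread 1
        String second = pool.acquire(); // held by listener thread 2
        String third = pool.acquire();  // e.g. a nested transaction: nothing left
        System.out.println("first=" + first + ", second=" + second + ", third=" + third);
        pool.release(first);
        System.out.println("after release: " + pool.acquire());
    }
}
```

With a pool of 2 and two transactions already in flight, any further request finds the pool empty until one of the holders returns its suffix.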

[[tx-template-mixed]]
== `KafkaTemplate` Transactional and non-Transactional Publishing

@@ -50,3 +50,9 @@ See xref:retrytopic/topic-naming.adoc[Topic Naming] for more information.

When manually assigning partitions, with a `null` consumer `group.id`, the `AckMode` is now automatically coerced to `MANUAL`.
See xref:tips.adoc#tip-assign-all-parts[Manually Assigning All Partitions] for more information.

[[x31-dkpf]]
=== DefaultKafkaProducerFactory

When `maxCache` is set together with `transactionIdPrefix`, the `transactional.id` suffixes are restricted to a fixed range.
See xref:kafka/transactions.adoc#transaction-id-suffix-fixed[Fixed TransactionIdSuffix] for more information.

@@ -125,6 +125,8 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory

private final Map<String, BlockingQueue<CloseSafeProducer<K, V>>> cache = new ConcurrentHashMap<>();

private final Map<String, BlockingQueue<String>> suffixCache = new ConcurrentHashMap<>();

private final Map<Thread, CloseSafeProducer<K, V>> threadBoundProducers = new ConcurrentHashMap<>();

private final AtomicInteger epoch = new AtomicInteger();
@@ -153,6 +155,8 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory

private boolean producerPerThread;

private int maxCache;

private long maxAge;

private boolean configureSerializers = true;
@@ -404,7 +408,7 @@ public Duration getPhysicalCloseTimeout() {

/**
* Set a prefix for the {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} config. By
* default a {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} value from configs is used
* default, a {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} value from configs is used
* as a prefix in the target producer configs.
* @param transactionIdPrefix the prefix.
* @since 1.3
@@ -488,6 +492,16 @@ public List<ProducerPostProcessor<K, V>> getPostProcessors() {
return Collections.unmodifiableList(this.postProcessors);
}

/**
* Set the maximum size of the transactional producer cache.
* @param maxCache the maxCache to set
* @since 3.1
*/
public void setMaxCache(int maxCache) {
Assert.isTrue(maxCache >= 0, "maxCache must be greater than or equal to 0");
this.maxCache = maxCache;
}

/**
* Set the maximum age for a producer; useful when using transactions and the broker
* might expire a {@code transactional.id} due to inactivity.
@@ -709,6 +723,7 @@ public void destroy() {
}
});
this.cache.clear();
this.suffixCache.clear();
this.threadBoundProducers.values().forEach(prod -> {
try {
prod.closeDelegate(this.physicalCloseTimeout, this.listeners);
@@ -846,7 +861,20 @@ protected Producer<K, V> createTransactionalProducer(String txIdPrefix) {
}
}
if (cachedProducer == null) {
return doCreateTxProducer(txIdPrefix, "" + this.transactionIdSuffix.getAndIncrement(), this::cacheReturner);
String suffix;
if (this.maxCache > 0) {
BlockingQueue<String> suffixQueue = getSuffixCache(txIdPrefix, this.maxCache);
Assert.notNull(suffixQueue,
() -> "No suffix cache found for " + txIdPrefix + ", max cache " + this.maxCache);
suffix = suffixQueue.poll();
if (suffix == null) {
throw new NoProducerAvailableException("No transaction producer available for " + txIdPrefix);
}
}
else {
suffix = String.valueOf(this.transactionIdSuffix.getAndIncrement());
@stillya (Contributor), Nov 25, 2023:
Nice work, and thanks for your effort! However, I think just incrementing may not be sufficient. The goal of GH-2852 is to allow users to control how transactionIds are issued. For example, with a topic having 9 partitions, 3 replicas of my service (each with 3 producers), and a limit of 9 txIds, restarting my service becomes a bit of a pain. It takes time to distribute transaction IDs correctly (without getting fenced all the time), so I would prefer to use external storage as the source of truth.
I suggest introducing a new interface for generating transactionIdSuffix with a default increment implementation. WDYT?

@Wzy19930507 (Contributor Author), Nov 26, 2023:
> I suggest introducing a new interface for generating transactionIdSuffix with a default increment implementation. WDYT?

With the new interface, would the first instance of the service increment transactionIdSuffix from 0 and the second instance from 3?

If I understand correctly, I like it.

At the same time, we need to monitor the generated transaction.id, for example by providing a method that returns the current transactionIdSuffix, or by publishing an event carrying the current transactionIdSuffix in DefaultKafkaProducerFactory.destroy().

As an alternative, would writing a unique ID in the transactionIdPrefix, such as a UUID, be feasible?

Contributor:

> Via the new interface, does the first instance of the service increment transactionIdSuffix from 0, and the second instance increments it from 3?

It actually depends on the implementation. It could distribute transactionIdSuffix between replicas as described, or it could provide the first free suffix from the source of truth storage. However, the default implementation remains the same as it is now.

> At the same time, we need to monitor the generated transaction.id. For example, provide a method to return the current transactionIdSuffix or publish an event carrying the current transactionIdSuffix at DefaultKafkaProducerFactory.destroy().

Certainly, we need this functionality. Therefore, the interface must include a contract to retrieve the transactionId and return it back.

> As an alternative, writing a unique ID in the transactionIdPrefix, such as UUID, is feasible?

In my environment, it's not an option because I have a mask for transactionIds like topic-name-{0..n} (e.g. mytopic-1).
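
For concreteness, the interface idea discussed in this thread could look roughly like the sketch below. Everything here is an assumption made for illustration and not part of this PR or of the library: `SuffixSource`, `IncrementingSuffixSource` and `RangeSuffixSource` are hypothetical names, and the range-based variant simply hard-codes each replica's slice instead of consulting external storage.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

class SuffixSourceSketch {

    // Hypothetical contract: hand out a suffix and accept it back when the producer closes.
    interface SuffixSource {
        String acquire(String txIdPrefix);
        void release(String txIdPrefix, String suffix);
    }

    // Default behaviour, modelled on the existing code path: an ever-increasing counter.
    static class IncrementingSuffixSource implements SuffixSource {
        private final AtomicInteger counter = new AtomicInteger();

        @Override
        public String acquire(String txIdPrefix) {
            return String.valueOf(this.counter.getAndIncrement());
        }

        @Override
        public void release(String txIdPrefix, String suffix) {
            // nothing to return: suffixes are never reused
        }
    }

    // Each replica owns a fixed, non-overlapping slice of suffixes.
    // For brevity this sketch keeps one pool and ignores the prefix.
    static class RangeSuffixSource implements SuffixSource {
        private final Queue<String> free = new ConcurrentLinkedQueue<>();

        RangeSuffixSource(int firstSuffix, int count) {
            for (int i = 0; i < count; i++) {
                this.free.add(String.valueOf(firstSuffix + i));
            }
        }

        @Override
        public String acquire(String txIdPrefix) {
            String suffix = this.free.poll();
            if (suffix == null) {
                throw new IllegalStateException("No free suffix for " + txIdPrefix);
            }
            return suffix;
        }

        @Override
        public void release(String txIdPrefix, String suffix) {
            if (!this.free.contains(suffix)) {
                this.free.add(suffix);
            }
        }
    }

    public static void main(String[] args) {
        String prefix = "mytopic-";
        SuffixSource replicaOne = new RangeSuffixSource(0, 3);
        SuffixSource replicaTwo = new RangeSuffixSource(3, 3);
        System.out.println(prefix + replicaOne.acquire(prefix));
        System.out.println(prefix + replicaTwo.acquire(prefix));
    }
}
```

With three suffixes per replica, the first replica starts at `mytopic-0` and the second at `mytopic-3`, which matches the "0 and 3" split asked about above.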

}
return doCreateTxProducer(txIdPrefix, suffix, this::cacheReturner);
}
else {
return cachedProducer;
@@ -856,26 +884,30 @@ protected Producer<K, V> createTransactionalProducer(String txIdPrefix) {
private boolean expire(CloseSafeProducer<K, V> producer) {
boolean expired = this.maxAge > 0 && System.currentTimeMillis() - producer.created > this.maxAge;
if (expired) {
producer.closeDelegate(this.physicalCloseTimeout, this.listeners);
closeTransactionProducer(producer, this.physicalCloseTimeout, this.listeners);
}
return expired;
}

boolean cacheReturner(CloseSafeProducer<K, V> producerToRemove, Duration timeout) {
if (producerToRemove.closed) {
producerToRemove.closeDelegate(timeout, this.listeners);
closeTransactionProducer(producerToRemove, timeout, this.listeners);
return true;
}
else {
this.globalLock.lock();
try {
BlockingQueue<CloseSafeProducer<K, V>> txIdCache = getCache(producerToRemove.txIdPrefix);
if (producerToRemove.epoch != this.epoch.get()
|| (txIdCache != null && !txIdCache.contains(producerToRemove)
&& !txIdCache.offer(producerToRemove))) {
if (producerToRemove.epoch != this.epoch.get()) {
producerToRemove.closeDelegate(timeout, this.listeners);
return true;
}

BlockingQueue<CloseSafeProducer<K, V>> txIdCache = getCache(producerToRemove.txIdPrefix);
if (txIdCache != null && !txIdCache.contains(producerToRemove)
&& !txIdCache.offer(producerToRemove)) {
closeTransactionProducer(producerToRemove, timeout, this.listeners);
return true;
}
}
finally {
this.globalLock.unlock();
@@ -884,6 +916,38 @@ boolean cacheReturner(CloseSafeProducer<K, V> producerToRemove, Duration timeout
}
}

private void closeTransactionProducer(CloseSafeProducer<K, V> producer, Duration timeout,
List<Listener<K, V>> listeners) {
try {
producer.closeDelegate(timeout, listeners);
}
finally {
reuseTransactionIdSuffix(producer);
}
}

private void reuseTransactionIdSuffix(CloseSafeProducer<K, V> producerToRemove) {
reuseTransactionIdSuffix(producerToRemove.txIdPrefix, producerToRemove.txIdSuffix, producerToRemove.epoch);
}

private void reuseTransactionIdSuffix(@Nullable String txIdPrefix, @Nullable String suffix, int epoch) {
if (txIdPrefix == null || suffix == null) {
return;
}
this.globalLock.lock();
try {
if (this.maxCache > 0) {
BlockingQueue<String> queue = getSuffixCache(txIdPrefix, this.maxCache);
if (epoch == this.epoch.get() && queue != null && !queue.contains(suffix)) {
queue.add(suffix);
}
}
}
finally {
this.globalLock.unlock();
}
}

private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
BiPredicate<CloseSafeProducer<K, V>, Duration> remover) {
Producer<K, V> newProducer = createRawProducer(getTxProducerConfigs(prefix + suffix));
@@ -899,10 +963,13 @@ private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
newEx.addSuppressed(ex2);
throw newEx; // NOSONAR - lost stack trace
}
finally {
reuseTransactionIdSuffix(prefix, suffix, this.epoch.get());
}
throw new KafkaException("initTransactions() failed", ex);
}
CloseSafeProducer<K, V> closeSafeProducer =
new CloseSafeProducer<>(newProducer, remover, prefix, this.physicalCloseTimeout, this.beanName,
new CloseSafeProducer<>(newProducer, remover, prefix, suffix, this.physicalCloseTimeout, this.beanName,
this.epoch.get());
this.listeners.forEach(listener -> listener.producerAdded(closeSafeProducer.clientId, closeSafeProducer));
return closeSafeProducer;
@@ -923,13 +990,38 @@ protected BlockingQueue<CloseSafeProducer<K, V>> getCache() {
}

@Nullable
protected BlockingQueue<CloseSafeProducer<K, V>> getCache(String txIdPrefix) {
protected BlockingQueue<CloseSafeProducer<K, V>> getCache(@Nullable String txIdPrefix) {
if (txIdPrefix == null) {
return null;
}
return this.cache.computeIfAbsent(txIdPrefix, txId -> new LinkedBlockingQueue<>());
}

@Nullable
protected BlockingQueue<String> getSuffixCache() {
return getSuffixCache(this.transactionIdPrefix);
}

@Nullable
protected BlockingQueue<String> getSuffixCache(@Nullable String txIdPrefix) {
return getSuffixCache(txIdPrefix, this.maxCache);
}

@Nullable
protected BlockingQueue<String> getSuffixCache(@Nullable String txIdPrefix, int maxCache) {
if (txIdPrefix == null || maxCache <= 0) {
return null;
}

return this.suffixCache.computeIfAbsent(txIdPrefix, txId -> {
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
for (int suffix = 0; suffix < maxCache; suffix++) {
queue.add(String.valueOf(this.transactionIdSuffix.getAndIncrement()));
}
return queue;
});
}

/**
* When using {@link #setProducerPerThread(boolean)} (true), call this method to close
* and release this thread's producer. Thread bound producers are <b>not</b> closed by
@@ -991,6 +1083,8 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {

final String txIdPrefix; // NOSONAR

final String txIdSuffix; // NOSONAR

final long created; // NOSONAR

private final Duration closeTimeout;
@@ -1010,14 +1104,21 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
this(delegate, removeConsumerProducer, null, closeTimeout, factoryName, epoch);
}

CloseSafeProducer(Producer<K, V> delegate, BiPredicate<CloseSafeProducer<K, V>, Duration> removeProducer,
@Nullable String txIdPrefix, Duration closeTimeout, String factoryName, int epoch) {

this(delegate, removeProducer, txIdPrefix, null, closeTimeout, factoryName, epoch);
}

CloseSafeProducer(Producer<K, V> delegate,
BiPredicate<CloseSafeProducer<K, V>, Duration> removeProducer, @Nullable String txIdPrefix,
Duration closeTimeout, String factoryName, int epoch) {
@Nullable String txIdSuffix, Duration closeTimeout, String factoryName, int epoch) {

Assert.isTrue(!(delegate instanceof CloseSafeProducer), "Cannot double-wrap a producer");
this.delegate = delegate;
this.removeProducer = removeProducer;
this.txIdPrefix = txIdPrefix;
this.txIdSuffix = txIdSuffix;
this.closeTimeout = closeTimeout;
Map<MetricName, ? extends Metric> metrics = delegate.metrics();
Iterator<MetricName> metricIterator = metrics.keySet().iterator();
@@ -1047,17 +1148,12 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
LOGGER.trace(() -> toString() + " send(" + record + ")");
return this.delegate.send(record, new Callback() {

@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception instanceof OutOfOrderSequenceException) {
CloseSafeProducer.this.producerFailed = exception;
close(CloseSafeProducer.this.closeTimeout);
}
callback.onCompletion(metadata, exception);
return this.delegate.send(record, (metadata, exception) -> {
if (exception instanceof OutOfOrderSequenceException) {
CloseSafeProducer.this.producerFailed = exception;
close(CloseSafeProducer.this.closeTimeout);
}

callback.onCompletion(metadata, exception);
});
}

@@ -0,0 +1,50 @@
/*
* Copyright 2023-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.core;

import org.springframework.lang.Nullable;

/**
* An exception thrown when no transactional producer is available, which can happen
* when the {@link DefaultKafkaProducerFactory} maxCache is greater than 0.
*
* @author Wang Zhiyang
* @since 3.1.1
*
*/
public class NoProducerAvailableException extends RuntimeException {

private static final long serialVersionUID = 1L;

/**
* Constructs a new no producer available exception with the specified detail message.
* @param message the message.
*/
public NoProducerAvailableException(String message) {
this(message, null);
}

/**
* Constructs a new no producer available exception with the specified detail message and cause.
* @param message the message.
* @param cause the cause.
*/
public NoProducerAvailableException(String message, @Nullable Throwable cause) {
super(message, cause);
}

}

@@ -926,7 +926,6 @@ else if (listener instanceof MessageListener) {
if (this.containerProperties.isLogContainerConfig()) {
this.logger.info(toString());
}
Map<String, Object> props = KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties();
ApplicationContext applicationContext = getApplicationContext();
this.checkNullKeyForExceptions = this.containerProperties.isCheckDeserExWhenKeyNull()
|| ErrorHandlingUtils.checkDeserializer(KafkaMessageListenerContainer.this.consumerFactory,