From d30de4efa8d149daf1ac0c3aec89a012175610ee Mon Sep 17 00:00:00 2001
From: "Zhiyang.Wang1"
Date: Thu, 23 Nov 2023 23:50:13 +0800
Subject: [PATCH 1/3] GH-2852: support fixed transaction id suffix

Add a `maxCache` property to `DefaultKafkaProducerFactory`; setting
`maxCache` greater than zero allows `transactional.id` values to be reused.

Resolves #2852
---
 .../ROOT/pages/kafka/transactions.adoc        |  31 +++-
 .../antora/modules/ROOT/pages/whats-new.adoc  |   6 +
 .../core/DefaultKafkaProducerFactory.java     | 138 +++++++++++++---
 .../core/NoProducerAvailableException.java    |  50 ++++++
 .../KafkaMessageListenerContainer.java        |   1 -
 .../DefaultKafkaConsumerFactoryTests.java     |  53 ++++++
 .../DefaultKafkaProducerFactoryTests.java     | 151 +++++++++++++++++-
 .../core/KafkaTemplateTransactionTests.java   |  70 +++++++-
 .../listener/TransactionalContainerTests.java |   4 +-
 9 files changed, 472 insertions(+), 32 deletions(-)
 create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/core/NoProducerAvailableException.java

diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/transactions.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/transactions.adoc
index e5100aec55..1b5e07d093 100644
--- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/transactions.adoc
+++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/transactions.adoc
@@ -102,11 +102,40 @@ NOTE: If there is a `KafkaTransactionManager` (or synchronized) transaction in p
 Instead, a new "nested" transaction is used.
 
 [[transaction-id-prefix]]
-== `transactionIdPrefix`
+== `TransactionIdPrefix`
 
 With `EOSMode.V2` (aka `BETA`), the only supported mode, it is no longer necessary to use the same `transactional.id`, even for consumer-initiated transactions; in fact, it must be unique on each instance the same as for producer-initiated transactions.
 This property must have a different value on each application instance.
 
+[[transaction-id-suffix-fixed]]
+== `TransactionIdSuffix Fixed`
+
+Since 3.1, setting `maxCache` greater than zero reuses `transactional.id` values within a specific range.
+When a transactional producer is requested and all `transactional.id` values are in use, a `NoProducerAvailableException` is thrown.
+A `RetryTemplate` configured to retry that exception, with a suitably configured back off, can then be used.
+
+[source, java]
+----
+public static class Config {
+
+    @Bean
+    public ProducerFactory<String, String> myProducerFactory() {
+        Map<String, Object> configs = producerConfigs();
+        configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
+        ...
+        DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
+        ...
+        pf.setTransactionIdPrefix("my.txid.");
+        pf.setMaxCache(5);
+        return pf;
+    }
+
+}
+----
+With `maxCache` set to 5, the `transactional.id` values are `my.txid.0` through `my.txid.4`.
+
+IMPORTANT: When using a `KafkaTransactionManager` in a `ConcurrentMessageListenerContainer`, `maxCache` must be greater than `concurrency`; also be careful with nested transactions.
+
 [[tx-template-mixed]]
 == `KafkaTemplate` Transactional and non-Transactional Publishing
 
diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc
index a13581b5a0..ec09a53b36 100644
--- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc
+++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc
@@ -50,3 +50,9 @@ See xref:retrytopic/topic-naming.adoc[Topic Naming] for more information.
 When manually assigning partitions, with a `null` consumer `group.id`, the `AckMode` is now automatically coerced to `MANUAL`.
 See xref:tips.adoc#tip-assign-all-parts[Manually Assigning All Partitions] for more information.
+
+[[x31-dkpf]]
+=== DefaultKafkaProducerFactory
+
+Setting `maxCache` together with `transactionIdPrefix` restricts the `transactional.id` values to a fixed range.
+See xref:kafka/transactions.adoc#transaction-id-suffix-fixed[Fixed TransactionIdSuffix] for more information.
\ No newline at end of file
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java
index 66318c5de3..d83afd9fa9 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java
@@ -125,6 +125,8 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory
 
 	private final Map<String, BlockingQueue<CloseSafeProducer<K, V>>> cache = new ConcurrentHashMap<>();
 
+	private final Map<String, BlockingQueue<String>> suffixCache = new ConcurrentHashMap<>();
+
 	private final Map<Thread, CloseSafeProducer<K, V>> threadBoundProducers = new ConcurrentHashMap<>();
 
 	private final AtomicInteger epoch = new AtomicInteger();
@@ -153,6 +155,8 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory
 
 	private boolean producerPerThread;
 
+	private int maxCache;
+
 	private long maxAge;
 
 	private boolean configureSerializers = true;
@@ -404,7 +408,7 @@ public Duration getPhysicalCloseTimeout() {
 	/**
 	 * Set a prefix for the {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} config. By
-	 * default a {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} value from configs is used
+	 * default, a {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} value from configs is used
 	 * as a prefix in the target producer configs.
 	 * @param transactionIdPrefix the prefix.
 	 * @since 1.3
@@ -488,6 +492,16 @@ public List<ProducerPostProcessor<K, V>> getPostProcessors() {
 		return Collections.unmodifiableList(this.postProcessors);
 	}
 
+	/**
+	 * Set the maximum size for the transaction producer cache.
+	 * @param maxCache the maxCache to set
+	 * @since 3.1
+	 */
+	public void setMaxCache(int maxCache) {
+		Assert.isTrue(maxCache >= 0, "max cache must be greater than or equal to 0");
+		this.maxCache = maxCache;
+	}
+
 	/**
 	 * Set the maximum age for a producer; useful when using transactions and the broker
 	 * might expire a {@code transactional.id} due to inactivity.
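Conceptually, the `suffixCache` added above is a bounded pool of suffix strings per transaction-id prefix: a suffix is taken when a transactional producer is created and put back when that producer is physically closed. A minimal standalone sketch of the idea (illustrative names only, not the actual spring-kafka internals):

[source, java]
----
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative only: a bounded pool of transactional.id suffixes for one prefix.
class SuffixPool {

	private final BlockingQueue<String> available = new LinkedBlockingQueue<>();

	SuffixPool(int maxCache, int firstSuffix) {
		// pre-populate a fixed, contiguous range of suffixes
		for (int i = 0; i < maxCache; i++) {
			this.available.add(String.valueOf(firstSuffix + i));
		}
	}

	// take a suffix; fail fast when all are in use (cf. NoProducerAvailableException)
	String acquire() {
		String suffix = this.available.poll();
		if (suffix == null) {
			throw new IllegalStateException("no transactional.id suffix available");
		}
		return suffix;
	}

	// return a suffix when its producer is physically closed
	void release(String suffix) {
		if (!this.available.contains(suffix)) {
			this.available.add(suffix);
		}
	}

}
----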
@@ -709,6 +723,7 @@ public void destroy() {
 			}
 		});
 		this.cache.clear();
+		this.suffixCache.clear();
 		this.threadBoundProducers.values().forEach(prod -> {
 			try {
 				prod.closeDelegate(this.physicalCloseTimeout, this.listeners);
@@ -846,7 +861,20 @@ protected Producer<K, V> createTransactionalProducer(String txIdPrefix) {
 			}
 		}
 		if (cachedProducer == null) {
-			return doCreateTxProducer(txIdPrefix, "" + this.transactionIdSuffix.getAndIncrement(), this::cacheReturner);
+			String suffix;
+			if (this.maxCache > 0) {
+				BlockingQueue<String> suffixQueue = getSuffixCache(txIdPrefix, this.maxCache);
+				Assert.notNull(suffixQueue,
+						() -> "No suffix cache found for " + txIdPrefix + ", max cache " + this.maxCache);
+				suffix = suffixQueue.poll();
+				if (suffix == null) {
+					throw new NoProducerAvailableException("No transaction producer available for " + txIdPrefix);
+				}
+			}
+			else {
+				suffix = String.valueOf(this.transactionIdSuffix.getAndIncrement());
+			}
+			return doCreateTxProducer(txIdPrefix, suffix, this::cacheReturner);
 		}
 		else {
 			return cachedProducer;
@@ -856,26 +884,30 @@ protected Producer<K, V> createTransactionalProducer(String txIdPrefix) {
 	private boolean expire(CloseSafeProducer<K, V> producer) {
 		boolean expired = this.maxAge > 0 && System.currentTimeMillis() - producer.created > this.maxAge;
 		if (expired) {
-			producer.closeDelegate(this.physicalCloseTimeout, this.listeners);
+			closeTransactionProducer(producer, this.physicalCloseTimeout, this.listeners);
 		}
 		return expired;
 	}
 
 	boolean cacheReturner(CloseSafeProducer<K, V> producerToRemove, Duration timeout) {
 		if (producerToRemove.closed) {
-			producerToRemove.closeDelegate(timeout, this.listeners);
+			closeTransactionProducer(producerToRemove, timeout, this.listeners);
 			return true;
 		}
 		else {
 			this.globalLock.lock();
 			try {
-				BlockingQueue<CloseSafeProducer<K, V>> txIdCache = getCache(producerToRemove.txIdPrefix);
-				if (producerToRemove.epoch != this.epoch.get()
-						|| (txIdCache != null && !txIdCache.contains(producerToRemove)
-								&& !txIdCache.offer(producerToRemove))) {
+				if (producerToRemove.epoch != this.epoch.get()) {
 					producerToRemove.closeDelegate(timeout, this.listeners);
 					return true;
 				}
+
+				BlockingQueue<CloseSafeProducer<K, V>> txIdCache = getCache(producerToRemove.txIdPrefix);
+				if (txIdCache != null && !txIdCache.contains(producerToRemove)
+						&& !txIdCache.offer(producerToRemove)) {
+					closeTransactionProducer(producerToRemove, timeout, this.listeners);
+					return true;
+				}
 			}
 			finally {
 				this.globalLock.unlock();
@@ -884,6 +916,38 @@ boolean cacheReturner(CloseSafeProducer<K, V> producerToRemove, Duration timeout
 		}
 	}
 
+	private void closeTransactionProducer(CloseSafeProducer<K, V> producer, Duration timeout,
+			List<Listener<K, V>> listeners) {
+
+		try {
+			producer.closeDelegate(timeout, listeners);
+		}
+		finally {
+			reuseTransactionIdSuffix(producer);
+		}
+	}
+
+	private void reuseTransactionIdSuffix(CloseSafeProducer<K, V> producerToRemove) {
+		reuseTransactionIdSuffix(producerToRemove.txIdPrefix, producerToRemove.txIdSuffix, producerToRemove.epoch);
+	}
+
+	private void reuseTransactionIdSuffix(@Nullable String txIdPrefix, @Nullable String suffix, int epoch) {
+		if (txIdPrefix == null || suffix == null) {
+			return;
+		}
+		this.globalLock.lock();
+		try {
+			if (this.maxCache > 0) {
+				BlockingQueue<String> queue = getSuffixCache(txIdPrefix, this.maxCache);
+				if (epoch == this.epoch.get() && queue != null && !queue.contains(suffix)) {
+					queue.add(suffix);
+				}
+			}
+		}
+		finally {
+			this.globalLock.unlock();
+		}
+	}
+
 	private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
 			BiPredicate<CloseSafeProducer<K, V>, Duration> remover) {
 
 		Producer<K, V> newProducer =
createRawProducer(getTxProducerConfigs(prefix + suffix));
@@ -899,10 +963,13 @@ private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
 				newEx.addSuppressed(ex2);
 				throw newEx; // NOSONAR - lost stack trace
 			}
+			finally {
+				reuseTransactionIdSuffix(prefix, suffix, this.epoch.get());
+			}
 			throw new KafkaException("initTransactions() failed", ex);
 		}
 		CloseSafeProducer<K, V> closeSafeProducer =
-				new CloseSafeProducer<>(newProducer, remover, prefix, this.physicalCloseTimeout, this.beanName,
+				new CloseSafeProducer<>(newProducer, remover, prefix, suffix, this.physicalCloseTimeout, this.beanName,
 						this.epoch.get());
 		this.listeners.forEach(listener -> listener.producerAdded(closeSafeProducer.clientId, closeSafeProducer));
 		return closeSafeProducer;
@@ -923,13 +990,38 @@ protected BlockingQueue<CloseSafeProducer<K, V>> getCache() {
 	}
 
 	@Nullable
-	protected BlockingQueue<CloseSafeProducer<K, V>> getCache(String txIdPrefix) {
+	protected BlockingQueue<CloseSafeProducer<K, V>> getCache(@Nullable String txIdPrefix) {
 		if (txIdPrefix == null) {
 			return null;
 		}
 		return this.cache.computeIfAbsent(txIdPrefix, txId -> new LinkedBlockingQueue<>());
 	}
 
+	@Nullable
+	protected BlockingQueue<String> getSuffixCache() {
+		return getSuffixCache(this.transactionIdPrefix);
+	}
+
+	@Nullable
+	protected BlockingQueue<String> getSuffixCache(@Nullable String txIdPrefix) {
+		return getSuffixCache(txIdPrefix, this.maxCache);
+	}
+
+	@Nullable
+	protected BlockingQueue<String> getSuffixCache(@Nullable String txIdPrefix, int maxCache) {
+		if (txIdPrefix == null || maxCache <= 0) {
+			return null;
+		}
+
+		return this.suffixCache.computeIfAbsent(txIdPrefix, txId -> {
+			BlockingQueue<String> queue = new LinkedBlockingQueue<>();
+			for (int suffix = 0; suffix < maxCache; suffix++) {
+				queue.add(String.valueOf(this.transactionIdSuffix.getAndIncrement()));
+			}
+			return queue;
+		});
+	}
+
 	/**
 	 * When using {@link #setProducerPerThread(boolean)} (true), call this method to close
 	 * and release this thread's producer.
	 * Thread bound producers are not closed by
@@ -991,6 +1083,8 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
 
 		final String txIdPrefix; // NOSONAR
 
+		final String txIdSuffix; // NOSONAR
+
 		final long created; // NOSONAR
 
 		private final Duration closeTimeout;
@@ -1010,14 +1104,21 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
 			this(delegate, removeConsumerProducer, null, closeTimeout, factoryName, epoch);
 		}
 
+		CloseSafeProducer(Producer<K, V> delegate, BiPredicate<CloseSafeProducer<K, V>, Duration> removeProducer,
+				@Nullable String txIdPrefix, Duration closeTimeout, String factoryName, int epoch) {
+
+			this(delegate, removeProducer, txIdPrefix, null, closeTimeout, factoryName, epoch);
+		}
+
 		CloseSafeProducer(Producer<K, V> delegate,
 				BiPredicate<CloseSafeProducer<K, V>, Duration> removeProducer, @Nullable String txIdPrefix,
-				Duration closeTimeout, String factoryName, int epoch) {
+				@Nullable String txIdSuffix, Duration closeTimeout, String factoryName, int epoch) {
 
 			Assert.isTrue(!(delegate instanceof CloseSafeProducer), "Cannot double-wrap a producer");
 			this.delegate = delegate;
 			this.removeProducer = removeProducer;
 			this.txIdPrefix = txIdPrefix;
+			this.txIdSuffix = txIdSuffix;
 			this.closeTimeout = closeTimeout;
 			Map<MetricName, ? extends Metric> metrics = delegate.metrics();
 			Iterator<MetricName> metricIterator = metrics.keySet().iterator();
@@ -1047,17 +1148,12 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
 		@Override
 		public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
 			LOGGER.trace(() -> toString() + " send(" + record + ")");
-			return this.delegate.send(record, new Callback() {
-
-				@Override
-				public void onCompletion(RecordMetadata metadata, Exception exception) {
-					if (exception instanceof OutOfOrderSequenceException) {
-						CloseSafeProducer.this.producerFailed = exception;
-						close(CloseSafeProducer.this.closeTimeout);
-					}
-					callback.onCompletion(metadata, exception);
+			return this.delegate.send(record, (metadata, exception) -> {
+				if (exception instanceof OutOfOrderSequenceException) {
+					CloseSafeProducer.this.producerFailed = exception;
+					close(CloseSafeProducer.this.closeTimeout);
 				}
-
+				callback.onCompletion(metadata, exception);
 			});
 		}
 
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/NoProducerAvailableException.java b/spring-kafka/src/main/java/org/springframework/kafka/core/NoProducerAvailableException.java
new file mode 100644
index 0000000000..682a3a3773
--- /dev/null
+++ b/spring-kafka/src/main/java/org/springframework/kafka/core/NoProducerAvailableException.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2023 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.core;
+
+import org.springframework.lang.Nullable;
+
+/**
+ * An exception thrown when no transactional producer is available, because all
+ * {@code transactional.id} suffixes are in use; only applicable when the
+ * {@link DefaultKafkaProducerFactory} maxCache is greater than 0.
+ *
+ * @author Wang Zhiyang
+ * @since 3.1.1
+ *
+ */
+public class NoProducerAvailableException extends RuntimeException {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Constructs a new no producer available exception with the specified detail message.
+	 * @param message the message.
+	 */
+	public NoProducerAvailableException(String message) {
+		this(message, null);
+	}
+
+	/**
+	 * Constructs a new no producer available exception with the specified detail message and cause.
+	 * @param message the message.
+	 * @param cause the cause.
+	 */
+	public NoProducerAvailableException(String message, @Nullable Throwable cause) {
+		super(message, cause);
+	}
+
+}
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
index 4419d5d90c..bdb489c98b 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
@@ -926,7 +926,6 @@ else if (listener instanceof MessageListener) {
 			if (this.containerProperties.isLogContainerConfig()) {
 				this.logger.info(toString());
 			}
-			Map<String, Object> props = KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties();
 			ApplicationContext applicationContext = getApplicationContext();
 			this.checkNullKeyForExceptions = this.containerProperties.isCheckDeserExWhenKeyNull()
 					|| ErrorHandlingUtils.checkDeserializer(KafkaMessageListenerContainer.this.consumerFactory,
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java
index ae12c8e6aa..1f9ebc0f98 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java
@@ -376,6 +376,59 @@ public void testNestedTxProducerIsCached() throws Exception {
 		}
 	}
 
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testNestedTxProducerIsFixed() throws Exception {
+		Map<String, Object> producerProps = KafkaTestUtils.producerProps(this.embeddedKafka);
+		DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(producerProps);
+		KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
+		DefaultKafkaProducerFactory<Integer, String> pfTx = new DefaultKafkaProducerFactory<>(producerProps);
+		pfTx.setTransactionIdPrefix("fooTx.fixed.");
+		pfTx.setMaxCache(3);
+		KafkaOperations<Integer, String> templateTx = new KafkaTemplate<>(pfTx);
+		Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("txCache1FixedGroup", "false", this.embeddedKafka);
+		DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
+		AtomicReference<Consumer<Integer, String>> wrapped = new AtomicReference<>();
+		cf.addPostProcessor(consumer -> {
+			ProxyFactory prox = new ProxyFactory();
+			prox.setTarget(consumer);
+			@SuppressWarnings("unchecked")
+			Consumer<Integer, String> proxy = (Consumer<Integer, String>) prox.getProxy();
+			wrapped.set(proxy);
+			return proxy;
+		});
+		ContainerProperties containerProps = new ContainerProperties("txCache1Fixed");
+		CountDownLatch latch = new CountDownLatch(1);
+		containerProps.setMessageListener((MessageListener<Integer, String>) r -> {
+			templateTx.executeInTransaction(t -> t.send("txCacheSendFromListener", "bar"));
+			templateTx.executeInTransaction(t -> t.send("txCacheSendFromListener", "baz"));
+			latch.countDown();
+		});
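+		// the container borrows one pooled producer per transaction through this
+		// transaction manager; the nested templateTx transactions borrow a second,
+		// so maxCache = 3 leaves headroom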
+		KafkaTransactionManager<Integer, String> tm = new KafkaTransactionManager<>(pfTx);
+		containerProps.setTransactionManager(tm);
+		KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
+				containerProps);
+		container.start();
+		try {
+			CompletableFuture<SendResult<Integer, String>> future = template.send("txCache1Fixed", "foo");
+			future.get(10, TimeUnit.SECONDS);
+			pf.getCache();
+			assertThat(KafkaTestUtils.getPropertyValue(pf, "cache", Map.class)).hasSize(0);
+			assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
+			assertThat(KafkaTestUtils.getPropertyValue(pfTx, "cache", Map.class)).hasSize(1);
+			assertThat(pfTx.getCache()).hasSize(1);
+			assertThat(KafkaTestUtils.getPropertyValue(pfTx, "suffixCache", Map.class)).hasSize(1);
+			// 1 tm tx producer and 1 templateTx tx producer hold suffixes, so 1 of 3 remains
+			assertThat(pfTx.getSuffixCache()).hasSize(1);
+			assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer")).isSameAs(wrapped.get());
+		}
+		finally {
+			container.stop();
+			pf.destroy();
+			pfTx.destroy();
+		}
+	}
+
 	@SuppressWarnings({ "rawtypes", "unchecked" })
 	@Test
 	void listener() {
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java
index 7a53519a41..46e5694755 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java
@@ -70,7 +70,7 @@ public class DefaultKafkaProducerFactoryTests {
 
 	@SuppressWarnings({ "rawtypes", "unchecked" })
 	@Test
-	void testProducerClosedAfterBadTransition() throws Exception {
+	void testProducerClosedAfterBadTransition() {
 		final Producer producer = mock(Producer.class);
 		given(producer.send(any(), any())).willReturn(new CompletableFuture());
 		DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {
@@ -123,6 +123,118 @@ protected Producer createRawProducer(Map configs) {
 		pf.destroy();
 	}
 
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Test
+	void testMaxCacheProducerClosedAfterBadTransition() {
+		final Producer producer = mock(Producer.class);
+		given(producer.send(any(), any())).willReturn(new CompletableFuture());
+		DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {
+
+			@Override
+			protected Producer createRawProducer(Map configs) {
+				return producer;
+			}
+
+		};
+		pf.setTransactionIdPrefix("foo");
+		pf.setMaxCache(2);
+
+		final AtomicInteger flag = new AtomicInteger();
+		willAnswer(i -> {
+			if (flag.incrementAndGet() == 2) {
+				throw new KafkaException("Invalid begin transition ...");
+			}
+			return null;
+		}).given(producer).beginTransaction();
+
+		final AtomicInteger commitFlag = new AtomicInteger();
+		willAnswer(i -> {
+			if (commitFlag.incrementAndGet() == 2) {
+				throw new KafkaException("Invalid commit transition ...");
+			}
+			return null;
+		}).given(producer).commitTransaction();
+
+		final AtomicInteger abortFlag = new AtomicInteger();
+		willAnswer(i -> {
+			if (abortFlag.incrementAndGet() == 1) {
+				throw new KafkaException("Invalid abort transition ...");
+			}
+			return null;
+		}).given(producer).abortTransaction();
+
+		final AtomicInteger initFlag = new AtomicInteger();
+		willAnswer(i -> {
+			if (initFlag.incrementAndGet() == 4) {
+				throw new KafkaException("Invalid init transition ...");
+			}
+			return null;
+		}).given(producer).initTransactions();
+
+		final KafkaTemplate kafkaTemplate = new KafkaTemplate(pf);
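+		// the mock is scripted above so that the 2nd begin, 2nd commit and 1st abort
+		// fail; each failure should close the producer and return its suffix to the pool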
KafkaTransactionManager tm = new KafkaTransactionManager(pf); + TransactionTemplate transactionTemplate = new TransactionTemplate(tm); + transactionTemplate.execute(s -> { + kafkaTemplate.send("foo", "bar"); + return null; + }); + Map cache = KafkaTestUtils.getPropertyValue(pf, "cache", Map.class); + assertThat(cache).hasSize(1); + Queue queue = (Queue) cache.get("foo"); + assertThat(queue).hasSize(1); + Map suffixCache = KafkaTestUtils.getPropertyValue(pf, "suffixCache", Map.class); + assertThat(suffixCache).hasSize(1); + Queue suffixQueue = (Queue) suffixCache.get("foo"); + assertThat(suffixQueue).hasSize(1); + + assertThatExceptionOfType(CannotCreateTransactionException.class) + .isThrownBy(() -> transactionTemplate.execute(s -> null)); + assertThat(queue).hasSize(0); + assertThat(suffixQueue).hasSize(2); + + assertThatExceptionOfType(KafkaException.class) + .isThrownBy(() -> transactionTemplate.execute(s -> null)) + .withStackTraceContaining("Invalid commit transition"); + assertThat(queue).hasSize(0); + assertThat(suffixQueue).hasSize(2); + + assertThatExceptionOfType(KafkaException.class) + .isThrownBy(() -> { + transactionTemplate.execute(s -> { + throw new RuntimeException("rollback exception"); + }); + }) + .withStackTraceContaining("Invalid abort transition"); + assertThat(queue).hasSize(0); + assertThat(suffixQueue).hasSize(2); + + assertThatExceptionOfType(CannotCreateTransactionException.class) + .isThrownBy(() -> transactionTemplate.execute(s -> null)) + .withStackTraceContaining("Could not create Kafka transaction"); + assertThat(queue).hasSize(0); + assertThat(suffixQueue).hasSize(2); + + InOrder inOrder = inOrder(producer); + inOrder.verify(producer).initTransactions(); + inOrder.verify(producer).beginTransaction(); + inOrder.verify(producer).send(any(), any()); + inOrder.verify(producer).commitTransaction(); + inOrder.verify(producer).beginTransaction(); + inOrder.verify(producer).close(ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT); + inOrder.verify(producer).initTransactions(); + inOrder.verify(producer).beginTransaction(); + inOrder.verify(producer).commitTransaction(); + inOrder.verify(producer).close(ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT); + inOrder.verify(producer).initTransactions(); + inOrder.verify(producer).beginTransaction(); + inOrder.verify(producer).abortTransaction(); + inOrder.verify(producer).close(ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT); + inOrder.verify(producer).initTransactions(); + inOrder.verify(producer).close(DefaultKafkaProducerFactory.DEFAULT_PHYSICAL_CLOSE_TIMEOUT); + inOrder.verifyNoMoreInteractions(); + pf.destroy(); + } + @Test @SuppressWarnings({ "rawtypes", "unchecked" }) void testResetSingle() throws InterruptedException { @@ -196,6 +308,7 @@ protected Producer createRawProducer(Map configs) { }; pf.setApplicationContext(ctx); pf.setTransactionIdPrefix("foo"); + pf.setMaxCache(3); Producer aProducer = pf.createProducer(); assertThat(aProducer).isNotNull(); aProducer.close(); @@ -207,12 +320,17 @@ protected Producer createRawProducer(Map configs) { assertThat(cache.size()).isEqualTo(1); Queue queue = (Queue) cache.get("foo"); assertThat(queue.size()).isEqualTo(1); + Map suffixCache = KafkaTestUtils.getPropertyValue(pf, "suffixCache", Map.class); + assertThat(suffixCache.size()).isEqualTo(1); + Queue suffixQueue = (Queue) suffixCache.get("foo"); + assertThat(suffixQueue.size()).isEqualTo(2); pf.setMaxAge(Duration.ofMillis(10)); Thread.sleep(50); aProducer = pf.createProducer(); assertThat(aProducer).isNotSameAs(bProducer); 
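+		// the expired producer is physically closed on the next createProducer() call,
+		// returning its suffix to the pool before the replacement takes one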
pf.onApplicationEvent(new ContextStoppedEvent(ctx)); assertThat(queue.size()).isEqualTo(0); + assertThat(suffixCache.size()).isEqualTo(0); verify(producer).close(any(Duration.class)); } @@ -267,6 +385,7 @@ protected Producer createRawProducer(Map configs) { }; pf.setApplicationContext(ctx); pf.setTransactionIdPrefix("foo"); + pf.setMaxCache(3); Producer aProducer = pf.createProducer(); assertThat(aProducer).isNotNull(); aProducer.close(); @@ -278,12 +397,17 @@ protected Producer createRawProducer(Map configs) { assertThat(cache.size()).isEqualTo(1); Queue queue = (Queue) cache.get("foo"); assertThat(queue.size()).isEqualTo(1); + Map suffixCache = KafkaTestUtils.getPropertyValue(pf, "suffixCache", Map.class); + assertThat(suffixCache.size()).isEqualTo(1); + Queue suffixQueue = (Queue) suffixCache.get("foo"); + assertThat(suffixQueue.size()).isEqualTo(2); bProducer = pf.createProducer(); assertThat(bProducer).isSameAs(aProducer); assertThat(queue.size()).isEqualTo(0); pf.reset(); bProducer.close(); assertThat(queue.size()).isEqualTo(0); + assertThat(suffixCache.size()).isEqualTo(0); pf.destroy(); } @@ -325,7 +449,7 @@ protected Producer createKafkaProducer() { @Test @SuppressWarnings({ "rawtypes", "unchecked" }) - void threadLocalLifecycle() throws InterruptedException { + void threadLocalLifecycle() { final Producer producer = mock(Producer.class); AtomicBoolean created = new AtomicBoolean(); DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) { @@ -409,6 +533,29 @@ protected Producer createKafkaProducer() { assertThat(bProducer).isNotSameAs(aProducer); } + @Test + @SuppressWarnings({ "rawtypes", "unchecked" }) + void testNoProducerException() { + final Producer producer = mock(Producer.class); + DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) { + + @Override + protected Producer createRawProducer(Map configs) { + return producer; + } + + }; + pf.setTransactionIdPrefix("foo"); + pf.setMaxCache(2); + Map suffixCache = KafkaTestUtils.getPropertyValue(pf, "suffixCache", Map.class); + pf.createProducer(); + Queue queue = (Queue) suffixCache.get("foo"); + assertThat(queue).hasSize(1); + pf.createProducer(); + assertThat(queue).hasSize(0); + assertThatExceptionOfType(NoProducerAvailableException.class).isThrownBy(() -> pf.createProducer()); + } + @SuppressWarnings({ "rawtypes", "unchecked" }) @Test void listener() { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java index b28fae476d..e9cc1d0e24 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java @@ -93,17 +93,18 @@ * */ @EmbeddedKafka(topics = { KafkaTemplateTransactionTests.STRING_KEY_TOPIC, - KafkaTemplateTransactionTests.LOCAL_TX_IN_TOPIC }, brokerProperties = { - "transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1" }) + KafkaTemplateTransactionTests.LOCAL_TX_IN_TOPIC, KafkaTemplateTransactionTests.LOCAL_FIXED_TX_IN_TOPIC }, + brokerProperties = { "transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1" }) public class KafkaTemplateTransactionTests { public static final String STRING_KEY_TOPIC = "stringKeyTopic"; public static final String LOCAL_TX_IN_TOPIC = "localTxInTopic"; + public static final String LOCAL_FIXED_TX_IN_TOPIC = 
"localFixedTxInTopic"; + private final EmbeddedKafkaBroker embeddedKafka = EmbeddedKafkaCondition.getBroker(); - @SuppressWarnings("deprecation") @Test public void testLocalTransaction() { Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); @@ -118,7 +119,7 @@ public void testLocalTransaction() { DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); cf.setKeyDeserializer(new StringDeserializer()); Consumer consumer = cf.createConsumer(); - embeddedKafka.consumeFromAllEmbeddedTopics(consumer); + embeddedKafka.consumeFromEmbeddedTopics(consumer, STRING_KEY_TOPIC, LOCAL_TX_IN_TOPIC); template.executeInTransaction(kt -> kt.send(LOCAL_TX_IN_TOPIC, "one")); ConsumerRecord singleRecord = KafkaTestUtils.getSingleRecord(consumer, LOCAL_TX_IN_TOPIC); template.executeInTransaction(t -> { @@ -159,6 +160,67 @@ record = iterator.next(); assertThat(pf.getCache()).hasSize(0); } + @SuppressWarnings("unchecked") + @Test + public void testLocalTransactionIsFixed() { + Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); + senderProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my.transaction.fixed."); + senderProps.put(ProducerConfig.CLIENT_ID_CONFIG, "customClientIdFixed"); + DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); + pf.setKeySerializer(new StringSerializer()); + pf.setMaxCache(3); + KafkaTemplate template = new KafkaTemplate<>(pf); + template.setDefaultTopic(STRING_KEY_TOPIC); + Map consumerProps = KafkaTestUtils.consumerProps("testLocalTxFixed", "false", embeddedKafka); + consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); + DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); + cf.setKeyDeserializer(new StringDeserializer()); + Consumer consumer = cf.createConsumer(); + embeddedKafka.consumeFromEmbeddedTopics(consumer, STRING_KEY_TOPIC, LOCAL_FIXED_TX_IN_TOPIC); + template.executeInTransaction(kt -> kt.send(LOCAL_FIXED_TX_IN_TOPIC, "one")); // suffix range {0-2} + ConsumerRecord singleRecord = KafkaTestUtils.getSingleRecord(consumer, LOCAL_FIXED_TX_IN_TOPIC); + template.executeInTransaction(t -> { + pf.createProducer("testCustomClientIdIsUniqueFixed").close(); // suffix range {3-5} + t.sendDefault("foo", "bar"); + t.sendDefault("baz", "qux"); + t.sendOffsetsToTransaction(Collections.singletonMap( + new TopicPartition(LOCAL_FIXED_TX_IN_TOPIC, singleRecord.partition()), + new OffsetAndMetadata(singleRecord.offset() + 1L)), consumer.groupMetadata()); + assertThat(KafkaTestUtils.getPropertyValue( + KafkaTestUtils.getPropertyValue(template, "producers", Map.class).get(Thread.currentThread()), + "delegate.transactionManager.transactionalId")).isEqualTo("my.transaction.fixed.0"); + return null; + }); + ConsumerRecords records = KafkaTestUtils.getRecords(consumer); + Iterator> iterator = records.iterator(); + ConsumerRecord record = iterator.next(); + assertThat(record).has(Assertions.>allOf(key("foo"), value("bar"))); + if (!iterator.hasNext()) { + records = KafkaTestUtils.getRecords(consumer); + iterator = records.iterator(); + } + record = iterator.next(); + assertThat(record).has(Assertions.>allOf(key("baz"), value("qux"))); + // 2 log slots, 1 for the record, 1 for the commit + assertThat(consumer.position(new TopicPartition(LOCAL_FIXED_TX_IN_TOPIC, singleRecord.partition()))).isEqualTo(2L); + consumer.close(); + assertThat(pf.getCache()).hasSize(1); + template.setTransactionIdPrefix("tx.template.override.fixed."); // suffix range {6-8} + 
+		template.executeInTransaction(t -> {
+			assertThat(KafkaTestUtils.getPropertyValue(
+					KafkaTestUtils.getPropertyValue(template, "producers", Map.class).get(Thread.currentThread()),
+					"delegate.transactionManager.transactionalId")).isEqualTo("tx.template.override.fixed.6");
+			return null;
+		});
+		assertThat(pf.getCache("tx.template.override.fixed.")).hasSize(1);
+		assertThat(pf.getSuffixCache("tx.template.override.fixed.")).hasSize(2);
+		assertThat(pf.getCache("testCustomClientIdIsUniqueFixed")).hasSize(1);
+		assertThat(pf.getSuffixCache("testCustomClientIdIsUniqueFixed")).hasSize(2);
+		pf.destroy();
+		assertThat(pf.getCache()).hasSize(0);
+		assertThat(KafkaTestUtils.getPropertyValue(pf, "suffixCache", Map.class)).hasSize(0);
+	}
+
 	@Test
 	public void testGlobalTransaction() {
 		Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java
index d7bfed9d5b..1368d195a4 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java
@@ -496,12 +496,11 @@ public void testConsumeAndProduceTransactionExternalTM() throws Exception {
 		verify(pf).createProducer(isNull());
 	}
 
-	@SuppressWarnings({ "unchecked", "deprecation" })
+	@SuppressWarnings({ "unchecked" })
 	@Test
 	public void testRollbackRecord() throws Exception {
 		logger.info("Start testRollbackRecord");
 		Map<String, Object> props = KafkaTestUtils.consumerProps("txTest1", "false", embeddedKafka);
-//		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); test fallback to EOSMode.ALPHA
 		props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
 		props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
 		DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
@@ -512,7 +511,6 @@ public void testRollbackRecord() throws Exception {
 		containerProps.setFixTxOffsets(true);
 		Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
-//		senderProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
 		DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
 		pf.setTransactionIdPrefix("rr.");
 
From 90b6ea9f345dcc04060ba514de60ed67faa7229d Mon Sep 17 00:00:00 2001
From: "Zhiyang.Wang1"
Date: Mon, 27 Nov 2023 17:47:59 +0800
Subject: [PATCH 2/3] GH-2852: support fixed transaction id suffix

Introduce a method for seeding the transactionIdSuffix counter, retaining
the default increment behavior, and a method returning the current
transactionIdSuffix.
Resolves #2852
---
 .../core/DefaultKafkaProducerFactory.java     | 13 ++++++--
 .../DefaultKafkaProducerFactoryTests.java     | 33 +++++++++++++++++++
 2 files changed, 44 insertions(+), 2 deletions(-)

diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java
index d83afd9fa9..072b6abeec 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java
@@ -424,6 +424,25 @@ public final void setTransactionIdPrefix(String transactionIdPrefix) {
 		return this.transactionIdPrefix;
 	}
 
+	/**
+	 * Seed the transactional.id suffix counter; only effective while the counter
+	 * is still at zero (no suffix allocated yet).
+	 * @param transactionIdSuffix the initial suffix value.
+	 * @return true if the counter was seeded.
+	 */
+	public final boolean initTransactionIdSuffix(int transactionIdSuffix) {
+		Assert.isTrue(transactionIdSuffix >= 0, "'transactionIdSuffix' initial value must be greater than or equal to 0");
+		return this.transactionIdSuffix.compareAndSet(0, transactionIdSuffix);
+	}
+
+	/**
+	 * Return the current transactional.id suffix counter value.
+	 * @return the current suffix.
+	 */
+	public int getCurrTransactionIdSuffix() {
+		return this.transactionIdSuffix.get();
+	}
+
 	/**
 	 * Set to true to create a producer per thread instead of singleton that is shared by
 	 * all clients. Clients must call {@link #closeThreadBoundProducer()} to
@@ -498,7 +507,7 @@ public List<ProducerPostProcessor<K, V>> getPostProcessors() {
 	 * @since 3.1
 	 */
 	public void setMaxCache(int maxCache) {
-		Assert.isTrue(maxCache >= 0, "max cache must be greater than or equal to 0");
+		Assert.isTrue(maxCache >= 0, "'maxCache' must be greater than or equal to 0");
 		this.maxCache = maxCache;
 	}
 
@@ -868,7 +877,7 @@ protected Producer<K, V> createTransactionalProducer(String txIdPrefix) {
 					() -> "No suffix cache found for " + txIdPrefix + ", max cache " + this.maxCache);
 			suffix = suffixQueue.poll();
 			if (suffix == null) {
-				throw new NoProducerAvailableException("No transaction producer available for " + txIdPrefix);
+				throw new NoProducerAvailableException("No available transaction producer suffix for " + txIdPrefix);
 			}
 		}
 		else {
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java
index 46e5694755..0679b93d3d 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java
@@ -666,6 +666,39 @@ protected Producer createRawProducer(Map configs) {
 		pf.destroy();
 	}
 
+	@Test
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	void testTransactionId() throws InterruptedException {
+		final Producer producer = mock(Producer.class);
+		final Map configPassedToKafkaConsumer = new HashMap<>();
+		DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {
+
+			@Override
+			protected Producer createRawProducer(Map configs) {
+				configPassedToKafkaConsumer.clear();
+				configPassedToKafkaConsumer.putAll(configs);
+				return producer;
+			}
+
+		};
+		pf.setBootstrapServersSupplier(() -> "foo");
+		pf.setTransactionIdPrefix("tx.");
+		pf.setMaxCache(2);
+		assertThat(pf.initTransactionIdSuffix(10)).isTrue();
+		Producer aProducer = pf.createProducer();
+		assertThat(configPassedToKafkaConsumer.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)).isEqualTo("tx.10");
+		assertThat(pf.initTransactionIdSuffix(20)).isFalse();
+		Producer bProducer = pf.createProducer();
+		assertThat(configPassedToKafkaConsumer.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)).isEqualTo("tx.11");
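+		// the counter was seeded at 10, so the 2-entry pool holds suffixes 10 and 11;
+		// initTransactionIdSuffix() only seeds once, hence the second attempt (20) fails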
+		bProducer.close(Duration.ofSeconds(20));
+		pf.setMaxAge(Duration.ofMillis(10));
+		Thread.sleep(50);
+		Producer cProducer = pf.createProducer();
+		assertThat(configPassedToKafkaConsumer.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)).isEqualTo("tx.11");
+		assertThat(pf.getCurrTransactionIdSuffix()).isEqualTo(12);
+	}
+
 	@SuppressWarnings({ "rawtypes", "unchecked" })
 	@Test
 	void configUpdates() {

From c19811f8122296831c79d7d9b70d8a58c8ee5702 Mon Sep 17 00:00:00 2001
From: "Zhiyang.Wang1"
Date: Thu, 7 Dec 2023 12:58:18 +0800
Subject: [PATCH 3/3] Improving `transactions.adoc`

---
 .../antora/modules/ROOT/pages/kafka/transactions.adoc | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/transactions.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/transactions.adoc
index 1b5e07d093..7f60cdc013 100644
--- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/transactions.adoc
+++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/transactions.adoc
@@ -108,11 +108,11 @@ With `EOSMode.V2` (aka `BETA`), the only supported mode, it is no longer necessa
 This property must have a different value on each application instance.
 
 [[transaction-id-suffix-fixed]]
-== `TransactionIdSuffix Fixed`
+== `TransactionIdSuffix` Fixed
 
 Since 3.1, setting `maxCache` greater than zero reuses `transactional.id` values within a specific range.
 When a transactional producer is requested and all `transactional.id` values are in use, a `NoProducerAvailableException` is thrown.
-A `RetryTemplate` configured to retry that exception, with a suitably configured back off, can then be used.
+We can then use a `RetryTemplate` configured to retry that exception, with a suitably configured back off.
 
 [source, java]
 ----
@@ -134,7 +134,10 @@ public static class Config {
 ----
 With `maxCache` set to 5, the `transactional.id` values are `my.txid.0` through `my.txid.4`.
 
-IMPORTANT: When using a `KafkaTransactionManager` in a `ConcurrentMessageListenerContainer`, `maxCache` must be greater than `concurrency`; also be careful with nested transactions.
+IMPORTANT: When using a `KafkaTransactionManager` in a `ConcurrentMessageListenerContainer` with `maxCache` enabled, `maxCache` must be greater than or equal to `concurrency`.
+If a `MessageListenerContainer` cannot acquire a transactional producer, a `NoProducerAvailableException` is thrown.
+When using nested transactions in a `ConcurrentMessageListenerContainer`, increase `maxCache` by the number of nested transactions.
 
 [[tx-template-mixed]]
 == `KafkaTemplate` Transactional and non-Transactional Publishing
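The documentation above recommends retrying `NoProducerAvailableException` with a `RetryTemplate`. A minimal sketch of such a configuration, assuming spring-retry is on the classpath — the `RetrySendExample` class, `sendWithRetry` method, topic name, and back-off values are illustrative, not part of this change:

[source, java]
----
import java.util.Map;

import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.NoProducerAvailableException;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

public class RetrySendExample {

	private final RetryTemplate retryTemplate = buildRetryTemplate();

	private static RetryTemplate buildRetryTemplate() {
		RetryTemplate template = new RetryTemplate();
		// retry only when all transactional.id suffixes are currently in use
		template.setRetryPolicy(new SimpleRetryPolicy(5, Map.of(NoProducerAvailableException.class, true)));
		ExponentialBackOffPolicy backOff = new ExponentialBackOffPolicy();
		backOff.setInitialInterval(100); // illustrative values (ms)
		backOff.setMultiplier(2.0);
		backOff.setMaxInterval(2000);
		template.setBackOffPolicy(backOff);
		return template;
	}

	public void sendWithRetry(KafkaOperations<String, String> operations, String data) {
		// each attempt runs a complete local transaction; a new attempt is made
		// (after back off) if no pooled producer was available
		this.retryTemplate.execute(context -> operations.executeInTransaction(t -> {
			t.send("someTopic", data);
			return null;
		}));
	}

}
----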