diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/transactions.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/transactions.adoc index 1b5e07d093..cbb3b82513 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/transactions.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/transactions.adoc @@ -110,11 +110,12 @@ This property must have a different value on each application instance. [[transaction-id-suffix-fixed]] == `TransactionIdSuffix Fixed` -Since 3.1, when setting `maxCache` greater than zero can reuse `transactional.id` within a specific range. +Since 3.1, introduced a new interface `TransactionSuffixManager` to manage `transactional.id` suffix. +The default implementation is `DefaultTransactionSuffixManager` when setting `maxCache` greater than zero can reuse `transactional.id` within a specific range, otherwise suffixes will be generated on the fly by incrementing a counter. When a transaction producer is requested and `transactional.id` all in use, throw a `NoProducerAvailableException`. -User can use then use a RetryTemplate configured to retry that exception, with a suitably configured back off. +User can then use a RetryTemplate configured to retry that exception, with a suitably configured back off. -[source, java] +[source,java] ---- public static class Config { @@ -125,8 +126,9 @@ public static class Config { ... DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(configs); ... - pf.setTransactionIdPrefix("my.txid."); - pf.setMaxCache(5); + DefaultTransactionSuffixManager sm = new DefaultTransactionSuffixManager(); + sm.setMaxCache(5); + pf.setTransactionSuffixManager(sm); return pf; } @@ -134,7 +136,10 @@ public static class Config { ---- When setting `maxCache` to 5, `transactional.id` is `my.txid.`++`{0-4}`+. -IMPORTANT: When use `KafkaTransactionManager` in the `ConcurrentMessageListenerContainer`, `maxCache` must be greater than `concurrency`, also be careful nested transaction. +IMPORTANT: When use `KafkaTransactionManager` in the `ConcurrentMessageListenerContainer` and enable `maxCache`, `maxCache` must be greater than or equal to `concurrency`. +If some `MessageListenerContainer` cannot get the transaction, will throw `NoProducerAvailableException`. +When use nested transactions in `ConcurrentMessageListenerContainer`, `maxCache` needs to increase the number of nested transactions. + [[tx-template-mixed]] == `KafkaTemplate` Transactional and non-Transactional Publishing diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index ec09a53b36..894cfa6f02 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -52,7 +52,7 @@ When manually assigning partitions, with a `null` consumer `group.id`, the `AckM See xref:tips.adoc#tip-assign-all-parts[Manually Assigning All Partitions] for more information. [[x31-dkpf]] -=== DefaultKafkaProducerFactory +=== TransactionSuffixManager -When setting `maxCache` with `transactionIdPrefix`, can restrict `transaction.id` in range. +A new `TransactionSuffixManager` interface with `DefaultTransactionSuffixManager` implementation is provided to restrict `transaction.id` in range. See xref:kafka/transactions.adoc#transaction-id-suffix-fixed[Fixed TransactionIdSuffix] for more information. \ No newline at end of file diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java index 072b6abeec..4b8783a63b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java @@ -121,12 +121,8 @@ public class DefaultKafkaProducerFactory extends KafkaResourceFactory private final Map configs; - private final AtomicInteger transactionIdSuffix = new AtomicInteger(); - private final Map>> cache = new ConcurrentHashMap<>(); - private final Map> suffixCache = new ConcurrentHashMap<>(); - private final Map> threadBoundProducers = new ConcurrentHashMap<>(); private final AtomicInteger epoch = new AtomicInteger(); @@ -139,6 +135,8 @@ public class DefaultKafkaProducerFactory extends KafkaResourceFactory private final AtomicBoolean running = new AtomicBoolean(); + private TransactionSuffixManager transactionSuffixManager = new DefaultTransactionSuffixManager(); + private Supplier> keySerializerSupplier; private Supplier> valueSerializerSupplier; @@ -155,8 +153,6 @@ public class DefaultKafkaProducerFactory extends KafkaResourceFactory private boolean producerPerThread; - private int maxCache; - private long maxAge; private boolean configureSerializers = true; @@ -353,6 +349,15 @@ public void setKeySerializerSupplier(Supplier> keySerializerSuppli public void setValueSerializerSupplier(Supplier> valueSerializerSupplier) { this.valueSerializerSupplier = valueSerializerSupplier(valueSerializerSupplier); } + + /** + * Set the transaction suffix manager. + * @param transactionSuffixManager the manager. + * @since 3.1.1 + */ + public void setTransactionSuffixManager(TransactionSuffixManager transactionSuffixManager) { + this.transactionSuffixManager = transactionSuffixManager; + } /** * If true (default), programmatically provided serializers (via constructor or @@ -424,15 +429,6 @@ public final void setTransactionIdPrefix(String transactionIdPrefix) { return this.transactionIdPrefix; } - public final boolean initTransactionIdSuffix(int transactionIdSuffix) { - Assert.isTrue(transactionIdSuffix >= 0, "'transactionIdSuffix' initial value must greater than or equal 0"); - return this.transactionIdSuffix.compareAndSet(0, transactionIdSuffix); - } - - public int getCurrTransactionIdSuffix() { - return this.transactionIdSuffix.get(); - } - /** * Set to true to create a producer per thread instead of singleton that is shared by * all clients. Clients must call {@link #closeThreadBoundProducer()} to @@ -501,16 +497,6 @@ public List> getPostProcessors() { return Collections.unmodifiableList(this.postProcessors); } - /** - * Set the maximum size for transaction producer cache. - * @param maxCache the maxCache to set - * @since 3.1 - */ - public void setMaxCache(int maxCache) { - Assert.isTrue(maxCache >= 0, "'maxCache' must greater than or equal 0"); - this.maxCache = maxCache; - } - /** * Set the maximum age for a producer; useful when using transactions and the broker * might expire a {@code transactional.id} due to inactivity. @@ -732,7 +718,7 @@ public void destroy() { } }); this.cache.clear(); - this.suffixCache.clear(); + this.transactionSuffixManager.reset(); this.threadBoundProducers.values().forEach(prod -> { try { prod.closeDelegate(this.physicalCloseTimeout, this.listeners); @@ -870,19 +856,7 @@ protected Producer createTransactionalProducer(String txIdPrefix) { } } if (cachedProducer == null) { - String suffix; - if (this.maxCache > 0) { - BlockingQueue suffixQueue = getSuffixCache(txIdPrefix, this.maxCache); - Assert.notNull(suffixQueue, - () -> "No suffix cache found for " + txIdPrefix + ", max cache" + this.maxCache); - suffix = suffixQueue.poll(); - if (suffix == null) { - throw new NoProducerAvailableException("No available transaction producer suffix for " + txIdPrefix); - } - } - else { - suffix = String.valueOf(this.transactionIdSuffix.getAndIncrement()); - } + String suffix = this.transactionSuffixManager.retrieveSuffix(txIdPrefix); return doCreateTxProducer(txIdPrefix, suffix, this::cacheReturner); } else { @@ -931,29 +905,7 @@ private void closeTransactionProducer(CloseSafeProducer producer, Duration producer.closeDelegate(timeout, listeners); } finally { - reuseTransactionIdSuffix(producer); - } - } - - private void reuseTransactionIdSuffix(CloseSafeProducer producerToRemove) { - reuseTransactionIdSuffix(producerToRemove.txIdPrefix, producerToRemove.txIdSuffix, producerToRemove.epoch); - } - - private void reuseTransactionIdSuffix(@Nullable String txIdPrefix, @Nullable String suffix, int epoch) { - if (txIdPrefix == null || suffix == null) { - return; - } - this.globalLock.lock(); - try { - if (this.maxCache > 0) { - BlockingQueue queue = getSuffixCache(txIdPrefix, this.maxCache); - if (epoch == this.epoch.get() && queue != null && !queue.contains(suffix)) { - queue.add(suffix); - } - } - } - finally { - this.globalLock.unlock(); + this.transactionSuffixManager.returnSuffix(producer.txIdPrefix, producer.txIdSuffix); } } @@ -973,7 +925,7 @@ private CloseSafeProducer doCreateTxProducer(String prefix, String suffix, throw newEx; // NOSONAR - lost stack trace } finally { - reuseTransactionIdSuffix(prefix, suffix, this.epoch.get()); + this.transactionSuffixManager.returnSuffix(prefix, suffix); } throw new KafkaException("initTransactions() failed", ex); } @@ -1006,31 +958,6 @@ protected BlockingQueue> getCache(@Nullable String txIdP return this.cache.computeIfAbsent(txIdPrefix, txId -> new LinkedBlockingQueue<>()); } - @Nullable - protected BlockingQueue getSuffixCache() { - return getSuffixCache(this.transactionIdPrefix); - } - - @Nullable - protected BlockingQueue getSuffixCache(@Nullable String txIdPrefix) { - return getSuffixCache(txIdPrefix, this.maxCache); - } - - @Nullable - protected BlockingQueue getSuffixCache(@Nullable String txIdPrefix, int maxCache) { - if (txIdPrefix == null || maxCache <= 0) { - return null; - } - - return this.suffixCache.computeIfAbsent(txIdPrefix, txId -> { - BlockingQueue queue = new LinkedBlockingQueue<>(); - for (int suffix = 0; suffix < maxCache; suffix++) { - queue.add(String.valueOf(this.transactionIdSuffix.getAndIncrement())); - } - return queue; - }); - } - /** * When using {@link #setProducerPerThread(boolean)} (true), call this method to close * and release this thread's producer. Thread bound producers are not closed by diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultTransactionSuffixManager.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultTransactionSuffixManager.java new file mode 100644 index 0000000000..bd6e5be991 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultTransactionSuffixManager.java @@ -0,0 +1,110 @@ +/* + * Copyright 2016-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.core; + +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; + +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; + +/** + * The {@link TransactionSuffixManager} implementation for managing transactional producer suffixes. + * If the {@link #maxCache} is greater than 0, the suffixes will be cached and reused. + * Otherwise, the suffixes will be generated on the fly. + * + *

+ * Setting the {@link #setMaxCache(int)} enables caching to restrict the number of `transactional.id`. + * The cache is per `transactional.id` prefix. The cache size is limited by the {@link #maxCache}. + * Default is 0, which means no caching and restriction, so the `transactional.id` will be generated on the fly. + * + * @author Ilya Starchenko + * + * @since 3.1.1 + */ +public class DefaultTransactionSuffixManager implements TransactionSuffixManager { + + private final AtomicInteger transactionIdSuffix = new AtomicInteger(); + + private final Map> suffixCache = new ConcurrentHashMap<>(); + + private int maxCache; + + /** + * Retrieve the suffix for the transactional producer from the cache or generate a new one + * if caching is disabled. + * @param txIdPrefix the transaction id prefix. + * @return the suffix. + * @throws NoProducerAvailableException if caching is enabled and no suffixes are available. + */ + @Override + public String retrieveSuffix(String txIdPrefix) { + BlockingQueue cache = getSuffixCache(txIdPrefix); + if (cache == null) { + return String.valueOf(this.transactionIdSuffix.getAndIncrement()); + } + + String suffix = cache.poll(); + if (suffix == null) { + throw new NoProducerAvailableException("No available transaction producer suffix for " + txIdPrefix); + } + return suffix; + } + + @Override + public void returnSuffix(@Nullable String txIdPrefix, @Nullable String suffix) { + if (this.maxCache <= 0 || suffix == null) { + return; + } + BlockingQueue queue = getSuffixCache(txIdPrefix); + if (queue != null && !queue.contains(suffix)) { + queue.add(suffix); + } + } + + @Override + public void reset() { + this.suffixCache.clear(); + } + + /** + * Set the maximum size for transaction producer cache. + * @param maxCache the maxCache to set + */ + public void setMaxCache(int maxCache) { + Assert.isTrue(maxCache >= 0, "'maxCache' must greater than or equal 0"); + this.maxCache = maxCache; + } + + @Nullable + private BlockingQueue getSuffixCache(@Nullable String txIdPrefix) { + if (txIdPrefix == null || this.maxCache <= 0) { + return null; + } + + return this.suffixCache.computeIfAbsent(txIdPrefix, txId -> { + BlockingQueue queue = new LinkedBlockingQueue<>(); + for (int suffix = 0; suffix < this.maxCache; suffix++) { + queue.add(String.valueOf(this.transactionIdSuffix.getAndIncrement())); + } + return queue; + }); + } +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/TransactionSuffixManager.java b/spring-kafka/src/main/java/org/springframework/kafka/core/TransactionSuffixManager.java new file mode 100644 index 0000000000..c69bbe07f8 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/TransactionSuffixManager.java @@ -0,0 +1,48 @@ +/* + * Copyright 2016-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.core; + +/** + * The strategy for managing transactional producer suffixes. + * + * @author Ilya Starchenko + * + * @since 3.1.1 + */ +public interface TransactionSuffixManager { + + /** + * Retrieve the suffix for the transactional producer. + * + * @param txIdPrefix the transaction id prefix. + * @return the suffix. + */ + String retrieveSuffix(String txIdPrefix); + + /** + * Return the suffix for the transactional producer back for reuse. + * + * @param txIdPrefix the transaction id prefix. + * @param suffix the suffix. + */ + void returnSuffix(String txIdPrefix, String suffix); + + /** + * Clear all suffixes for the transactional producer. + */ + void reset(); +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java index 1f9ebc0f98..a211eea579 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java @@ -384,7 +384,9 @@ public void testNestedTxProducerIsFixed() throws Exception { KafkaTemplate template = new KafkaTemplate<>(pf); DefaultKafkaProducerFactory pfTx = new DefaultKafkaProducerFactory<>(producerProps); pfTx.setTransactionIdPrefix("fooTx.fixed."); - pfTx.setMaxCache(3); + DefaultTransactionSuffixManager tsManager = new DefaultTransactionSuffixManager(); + tsManager.setMaxCache(3); + pfTx.setTransactionSuffixManager(tsManager); KafkaOperations templateTx = new KafkaTemplate<>(pfTx); Map consumerProps = KafkaTestUtils.consumerProps("txCache1FixedGroup", "false", this.embeddedKafka); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); @@ -417,9 +419,8 @@ public void testNestedTxProducerIsFixed() throws Exception { assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue(); assertThat(KafkaTestUtils.getPropertyValue(pfTx, "cache", Map.class)).hasSize(1); assertThat(pfTx.getCache()).hasSize(1); - assertThat(KafkaTestUtils.getPropertyValue(pfTx, "suffixCache", Map.class)).hasSize(1); + assertThat(KafkaTestUtils.getPropertyValue(tsManager, "suffixCache", Map.class)).hasSize(1); // 1 tm tx producer and 1 templateTx tx producer - assertThat(pfTx.getSuffixCache()).hasSize(1); assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer")).isSameAs(wrapped.get()); } finally { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java index 0679b93d3d..c2fc1c846f 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java @@ -137,7 +137,9 @@ protected Producer createRawProducer(Map configs) { }; pf.setTransactionIdPrefix("foo"); - pf.setMaxCache(2); + DefaultTransactionSuffixManager tsManager = new DefaultTransactionSuffixManager(); + tsManager.setMaxCache(2); + pf.setTransactionSuffixManager(tsManager); final AtomicInteger flag = new AtomicInteger(); willAnswer(i -> { @@ -182,7 +184,7 @@ protected Producer createRawProducer(Map configs) { assertThat(cache).hasSize(1); Queue queue = (Queue) cache.get("foo"); assertThat(queue).hasSize(1); - Map suffixCache = KafkaTestUtils.getPropertyValue(pf, "suffixCache", Map.class); + Map suffixCache = KafkaTestUtils.getPropertyValue(tsManager, "suffixCache", Map.class); assertThat(suffixCache).hasSize(1); Queue suffixQueue = (Queue) suffixCache.get("foo"); assertThat(suffixQueue).hasSize(1); @@ -308,7 +310,9 @@ protected Producer createRawProducer(Map configs) { }; pf.setApplicationContext(ctx); pf.setTransactionIdPrefix("foo"); - pf.setMaxCache(3); + DefaultTransactionSuffixManager tsManager = new DefaultTransactionSuffixManager(); + tsManager.setMaxCache(3); + pf.setTransactionSuffixManager(tsManager); Producer aProducer = pf.createProducer(); assertThat(aProducer).isNotNull(); aProducer.close(); @@ -320,7 +324,7 @@ protected Producer createRawProducer(Map configs) { assertThat(cache.size()).isEqualTo(1); Queue queue = (Queue) cache.get("foo"); assertThat(queue.size()).isEqualTo(1); - Map suffixCache = KafkaTestUtils.getPropertyValue(pf, "suffixCache", Map.class); + Map suffixCache = KafkaTestUtils.getPropertyValue(tsManager, "suffixCache", Map.class); assertThat(suffixCache.size()).isEqualTo(1); Queue suffixQueue = (Queue) suffixCache.get("foo"); assertThat(suffixQueue.size()).isEqualTo(2); @@ -385,7 +389,9 @@ protected Producer createRawProducer(Map configs) { }; pf.setApplicationContext(ctx); pf.setTransactionIdPrefix("foo"); - pf.setMaxCache(3); + DefaultTransactionSuffixManager tsManager = new DefaultTransactionSuffixManager(); + tsManager.setMaxCache(3); + pf.setTransactionSuffixManager(tsManager); Producer aProducer = pf.createProducer(); assertThat(aProducer).isNotNull(); aProducer.close(); @@ -397,7 +403,7 @@ protected Producer createRawProducer(Map configs) { assertThat(cache.size()).isEqualTo(1); Queue queue = (Queue) cache.get("foo"); assertThat(queue.size()).isEqualTo(1); - Map suffixCache = KafkaTestUtils.getPropertyValue(pf, "suffixCache", Map.class); + Map suffixCache = KafkaTestUtils.getPropertyValue(tsManager, "suffixCache", Map.class); assertThat(suffixCache.size()).isEqualTo(1); Queue suffixQueue = (Queue) suffixCache.get("foo"); assertThat(suffixQueue.size()).isEqualTo(2); @@ -546,8 +552,10 @@ protected Producer createRawProducer(Map configs) { }; pf.setTransactionIdPrefix("foo"); - pf.setMaxCache(2); - Map suffixCache = KafkaTestUtils.getPropertyValue(pf, "suffixCache", Map.class); + DefaultTransactionSuffixManager tsManager = new DefaultTransactionSuffixManager(); + tsManager.setMaxCache(2); + pf.setTransactionSuffixManager(tsManager); + Map suffixCache = KafkaTestUtils.getPropertyValue(tsManager, "suffixCache", Map.class); pf.createProducer(); Queue queue = (Queue) suffixCache.get("foo"); assertThat(queue).hasSize(1); @@ -666,38 +674,6 @@ protected Producer createRawProducer(Map configs) { pf.destroy(); } - @Test - @SuppressWarnings({ "rawtypes", "unchecked" }) - void testTransactionId() throws InterruptedException { - final Producer producer = mock(Producer.class); - final Map configPassedToKafkaConsumer = new HashMap<>(); - DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) { - - @Override - protected Producer createRawProducer(Map configs) { - configPassedToKafkaConsumer.clear(); - configPassedToKafkaConsumer.putAll(configs); - return producer; - } - - }; - pf.setBootstrapServersSupplier(() -> "foo"); - pf.setTransactionIdPrefix("tx."); - pf.setMaxCache(2); - assertThat(pf.initTransactionIdSuffix(10)).isTrue(); - Producer aProducer = pf.createProducer(); - assertThat(configPassedToKafkaConsumer.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)).isEqualTo("tx.10"); - assertThat(pf.initTransactionIdSuffix(20)).isFalse(); - Producer bProducer = pf.createProducer(); - assertThat(configPassedToKafkaConsumer.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)).isEqualTo("tx.11"); - bProducer.close(Duration.ofSeconds(20)); - pf.setMaxAge(Duration.ofMillis(10)); - Thread.sleep(50); - Producer cProducer = pf.createProducer(); - assertThat(configPassedToKafkaConsumer.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)).isEqualTo("tx.11"); - assertThat(pf.getCurrTransactionIdSuffix()).isEqualTo(12); - } - @SuppressWarnings({ "rawtypes", "unchecked" }) @Test diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultTransactionManagerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultTransactionManagerTests.java new file mode 100644 index 0000000000..d4d5f23f81 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultTransactionManagerTests.java @@ -0,0 +1,114 @@ +/* + * Copyright 2016-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.core; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.Map; +import java.util.Queue; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.springframework.kafka.test.utils.KafkaTestUtils; + +/** + * @author Ilya Starchenko + * + * @since 3.1.1 + */ +class DefaultTransactionSuffixManagerTest { + + private DefaultTransactionSuffixManager manager; + + @BeforeEach + void setUp() { + manager = new DefaultTransactionSuffixManager(); + } + + @Test + void testRetrieveSuffixWithoutCache() { + String txIdPrefix = "txIdPrefix"; + String suffix = manager.retrieveSuffix(txIdPrefix); + assertNotNull(suffix); + } + + @Test + void testRetrieveSuffixWithCache() { + String txIdPrefix = "txIdPrefix"; + String suffix = "suffix"; + assertDoesNotThrow(() -> manager.returnSuffix(txIdPrefix, suffix)); + } + + + @Test + void testRetrieveSuffixWithCacheExhausted() { + String txIdPrefix = "txIdPrefix"; + manager.setMaxCache(2); + String suffix1 = manager.retrieveSuffix(txIdPrefix); + String suffix2 = manager.retrieveSuffix(txIdPrefix); + assertThrows(NoProducerAvailableException.class, () -> manager.retrieveSuffix(txIdPrefix)); + } + + @Test + void testReturnSuffixWithCache() { + String txIdPrefix = "txIdPrefix"; + manager.setMaxCache(2); + String suffix = manager.retrieveSuffix(txIdPrefix); + assertDoesNotThrow(() -> manager.returnSuffix(txIdPrefix, suffix)); + } + + @SuppressWarnings("rawtypes") + @Test + void testReturnAllSuffixesWithCache() { + String txIdPrefix = "txIdPrefix"; + manager.setMaxCache(2); + String suffix1 = manager.retrieveSuffix(txIdPrefix); + String suffix2 = manager.retrieveSuffix(txIdPrefix); + assertDoesNotThrow(() -> manager.returnSuffix(txIdPrefix, suffix1)); + Map suffixCache = KafkaTestUtils.getPropertyValue(manager, "suffixCache", Map.class); + assertThat(suffixCache).hasSize(1); + Queue queue = (Queue) suffixCache.get(txIdPrefix); + assertThat(queue).hasSize(1); + assertThat(queue.poll()).isEqualTo(suffix1); + } + + @Test + void testReset() { + String txIdPrefix = "txIdPrefix"; + manager.setMaxCache(2); + String suffix1 = manager.retrieveSuffix(txIdPrefix); + String suffix2 = manager.retrieveSuffix(txIdPrefix); + assertDoesNotThrow(() -> manager.returnSuffix(txIdPrefix, suffix1)); + manager.reset(); + Map suffixCache = KafkaTestUtils.getPropertyValue(manager, "suffixCache", Map.class); + assertThat(suffixCache).hasSize(0); + } + + @Test + void testSetMaxCacheIsNegative() { + assertThrows(IllegalArgumentException.class, () -> manager.setMaxCache(-1)); + } + + @Test + void testSetMaxCacheIsZero() { + assertDoesNotThrow(() -> manager.setMaxCache(0)); + } +} \ No newline at end of file diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java index e9cc1d0e24..6d3df380e9 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java @@ -38,6 +38,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.Map; +import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingDeque; @@ -160,7 +161,7 @@ record = iterator.next(); assertThat(pf.getCache()).hasSize(0); } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "rawtypes"}) @Test public void testLocalTransactionIsFixed() { Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); @@ -168,7 +169,9 @@ public void testLocalTransactionIsFixed() { senderProps.put(ProducerConfig.CLIENT_ID_CONFIG, "customClientIdFixed"); DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); pf.setKeySerializer(new StringSerializer()); - pf.setMaxCache(3); + DefaultTransactionSuffixManager tsManager = new DefaultTransactionSuffixManager(); + tsManager.setMaxCache(3); + pf.setTransactionSuffixManager(tsManager); KafkaTemplate template = new KafkaTemplate<>(pf); template.setDefaultTopic(STRING_KEY_TOPIC); Map consumerProps = KafkaTestUtils.consumerProps("testLocalTxFixed", "false", embeddedKafka); @@ -213,12 +216,13 @@ record = iterator.next(); return null; }); assertThat(pf.getCache("tx.template.override.fixed.")).hasSize(1); - assertThat(pf.getSuffixCache("tx.template.override.fixed.")).hasSize(2); + Map suffixCache = KafkaTestUtils.getPropertyValue(tsManager, "suffixCache", Map.class); + assertThat((Queue) suffixCache.get("tx.template.override.fixed.")).hasSize(2); assertThat(pf.getCache("testCustomClientIdIsUniqueFixed")).hasSize(1); - assertThat(pf.getSuffixCache("testCustomClientIdIsUniqueFixed")).hasSize(2); + assertThat((Queue) suffixCache.get("testCustomClientIdIsUniqueFixed")).hasSize(2); pf.destroy(); assertThat(pf.getCache()).hasSize(0); - assertThat(KafkaTestUtils.getPropertyValue(pf, "suffixCache", Map.class)).hasSize(0); + assertThat(KafkaTestUtils.getPropertyValue(tsManager, "suffixCache", Map.class)).hasSize(0); } @Test