diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/transactions.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/transactions.adoc index cbb3b82513..01f24ebcd3 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/transactions.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/transactions.adoc @@ -110,8 +110,8 @@ This property must have a different value on each application instance. [[transaction-id-suffix-fixed]] == `TransactionIdSuffix Fixed` -Since 3.1, introduced a new interface `TransactionSuffixManager` to manage `transactional.id` suffix. -The default implementation is `DefaultTransactionSuffixManager` when setting `maxCache` greater than zero can reuse `transactional.id` within a specific range, otherwise suffixes will be generated on the fly by incrementing a counter. +Since 3.1, introduced a new interface `TransactionIdSuffixStrategy` to manage `transactional.id` suffix. +The default implementation is `DefaultTransactionIdSuffixStrategy` when setting `maxCache` greater than zero can reuse `transactional.id` within a specific range, otherwise suffixes will be generated on the fly by incrementing a counter. When a transaction producer is requested and `transactional.id` all in use, throw a `NoProducerAvailableException`. User can then use a RetryTemplate configured to retry that exception, with a suitably configured back off. @@ -126,9 +126,9 @@ public static class Config { ... DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(configs); ... - DefaultTransactionSuffixManager sm = new DefaultTransactionSuffixManager(); - sm.setMaxCache(5); - pf.setTransactionSuffixManager(sm); + DefaultTransactionIdSuffixStrategy ss = new DefaultTransactionIdSuffixStrategy(); + ss.setMaxCache(5); + pf.setTransactionIdSuffixStrategy(ss); return pf; } diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index 894cfa6f02..e670406eda 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -52,7 +52,7 @@ When manually assigning partitions, with a `null` consumer `group.id`, the `AckM See xref:tips.adoc#tip-assign-all-parts[Manually Assigning All Partitions] for more information. [[x31-dkpf]] -=== TransactionSuffixManager +=== TransactionIdSuffixStrategy -A new `TransactionSuffixManager` interface with `DefaultTransactionSuffixManager` implementation is provided to restrict `transaction.id` in range. +A new `TransactionIdSuffixStrategy` interface with `DefaultTransactionIdSuffixStrategy` implementation is provided to restrict `transaction.id` in range. See xref:kafka/transactions.adoc#transaction-id-suffix-fixed[Fixed TransactionIdSuffix] for more information. \ No newline at end of file diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java index 3f835c2f54..9940c9e69d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java @@ -135,7 +135,7 @@ public class DefaultKafkaProducerFactory extends KafkaResourceFactory private final AtomicBoolean running = new AtomicBoolean(); - private TransactionSuffixManager transactionSuffixManager = new DefaultTransactionSuffixManager(); + private TransactionIdSuffixStrategy transactionIdSuffixStrategy = new DefaultTransactionIdSuffixStrategy(); private Supplier> keySerializerSupplier; @@ -351,12 +351,12 @@ public void setValueSerializerSupplier(Supplier> valueSerializerSu } /** - * Set the transaction suffix manager. - * @param transactionSuffixManager the manager. + * Set the transaction suffix strategy. + * @param transactionIdSuffixStrategy the strategy. * @since 3.1.1 */ - public void setTransactionSuffixManager(TransactionSuffixManager transactionSuffixManager) { - this.transactionSuffixManager = transactionSuffixManager; + public void setTransactionIdSuffixStrategy(TransactionIdSuffixStrategy transactionIdSuffixStrategy) { + this.transactionIdSuffixStrategy = transactionIdSuffixStrategy; } /** @@ -718,7 +718,7 @@ public void destroy() { } }); this.cache.clear(); - this.transactionSuffixManager.reset(); + this.transactionIdSuffixStrategy.reset(); this.threadBoundProducers.values().forEach(prod -> { try { prod.closeDelegate(this.physicalCloseTimeout, this.listeners); @@ -856,7 +856,7 @@ protected Producer createTransactionalProducer(String txIdPrefix) { } } if (cachedProducer == null) { - String suffix = this.transactionSuffixManager.retrieveSuffix(txIdPrefix); + String suffix = this.transactionIdSuffixStrategy.acquireSuffix(txIdPrefix); return doCreateTxProducer(txIdPrefix, suffix, this::cacheReturner); } else { @@ -905,7 +905,7 @@ private void closeTransactionProducer(CloseSafeProducer producer, Duration producer.closeDelegate(timeout, listeners); } finally { - this.transactionSuffixManager.returnSuffix(producer.txIdPrefix, producer.txIdSuffix); + this.transactionIdSuffixStrategy.releaseSuffix(producer.txIdPrefix, producer.txIdSuffix); } } @@ -925,7 +925,7 @@ private CloseSafeProducer doCreateTxProducer(String prefix, String suffix, throw newEx; // NOSONAR - lost stack trace } finally { - this.transactionSuffixManager.returnSuffix(prefix, suffix); + this.transactionIdSuffixStrategy.releaseSuffix(prefix, suffix); } throw new KafkaException("initTransactions() failed", ex); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultTransactionSuffixManager.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultTransactionIdSuffixStrategy.java similarity index 83% rename from spring-kafka/src/main/java/org/springframework/kafka/core/DefaultTransactionSuffixManager.java rename to spring-kafka/src/main/java/org/springframework/kafka/core/DefaultTransactionIdSuffixStrategy.java index b10fd9cd42..ec7c6c86ad 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultTransactionSuffixManager.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultTransactionIdSuffixStrategy.java @@ -26,7 +26,7 @@ import org.springframework.util.Assert; /** - * The {@link TransactionSuffixManager} implementation for managing transactional producer suffixes. + * The {@link TransactionIdSuffixStrategy} implementation for managing transactional producer suffixes. * If the {@link #maxCache} is greater than 0, the suffixes will be cached and reused. * Otherwise, the suffixes will be generated on the fly. * @@ -39,7 +39,7 @@ * * @since 3.1.1 */ -public class DefaultTransactionSuffixManager implements TransactionSuffixManager { +public class DefaultTransactionIdSuffixStrategy implements TransactionIdSuffixStrategy { private final AtomicInteger transactionIdSuffix = new AtomicInteger(); @@ -55,7 +55,8 @@ public class DefaultTransactionSuffixManager implements TransactionSuffixManager * @throws NoProducerAvailableException if caching is enabled and no suffixes are available. */ @Override - public String retrieveSuffix(String txIdPrefix) { + public String acquireSuffix(String txIdPrefix) { + Assert.notNull(txIdPrefix, "'txIdPrefix' must not be null"); BlockingQueue cache = getSuffixCache(txIdPrefix); if (cache == null) { return String.valueOf(this.transactionIdSuffix.getAndIncrement()); @@ -69,8 +70,10 @@ public String retrieveSuffix(String txIdPrefix) { } @Override - public void returnSuffix(@Nullable String txIdPrefix, @Nullable String suffix) { - if (this.maxCache <= 0 || suffix == null) { + public void releaseSuffix(String txIdPrefix, String suffix) { + Assert.notNull(txIdPrefix, "'txIdPrefix' must not be null"); + Assert.notNull(suffix, "'suffix' must not be null"); + if (this.maxCache <= 0) { return; } BlockingQueue queue = getSuffixCache(txIdPrefix); @@ -94,8 +97,8 @@ public void setMaxCache(int maxCache) { } @Nullable - private BlockingQueue getSuffixCache(@Nullable String txIdPrefix) { - if (txIdPrefix == null || this.maxCache <= 0) { + private BlockingQueue getSuffixCache(String txIdPrefix) { + if (this.maxCache <= 0) { return null; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/TransactionSuffixManager.java b/spring-kafka/src/main/java/org/springframework/kafka/core/TransactionIdSuffixStrategy.java similarity index 79% rename from spring-kafka/src/main/java/org/springframework/kafka/core/TransactionSuffixManager.java rename to spring-kafka/src/main/java/org/springframework/kafka/core/TransactionIdSuffixStrategy.java index c69bbe07f8..70597c7cb9 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/TransactionSuffixManager.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/TransactionIdSuffixStrategy.java @@ -23,23 +23,23 @@ * * @since 3.1.1 */ -public interface TransactionSuffixManager { +public interface TransactionIdSuffixStrategy { /** - * Retrieve the suffix for the transactional producer. + * Acquire the suffix for the transactional producer. * * @param txIdPrefix the transaction id prefix. * @return the suffix. */ - String retrieveSuffix(String txIdPrefix); + String acquireSuffix(String txIdPrefix); /** - * Return the suffix for the transactional producer back for reuse. + * Release the suffix for the transactional producer. * * @param txIdPrefix the transaction id prefix. * @param suffix the suffix. */ - void returnSuffix(String txIdPrefix, String suffix); + void releaseSuffix(String txIdPrefix, String suffix); /** * Clear all suffixes for the transactional producer. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index bdb489c98b..4419d5d90c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -926,6 +926,7 @@ else if (listener instanceof MessageListener) { if (this.containerProperties.isLogContainerConfig()) { this.logger.info(toString()); } + Map props = KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties(); ApplicationContext applicationContext = getApplicationContext(); this.checkNullKeyForExceptions = this.containerProperties.isCheckDeserExWhenKeyNull() || ErrorHandlingUtils.checkDeserializer(KafkaMessageListenerContainer.this.consumerFactory, diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java index a211eea579..c73f6b1b40 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java @@ -384,9 +384,9 @@ public void testNestedTxProducerIsFixed() throws Exception { KafkaTemplate template = new KafkaTemplate<>(pf); DefaultKafkaProducerFactory pfTx = new DefaultKafkaProducerFactory<>(producerProps); pfTx.setTransactionIdPrefix("fooTx.fixed."); - DefaultTransactionSuffixManager tsManager = new DefaultTransactionSuffixManager(); - tsManager.setMaxCache(3); - pfTx.setTransactionSuffixManager(tsManager); + DefaultTransactionIdSuffixStrategy suffixStrategy = new DefaultTransactionIdSuffixStrategy(); + suffixStrategy.setMaxCache(3); + pfTx.setTransactionIdSuffixStrategy(suffixStrategy); KafkaOperations templateTx = new KafkaTemplate<>(pfTx); Map consumerProps = KafkaTestUtils.consumerProps("txCache1FixedGroup", "false", this.embeddedKafka); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); @@ -419,7 +419,7 @@ public void testNestedTxProducerIsFixed() throws Exception { assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue(); assertThat(KafkaTestUtils.getPropertyValue(pfTx, "cache", Map.class)).hasSize(1); assertThat(pfTx.getCache()).hasSize(1); - assertThat(KafkaTestUtils.getPropertyValue(tsManager, "suffixCache", Map.class)).hasSize(1); + assertThat(KafkaTestUtils.getPropertyValue(suffixStrategy, "suffixCache", Map.class)).hasSize(1); // 1 tm tx producer and 1 templateTx tx producer assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer")).isSameAs(wrapped.get()); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java index c2fc1c846f..7059597c48 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java @@ -137,9 +137,9 @@ protected Producer createRawProducer(Map configs) { }; pf.setTransactionIdPrefix("foo"); - DefaultTransactionSuffixManager tsManager = new DefaultTransactionSuffixManager(); - tsManager.setMaxCache(2); - pf.setTransactionSuffixManager(tsManager); + DefaultTransactionIdSuffixStrategy suffixStrategy = new DefaultTransactionIdSuffixStrategy(); + suffixStrategy.setMaxCache(2); + pf.setTransactionIdSuffixStrategy(suffixStrategy); final AtomicInteger flag = new AtomicInteger(); willAnswer(i -> { @@ -184,7 +184,7 @@ protected Producer createRawProducer(Map configs) { assertThat(cache).hasSize(1); Queue queue = (Queue) cache.get("foo"); assertThat(queue).hasSize(1); - Map suffixCache = KafkaTestUtils.getPropertyValue(tsManager, "suffixCache", Map.class); + Map suffixCache = KafkaTestUtils.getPropertyValue(suffixStrategy, "suffixCache", Map.class); assertThat(suffixCache).hasSize(1); Queue suffixQueue = (Queue) suffixCache.get("foo"); assertThat(suffixQueue).hasSize(1); @@ -310,9 +310,9 @@ protected Producer createRawProducer(Map configs) { }; pf.setApplicationContext(ctx); pf.setTransactionIdPrefix("foo"); - DefaultTransactionSuffixManager tsManager = new DefaultTransactionSuffixManager(); - tsManager.setMaxCache(3); - pf.setTransactionSuffixManager(tsManager); + DefaultTransactionIdSuffixStrategy suffixStrategy = new DefaultTransactionIdSuffixStrategy(); + suffixStrategy.setMaxCache(3); + pf.setTransactionIdSuffixStrategy(suffixStrategy); Producer aProducer = pf.createProducer(); assertThat(aProducer).isNotNull(); aProducer.close(); @@ -324,7 +324,7 @@ protected Producer createRawProducer(Map configs) { assertThat(cache.size()).isEqualTo(1); Queue queue = (Queue) cache.get("foo"); assertThat(queue.size()).isEqualTo(1); - Map suffixCache = KafkaTestUtils.getPropertyValue(tsManager, "suffixCache", Map.class); + Map suffixCache = KafkaTestUtils.getPropertyValue(suffixStrategy, "suffixCache", Map.class); assertThat(suffixCache.size()).isEqualTo(1); Queue suffixQueue = (Queue) suffixCache.get("foo"); assertThat(suffixQueue.size()).isEqualTo(2); @@ -389,9 +389,9 @@ protected Producer createRawProducer(Map configs) { }; pf.setApplicationContext(ctx); pf.setTransactionIdPrefix("foo"); - DefaultTransactionSuffixManager tsManager = new DefaultTransactionSuffixManager(); - tsManager.setMaxCache(3); - pf.setTransactionSuffixManager(tsManager); + DefaultTransactionIdSuffixStrategy suffixStrategy = new DefaultTransactionIdSuffixStrategy(); + suffixStrategy.setMaxCache(3); + pf.setTransactionIdSuffixStrategy(suffixStrategy); Producer aProducer = pf.createProducer(); assertThat(aProducer).isNotNull(); aProducer.close(); @@ -403,7 +403,7 @@ protected Producer createRawProducer(Map configs) { assertThat(cache.size()).isEqualTo(1); Queue queue = (Queue) cache.get("foo"); assertThat(queue.size()).isEqualTo(1); - Map suffixCache = KafkaTestUtils.getPropertyValue(tsManager, "suffixCache", Map.class); + Map suffixCache = KafkaTestUtils.getPropertyValue(suffixStrategy, "suffixCache", Map.class); assertThat(suffixCache.size()).isEqualTo(1); Queue suffixQueue = (Queue) suffixCache.get("foo"); assertThat(suffixQueue.size()).isEqualTo(2); @@ -552,10 +552,10 @@ protected Producer createRawProducer(Map configs) { }; pf.setTransactionIdPrefix("foo"); - DefaultTransactionSuffixManager tsManager = new DefaultTransactionSuffixManager(); - tsManager.setMaxCache(2); - pf.setTransactionSuffixManager(tsManager); - Map suffixCache = KafkaTestUtils.getPropertyValue(tsManager, "suffixCache", Map.class); + DefaultTransactionIdSuffixStrategy suffixStrategy = new DefaultTransactionIdSuffixStrategy(); + suffixStrategy.setMaxCache(2); + pf.setTransactionIdSuffixStrategy(suffixStrategy); + Map suffixCache = KafkaTestUtils.getPropertyValue(suffixStrategy, "suffixCache", Map.class); pf.createProducer(); Queue queue = (Queue) suffixCache.get("foo"); assertThat(queue).hasSize(1); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultTransactionSuffixManagerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultTransactionIdSuffixStrategyTests.java similarity index 57% rename from spring-kafka/src/test/java/org/springframework/kafka/core/DefaultTransactionSuffixManagerTests.java rename to spring-kafka/src/test/java/org/springframework/kafka/core/DefaultTransactionIdSuffixStrategyTests.java index 4f4d1d2d9d..7300428a3c 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultTransactionSuffixManagerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultTransactionIdSuffixStrategyTests.java @@ -33,19 +33,19 @@ * * @since 3.1.1 */ -class DefaultTransactionSuffixManagerTests { +class DefaultTransactionIdSuffixStrategyTests { - private DefaultTransactionSuffixManager manager; + private DefaultTransactionIdSuffixStrategy suffixStrategy; @BeforeEach void setUp() { - manager = new DefaultTransactionSuffixManager(); + suffixStrategy = new DefaultTransactionIdSuffixStrategy(); } @Test void testRetrieveSuffixWithoutCache() { String txIdPrefix = "txIdPrefix"; - String suffix = manager.retrieveSuffix(txIdPrefix); + String suffix = suffixStrategy.acquireSuffix(txIdPrefix); assertThat(suffix).isNotNull(); } @@ -53,36 +53,36 @@ void testRetrieveSuffixWithoutCache() { void testRetrieveSuffixWithCache() { String txIdPrefix = "txIdPrefix"; String suffix = "suffix"; - assertThatNoException().isThrownBy(() -> manager.returnSuffix(txIdPrefix, suffix)); + assertThatNoException().isThrownBy(() -> suffixStrategy.releaseSuffix(txIdPrefix, suffix)); } @Test void testRetrieveSuffixWithCacheExhausted() { String txIdPrefix = "txIdPrefix"; - manager.setMaxCache(2); - String suffix1 = manager.retrieveSuffix(txIdPrefix); - String suffix2 = manager.retrieveSuffix(txIdPrefix); - assertThatExceptionOfType(NoProducerAvailableException.class).isThrownBy(() -> manager.retrieveSuffix(txIdPrefix)); + suffixStrategy.setMaxCache(2); + String suffix1 = suffixStrategy.acquireSuffix(txIdPrefix); + String suffix2 = suffixStrategy.acquireSuffix(txIdPrefix); + assertThatExceptionOfType(NoProducerAvailableException.class).isThrownBy(() -> suffixStrategy.acquireSuffix(txIdPrefix)); } @Test void testReturnSuffixWithCache() { String txIdPrefix = "txIdPrefix"; - manager.setMaxCache(2); - String suffix = manager.retrieveSuffix(txIdPrefix); - assertThatNoException().isThrownBy(() -> manager.returnSuffix(txIdPrefix, suffix)); + suffixStrategy.setMaxCache(2); + String suffix = suffixStrategy.acquireSuffix(txIdPrefix); + assertThatNoException().isThrownBy(() -> suffixStrategy.releaseSuffix(txIdPrefix, suffix)); } @SuppressWarnings("rawtypes") @Test void testReturnAllSuffixesWithCache() { String txIdPrefix = "txIdPrefix"; - manager.setMaxCache(2); - String suffix1 = manager.retrieveSuffix(txIdPrefix); - String suffix2 = manager.retrieveSuffix(txIdPrefix); - assertThatNoException().isThrownBy(() -> manager.returnSuffix(txIdPrefix, suffix1)); - Map suffixCache = KafkaTestUtils.getPropertyValue(manager, "suffixCache", Map.class); + suffixStrategy.setMaxCache(2); + String suffix1 = suffixStrategy.acquireSuffix(txIdPrefix); + String suffix2 = suffixStrategy.acquireSuffix(txIdPrefix); + assertThatNoException().isThrownBy(() -> suffixStrategy.releaseSuffix(txIdPrefix, suffix1)); + Map suffixCache = KafkaTestUtils.getPropertyValue(suffixStrategy, "suffixCache", Map.class); assertThat(suffixCache).hasSize(1); Queue queue = (Queue) suffixCache.get(txIdPrefix); assertThat(queue).hasSize(1); @@ -92,22 +92,22 @@ void testReturnAllSuffixesWithCache() { @Test void testReset() { String txIdPrefix = "txIdPrefix"; - manager.setMaxCache(2); - String suffix1 = manager.retrieveSuffix(txIdPrefix); - String suffix2 = manager.retrieveSuffix(txIdPrefix); - assertThatNoException().isThrownBy(() -> manager.returnSuffix(txIdPrefix, suffix1)); - manager.reset(); - Map suffixCache = KafkaTestUtils.getPropertyValue(manager, "suffixCache", Map.class); + suffixStrategy.setMaxCache(2); + String suffix1 = suffixStrategy.acquireSuffix(txIdPrefix); + String suffix2 = suffixStrategy.acquireSuffix(txIdPrefix); + assertThatNoException().isThrownBy(() -> suffixStrategy.releaseSuffix(txIdPrefix, suffix1)); + suffixStrategy.reset(); + Map suffixCache = KafkaTestUtils.getPropertyValue(suffixStrategy, "suffixCache", Map.class); assertThat(suffixCache).hasSize(0); } @Test void testSetMaxCacheIsNegative() { - assertThatExceptionOfType(IllegalArgumentException.class).isThrownBy(() -> manager.setMaxCache(-1)); + assertThatExceptionOfType(IllegalArgumentException.class).isThrownBy(() -> suffixStrategy.setMaxCache(-1)); } @Test void testSetMaxCacheIsZero() { - assertThatNoException().isThrownBy(() -> manager.setMaxCache(0)); + assertThatNoException().isThrownBy(() -> suffixStrategy.setMaxCache(0)); } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java index 6d3df380e9..ad475b1279 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java @@ -169,9 +169,9 @@ public void testLocalTransactionIsFixed() { senderProps.put(ProducerConfig.CLIENT_ID_CONFIG, "customClientIdFixed"); DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); pf.setKeySerializer(new StringSerializer()); - DefaultTransactionSuffixManager tsManager = new DefaultTransactionSuffixManager(); - tsManager.setMaxCache(3); - pf.setTransactionSuffixManager(tsManager); + DefaultTransactionIdSuffixStrategy suffixStrategy = new DefaultTransactionIdSuffixStrategy(); + suffixStrategy.setMaxCache(3); + pf.setTransactionIdSuffixStrategy(suffixStrategy); KafkaTemplate template = new KafkaTemplate<>(pf); template.setDefaultTopic(STRING_KEY_TOPIC); Map consumerProps = KafkaTestUtils.consumerProps("testLocalTxFixed", "false", embeddedKafka); @@ -216,13 +216,13 @@ record = iterator.next(); return null; }); assertThat(pf.getCache("tx.template.override.fixed.")).hasSize(1); - Map suffixCache = KafkaTestUtils.getPropertyValue(tsManager, "suffixCache", Map.class); + Map suffixCache = KafkaTestUtils.getPropertyValue(suffixStrategy, "suffixCache", Map.class); assertThat((Queue) suffixCache.get("tx.template.override.fixed.")).hasSize(2); assertThat(pf.getCache("testCustomClientIdIsUniqueFixed")).hasSize(1); assertThat((Queue) suffixCache.get("testCustomClientIdIsUniqueFixed")).hasSize(2); pf.destroy(); assertThat(pf.getCache()).hasSize(0); - assertThat(KafkaTestUtils.getPropertyValue(tsManager, "suffixCache", Map.class)).hasSize(0); + assertThat(KafkaTestUtils.getPropertyValue(suffixStrategy, "suffixCache", Map.class)).hasSize(0); } @Test