From 90b6ea9f345dcc04060ba514de60ed67faa7229d Mon Sep 17 00:00:00 2001 From: "Zhiyang.Wang1" Date: Mon, 27 Nov 2023 17:47:59 +0800 Subject: [PATCH] GH-2852: support fixed transaction id suffix Introducing a new interface for generating transactionIdSuffix with a default increment implementation. Introducing a new interface for return the current transactionIdSuffix. Resolves #2852 --- .../core/DefaultKafkaProducerFactory.java | 13 ++++++-- .../DefaultKafkaProducerFactoryTests.java | 33 +++++++++++++++++++ 2 files changed, 44 insertions(+), 2 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java index d83afd9fa9..072b6abeec 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java @@ -424,6 +424,15 @@ public final void setTransactionIdPrefix(String transactionIdPrefix) { return this.transactionIdPrefix; } + public final boolean initTransactionIdSuffix(int transactionIdSuffix) { + Assert.isTrue(transactionIdSuffix >= 0, "'transactionIdSuffix' initial value must greater than or equal 0"); + return this.transactionIdSuffix.compareAndSet(0, transactionIdSuffix); + } + + public int getCurrTransactionIdSuffix() { + return this.transactionIdSuffix.get(); + } + /** * Set to true to create a producer per thread instead of singleton that is shared by * all clients. Clients must call {@link #closeThreadBoundProducer()} to @@ -498,7 +507,7 @@ public List> getPostProcessors() { * @since 3.1 */ public void setMaxCache(int maxCache) { - Assert.isTrue(maxCache >= 0, "max cache must greater than or equal 0"); + Assert.isTrue(maxCache >= 0, "'maxCache' must greater than or equal 0"); this.maxCache = maxCache; } @@ -868,7 +877,7 @@ protected Producer createTransactionalProducer(String txIdPrefix) { () -> "No suffix cache found for " + txIdPrefix + ", max cache" + this.maxCache); suffix = suffixQueue.poll(); if (suffix == null) { - throw new NoProducerAvailableException("No transaction producer available for " + txIdPrefix); + throw new NoProducerAvailableException("No available transaction producer suffix for " + txIdPrefix); } } else { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java index 46e5694755..0679b93d3d 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java @@ -666,6 +666,39 @@ protected Producer createRawProducer(Map configs) { pf.destroy(); } + @Test + @SuppressWarnings({ "rawtypes", "unchecked" }) + void testTransactionId() throws InterruptedException { + final Producer producer = mock(Producer.class); + final Map configPassedToKafkaConsumer = new HashMap<>(); + DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) { + + @Override + protected Producer createRawProducer(Map configs) { + configPassedToKafkaConsumer.clear(); + configPassedToKafkaConsumer.putAll(configs); + return producer; + } + + }; + pf.setBootstrapServersSupplier(() -> "foo"); + pf.setTransactionIdPrefix("tx."); + pf.setMaxCache(2); + assertThat(pf.initTransactionIdSuffix(10)).isTrue(); + Producer aProducer = pf.createProducer(); + assertThat(configPassedToKafkaConsumer.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)).isEqualTo("tx.10"); + assertThat(pf.initTransactionIdSuffix(20)).isFalse(); + Producer bProducer = pf.createProducer(); + assertThat(configPassedToKafkaConsumer.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)).isEqualTo("tx.11"); + bProducer.close(Duration.ofSeconds(20)); + pf.setMaxAge(Duration.ofMillis(10)); + Thread.sleep(50); + Producer cProducer = pf.createProducer(); + assertThat(configPassedToKafkaConsumer.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)).isEqualTo("tx.11"); + assertThat(pf.getCurrTransactionIdSuffix()).isEqualTo(12); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) @Test void configUpdates() {