Skip to content

Commit

Permalink
review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
stillya committed Dec 12, 2023
1 parent e5048c0 commit defa439
Show file tree
Hide file tree
Showing 10 changed files with 82 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ This property must have a different value on each application instance.
[[transaction-id-suffix-fixed]]
== `TransactionIdSuffix Fixed`

Since 3.1, introduced a new interface `TransactionSuffixManager` to manage `transactional.id` suffix.
The default implementation is `DefaultTransactionSuffixManager` when setting `maxCache` greater than zero can reuse `transactional.id` within a specific range, otherwise suffixes will be generated on the fly by incrementing a counter.
Since 3.1, introduced a new interface `TransactionIdSuffixStrategy` to manage `transactional.id` suffix.
The default implementation is `DefaultTransactionIdSuffixStrategy` when setting `maxCache` greater than zero can reuse `transactional.id` within a specific range, otherwise suffixes will be generated on the fly by incrementing a counter.
When a transaction producer is requested and `transactional.id` all in use, throw a `NoProducerAvailableException`.
User can then use a RetryTemplate configured to retry that exception, with a suitably configured back off.

Expand All @@ -126,9 +126,9 @@ public static class Config {
...
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
...
DefaultTransactionSuffixManager sm = new DefaultTransactionSuffixManager();
sm.setMaxCache(5);
pf.setTransactionSuffixManager(sm);
DefaultTransactionIdSuffixStrategy ss = new DefaultTransactionIdSuffixStrategy();
ss.setMaxCache(5);
pf.setTransactionIdSuffixStrategy(ss);
return pf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ When manually assigning partitions, with a `null` consumer `group.id`, the `AckM
See xref:tips.adoc#tip-assign-all-parts[Manually Assigning All Partitions] for more information.

[[x31-dkpf]]
=== TransactionSuffixManager
=== TransactionIdSuffixStrategy

A new `TransactionSuffixManager` interface with `DefaultTransactionSuffixManager` implementation is provided to restrict `transaction.id` in range.
A new `TransactionIdSuffixStrategy` interface with `DefaultTransactionIdSuffixStrategy` implementation is provided to restrict `transaction.id` in range.
See xref:kafka/transactions.adoc#transaction-id-suffix-fixed[Fixed TransactionIdSuffix] for more information.
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory

private final AtomicBoolean running = new AtomicBoolean();

private TransactionSuffixManager transactionSuffixManager = new DefaultTransactionSuffixManager();
private TransactionIdSuffixStrategy transactionIdSuffixStrategy = new DefaultTransactionIdSuffixStrategy();

private Supplier<Serializer<K>> keySerializerSupplier;

Expand Down Expand Up @@ -351,12 +351,12 @@ public void setValueSerializerSupplier(Supplier<Serializer<V>> valueSerializerSu
}

/**
* Set the transaction suffix manager.
* @param transactionSuffixManager the manager.
* Set the transaction suffix strategy.
* @param transactionIdSuffixStrategy the strategy.
* @since 3.1.1
*/
public void setTransactionSuffixManager(TransactionSuffixManager transactionSuffixManager) {
this.transactionSuffixManager = transactionSuffixManager;
public void setTransactionIdSuffixStrategy(TransactionIdSuffixStrategy transactionIdSuffixStrategy) {
this.transactionIdSuffixStrategy = transactionIdSuffixStrategy;
}

/**
Expand Down Expand Up @@ -718,7 +718,7 @@ public void destroy() {
}
});
this.cache.clear();
this.transactionSuffixManager.reset();
this.transactionIdSuffixStrategy.reset();
this.threadBoundProducers.values().forEach(prod -> {
try {
prod.closeDelegate(this.physicalCloseTimeout, this.listeners);
Expand Down Expand Up @@ -856,7 +856,7 @@ protected Producer<K, V> createTransactionalProducer(String txIdPrefix) {
}
}
if (cachedProducer == null) {
String suffix = this.transactionSuffixManager.retrieveSuffix(txIdPrefix);
String suffix = this.transactionIdSuffixStrategy.acquireSuffix(txIdPrefix);
return doCreateTxProducer(txIdPrefix, suffix, this::cacheReturner);
}
else {
Expand Down Expand Up @@ -905,7 +905,7 @@ private void closeTransactionProducer(CloseSafeProducer<K, V> producer, Duration
producer.closeDelegate(timeout, listeners);
}
finally {
this.transactionSuffixManager.returnSuffix(producer.txIdPrefix, producer.txIdSuffix);
this.transactionIdSuffixStrategy.releaseSuffix(producer.txIdPrefix, producer.txIdSuffix);
}
}

Expand All @@ -925,7 +925,7 @@ private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
throw newEx; // NOSONAR - lost stack trace
}
finally {
this.transactionSuffixManager.returnSuffix(prefix, suffix);
this.transactionIdSuffixStrategy.releaseSuffix(prefix, suffix);
}
throw new KafkaException("initTransactions() failed", ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.springframework.util.Assert;

/**
* The {@link TransactionSuffixManager} implementation for managing transactional producer suffixes.
* The {@link TransactionIdSuffixStrategy} implementation for managing transactional producer suffixes.
* If the {@link #maxCache} is greater than 0, the suffixes will be cached and reused.
* Otherwise, the suffixes will be generated on the fly.
*
Expand All @@ -39,7 +39,7 @@
*
* @since 3.1.1
*/
public class DefaultTransactionSuffixManager implements TransactionSuffixManager {
public class DefaultTransactionIdSuffixStrategy implements TransactionIdSuffixStrategy {

private final AtomicInteger transactionIdSuffix = new AtomicInteger();

Expand All @@ -55,7 +55,8 @@ public class DefaultTransactionSuffixManager implements TransactionSuffixManager
* @throws NoProducerAvailableException if caching is enabled and no suffixes are available.
*/
@Override
public String retrieveSuffix(String txIdPrefix) {
public String acquireSuffix(String txIdPrefix) {
Assert.notNull(txIdPrefix, "'txIdPrefix' must not be null");
BlockingQueue<String> cache = getSuffixCache(txIdPrefix);
if (cache == null) {
return String.valueOf(this.transactionIdSuffix.getAndIncrement());
Expand All @@ -69,8 +70,10 @@ public String retrieveSuffix(String txIdPrefix) {
}

@Override
public void returnSuffix(@Nullable String txIdPrefix, @Nullable String suffix) {
if (this.maxCache <= 0 || suffix == null) {
public void releaseSuffix(String txIdPrefix, String suffix) {
Assert.notNull(txIdPrefix, "'txIdPrefix' must not be null");
Assert.notNull(suffix, "'suffix' must not be null");
if (this.maxCache <= 0) {
return;
}
BlockingQueue<String> queue = getSuffixCache(txIdPrefix);
Expand All @@ -94,8 +97,8 @@ public void setMaxCache(int maxCache) {
}

@Nullable
private BlockingQueue<String> getSuffixCache(@Nullable String txIdPrefix) {
if (txIdPrefix == null || this.maxCache <= 0) {
private BlockingQueue<String> getSuffixCache(String txIdPrefix) {
if (this.maxCache <= 0) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,23 @@
*
* @since 3.1.1
*/
public interface TransactionSuffixManager {
public interface TransactionIdSuffixStrategy {

/**
* Retrieve the suffix for the transactional producer.
* Acquire the suffix for the transactional producer.
*
* @param txIdPrefix the transaction id prefix.
* @return the suffix.
*/
String retrieveSuffix(String txIdPrefix);
String acquireSuffix(String txIdPrefix);

/**
* Return the suffix for the transactional producer back for reuse.
* Release the suffix for the transactional producer.
*
* @param txIdPrefix the transaction id prefix.
* @param suffix the suffix.
*/
void returnSuffix(String txIdPrefix, String suffix);
void releaseSuffix(String txIdPrefix, String suffix);

/**
* Clear all suffixes for the transactional producer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -926,6 +926,7 @@ else if (listener instanceof MessageListener) {
if (this.containerProperties.isLogContainerConfig()) {
this.logger.info(toString());
}
Map<String, Object> props = KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties();
ApplicationContext applicationContext = getApplicationContext();
this.checkNullKeyForExceptions = this.containerProperties.isCheckDeserExWhenKeyNull()
|| ErrorHandlingUtils.checkDeserializer(KafkaMessageListenerContainer.this.consumerFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,9 +384,9 @@ public void testNestedTxProducerIsFixed() throws Exception {
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
DefaultKafkaProducerFactory<Integer, String> pfTx = new DefaultKafkaProducerFactory<>(producerProps);
pfTx.setTransactionIdPrefix("fooTx.fixed.");
DefaultTransactionSuffixManager tsManager = new DefaultTransactionSuffixManager();
tsManager.setMaxCache(3);
pfTx.setTransactionSuffixManager(tsManager);
DefaultTransactionIdSuffixStrategy suffixStrategy = new DefaultTransactionIdSuffixStrategy();
suffixStrategy.setMaxCache(3);
pfTx.setTransactionIdSuffixStrategy(suffixStrategy);
KafkaOperations<Integer, String> templateTx = new KafkaTemplate<>(pfTx);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("txCache1FixedGroup", "false", this.embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Expand Down Expand Up @@ -419,7 +419,7 @@ public void testNestedTxProducerIsFixed() throws Exception {
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
assertThat(KafkaTestUtils.getPropertyValue(pfTx, "cache", Map.class)).hasSize(1);
assertThat(pfTx.getCache()).hasSize(1);
assertThat(KafkaTestUtils.getPropertyValue(tsManager, "suffixCache", Map.class)).hasSize(1);
assertThat(KafkaTestUtils.getPropertyValue(suffixStrategy, "suffixCache", Map.class)).hasSize(1);
// 1 tm tx producer and 1 templateTx tx producer
assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer")).isSameAs(wrapped.get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ protected Producer createRawProducer(Map configs) {

};
pf.setTransactionIdPrefix("foo");
DefaultTransactionSuffixManager tsManager = new DefaultTransactionSuffixManager();
tsManager.setMaxCache(2);
pf.setTransactionSuffixManager(tsManager);
DefaultTransactionIdSuffixStrategy suffixStrategy = new DefaultTransactionIdSuffixStrategy();
suffixStrategy.setMaxCache(2);
pf.setTransactionIdSuffixStrategy(suffixStrategy);

final AtomicInteger flag = new AtomicInteger();
willAnswer(i -> {
Expand Down Expand Up @@ -184,7 +184,7 @@ protected Producer createRawProducer(Map configs) {
assertThat(cache).hasSize(1);
Queue queue = (Queue) cache.get("foo");
assertThat(queue).hasSize(1);
Map<?, ?> suffixCache = KafkaTestUtils.getPropertyValue(tsManager, "suffixCache", Map.class);
Map<?, ?> suffixCache = KafkaTestUtils.getPropertyValue(suffixStrategy, "suffixCache", Map.class);
assertThat(suffixCache).hasSize(1);
Queue suffixQueue = (Queue) suffixCache.get("foo");
assertThat(suffixQueue).hasSize(1);
Expand Down Expand Up @@ -310,9 +310,9 @@ protected Producer createRawProducer(Map configs) {
};
pf.setApplicationContext(ctx);
pf.setTransactionIdPrefix("foo");
DefaultTransactionSuffixManager tsManager = new DefaultTransactionSuffixManager();
tsManager.setMaxCache(3);
pf.setTransactionSuffixManager(tsManager);
DefaultTransactionIdSuffixStrategy suffixStrategy = new DefaultTransactionIdSuffixStrategy();
suffixStrategy.setMaxCache(3);
pf.setTransactionIdSuffixStrategy(suffixStrategy);
Producer aProducer = pf.createProducer();
assertThat(aProducer).isNotNull();
aProducer.close();
Expand All @@ -324,7 +324,7 @@ protected Producer createRawProducer(Map configs) {
assertThat(cache.size()).isEqualTo(1);
Queue queue = (Queue) cache.get("foo");
assertThat(queue.size()).isEqualTo(1);
Map<?, ?> suffixCache = KafkaTestUtils.getPropertyValue(tsManager, "suffixCache", Map.class);
Map<?, ?> suffixCache = KafkaTestUtils.getPropertyValue(suffixStrategy, "suffixCache", Map.class);
assertThat(suffixCache.size()).isEqualTo(1);
Queue suffixQueue = (Queue) suffixCache.get("foo");
assertThat(suffixQueue.size()).isEqualTo(2);
Expand Down Expand Up @@ -389,9 +389,9 @@ protected Producer createRawProducer(Map configs) {
};
pf.setApplicationContext(ctx);
pf.setTransactionIdPrefix("foo");
DefaultTransactionSuffixManager tsManager = new DefaultTransactionSuffixManager();
tsManager.setMaxCache(3);
pf.setTransactionSuffixManager(tsManager);
DefaultTransactionIdSuffixStrategy suffixStrategy = new DefaultTransactionIdSuffixStrategy();
suffixStrategy.setMaxCache(3);
pf.setTransactionIdSuffixStrategy(suffixStrategy);
Producer aProducer = pf.createProducer();
assertThat(aProducer).isNotNull();
aProducer.close();
Expand All @@ -403,7 +403,7 @@ protected Producer createRawProducer(Map configs) {
assertThat(cache.size()).isEqualTo(1);
Queue queue = (Queue) cache.get("foo");
assertThat(queue.size()).isEqualTo(1);
Map<?, ?> suffixCache = KafkaTestUtils.getPropertyValue(tsManager, "suffixCache", Map.class);
Map<?, ?> suffixCache = KafkaTestUtils.getPropertyValue(suffixStrategy, "suffixCache", Map.class);
assertThat(suffixCache.size()).isEqualTo(1);
Queue suffixQueue = (Queue) suffixCache.get("foo");
assertThat(suffixQueue.size()).isEqualTo(2);
Expand Down Expand Up @@ -552,10 +552,10 @@ protected Producer createRawProducer(Map configs) {

};
pf.setTransactionIdPrefix("foo");
DefaultTransactionSuffixManager tsManager = new DefaultTransactionSuffixManager();
tsManager.setMaxCache(2);
pf.setTransactionSuffixManager(tsManager);
Map<?, ?> suffixCache = KafkaTestUtils.getPropertyValue(tsManager, "suffixCache", Map.class);
DefaultTransactionIdSuffixStrategy suffixStrategy = new DefaultTransactionIdSuffixStrategy();
suffixStrategy.setMaxCache(2);
pf.setTransactionIdSuffixStrategy(suffixStrategy);
Map<?, ?> suffixCache = KafkaTestUtils.getPropertyValue(suffixStrategy, "suffixCache", Map.class);
pf.createProducer();
Queue queue = (Queue) suffixCache.get("foo");
assertThat(queue).hasSize(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,56 +33,56 @@
*
* @since 3.1.1
*/
class DefaultTransactionSuffixManagerTests {
class DefaultTransactionIdSuffixStrategyTests {

private DefaultTransactionSuffixManager manager;
private DefaultTransactionIdSuffixStrategy suffixStrategy;

@BeforeEach
void setUp() {
manager = new DefaultTransactionSuffixManager();
suffixStrategy = new DefaultTransactionIdSuffixStrategy();
}

@Test
void testRetrieveSuffixWithoutCache() {
String txIdPrefix = "txIdPrefix";
String suffix = manager.retrieveSuffix(txIdPrefix);
String suffix = suffixStrategy.acquireSuffix(txIdPrefix);
assertThat(suffix).isNotNull();
}

@Test
void testRetrieveSuffixWithCache() {
String txIdPrefix = "txIdPrefix";
String suffix = "suffix";
assertThatNoException().isThrownBy(() -> manager.returnSuffix(txIdPrefix, suffix));
assertThatNoException().isThrownBy(() -> suffixStrategy.releaseSuffix(txIdPrefix, suffix));
}


@Test
void testRetrieveSuffixWithCacheExhausted() {
String txIdPrefix = "txIdPrefix";
manager.setMaxCache(2);
String suffix1 = manager.retrieveSuffix(txIdPrefix);
String suffix2 = manager.retrieveSuffix(txIdPrefix);
assertThatExceptionOfType(NoProducerAvailableException.class).isThrownBy(() -> manager.retrieveSuffix(txIdPrefix));
suffixStrategy.setMaxCache(2);
String suffix1 = suffixStrategy.acquireSuffix(txIdPrefix);
String suffix2 = suffixStrategy.acquireSuffix(txIdPrefix);
assertThatExceptionOfType(NoProducerAvailableException.class).isThrownBy(() -> suffixStrategy.acquireSuffix(txIdPrefix));
}

@Test
void testReturnSuffixWithCache() {
String txIdPrefix = "txIdPrefix";
manager.setMaxCache(2);
String suffix = manager.retrieveSuffix(txIdPrefix);
assertThatNoException().isThrownBy(() -> manager.returnSuffix(txIdPrefix, suffix));
suffixStrategy.setMaxCache(2);
String suffix = suffixStrategy.acquireSuffix(txIdPrefix);
assertThatNoException().isThrownBy(() -> suffixStrategy.releaseSuffix(txIdPrefix, suffix));
}

@SuppressWarnings("rawtypes")
@Test
void testReturnAllSuffixesWithCache() {
String txIdPrefix = "txIdPrefix";
manager.setMaxCache(2);
String suffix1 = manager.retrieveSuffix(txIdPrefix);
String suffix2 = manager.retrieveSuffix(txIdPrefix);
assertThatNoException().isThrownBy(() -> manager.returnSuffix(txIdPrefix, suffix1));
Map<?, ?> suffixCache = KafkaTestUtils.getPropertyValue(manager, "suffixCache", Map.class);
suffixStrategy.setMaxCache(2);
String suffix1 = suffixStrategy.acquireSuffix(txIdPrefix);
String suffix2 = suffixStrategy.acquireSuffix(txIdPrefix);
assertThatNoException().isThrownBy(() -> suffixStrategy.releaseSuffix(txIdPrefix, suffix1));
Map<?, ?> suffixCache = KafkaTestUtils.getPropertyValue(suffixStrategy, "suffixCache", Map.class);
assertThat(suffixCache).hasSize(1);
Queue queue = (Queue) suffixCache.get(txIdPrefix);
assertThat(queue).hasSize(1);
Expand All @@ -92,22 +92,22 @@ void testReturnAllSuffixesWithCache() {
@Test
void testReset() {
String txIdPrefix = "txIdPrefix";
manager.setMaxCache(2);
String suffix1 = manager.retrieveSuffix(txIdPrefix);
String suffix2 = manager.retrieveSuffix(txIdPrefix);
assertThatNoException().isThrownBy(() -> manager.returnSuffix(txIdPrefix, suffix1));
manager.reset();
Map<?, ?> suffixCache = KafkaTestUtils.getPropertyValue(manager, "suffixCache", Map.class);
suffixStrategy.setMaxCache(2);
String suffix1 = suffixStrategy.acquireSuffix(txIdPrefix);
String suffix2 = suffixStrategy.acquireSuffix(txIdPrefix);
assertThatNoException().isThrownBy(() -> suffixStrategy.releaseSuffix(txIdPrefix, suffix1));
suffixStrategy.reset();
Map<?, ?> suffixCache = KafkaTestUtils.getPropertyValue(suffixStrategy, "suffixCache", Map.class);
assertThat(suffixCache).hasSize(0);
}

@Test
void testSetMaxCacheIsNegative() {
assertThatExceptionOfType(IllegalArgumentException.class).isThrownBy(() -> manager.setMaxCache(-1));
assertThatExceptionOfType(IllegalArgumentException.class).isThrownBy(() -> suffixStrategy.setMaxCache(-1));
}

@Test
void testSetMaxCacheIsZero() {
assertThatNoException().isThrownBy(() -> manager.setMaxCache(0));
assertThatNoException().isThrownBy(() -> suffixStrategy.setMaxCache(0));
}
}
Loading

0 comments on commit defa439

Please sign in to comment.