Skip to content

Commit

Permalink
GH-2852: support fixed transaction id suffix
Browse files Browse the repository at this point in the history
Introducing a new interface for generating transactionIdSuffix with a default increment implementation.

Introducing a new interface for return the current transactionIdSuffix.

Resolves #2852
  • Loading branch information
Zhiyang.Wang1 committed Nov 27, 2023
1 parent d30de4e commit 90b6ea9
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,15 @@ public final void setTransactionIdPrefix(String transactionIdPrefix) {
return this.transactionIdPrefix;
}

public final boolean initTransactionIdSuffix(int transactionIdSuffix) {
Assert.isTrue(transactionIdSuffix >= 0, "'transactionIdSuffix' initial value must greater than or equal 0");
return this.transactionIdSuffix.compareAndSet(0, transactionIdSuffix);
}

public int getCurrTransactionIdSuffix() {
return this.transactionIdSuffix.get();
}

/**
* Set to true to create a producer per thread instead of singleton that is shared by
* all clients. Clients <b>must</b> call {@link #closeThreadBoundProducer()} to
Expand Down Expand Up @@ -498,7 +507,7 @@ public List<ProducerPostProcessor<K, V>> getPostProcessors() {
* @since 3.1
*/
public void setMaxCache(int maxCache) {
Assert.isTrue(maxCache >= 0, "max cache must greater than or equal 0");
Assert.isTrue(maxCache >= 0, "'maxCache' must greater than or equal 0");
this.maxCache = maxCache;
}

Expand Down Expand Up @@ -868,7 +877,7 @@ protected Producer<K, V> createTransactionalProducer(String txIdPrefix) {
() -> "No suffix cache found for " + txIdPrefix + ", max cache" + this.maxCache);
suffix = suffixQueue.poll();
if (suffix == null) {
throw new NoProducerAvailableException("No transaction producer available for " + txIdPrefix);
throw new NoProducerAvailableException("No available transaction producer suffix for " + txIdPrefix);
}
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,39 @@ protected Producer createRawProducer(Map configs) {
pf.destroy();
}

@Test
@SuppressWarnings({ "rawtypes", "unchecked" })
void testTransactionId() throws InterruptedException {
final Producer producer = mock(Producer.class);
final Map<String, Object> configPassedToKafkaConsumer = new HashMap<>();
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {

@Override
protected Producer createRawProducer(Map configs) {
configPassedToKafkaConsumer.clear();
configPassedToKafkaConsumer.putAll(configs);
return producer;
}

};
pf.setBootstrapServersSupplier(() -> "foo");
pf.setTransactionIdPrefix("tx.");
pf.setMaxCache(2);
assertThat(pf.initTransactionIdSuffix(10)).isTrue();
Producer aProducer = pf.createProducer();
assertThat(configPassedToKafkaConsumer.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)).isEqualTo("tx.10");
assertThat(pf.initTransactionIdSuffix(20)).isFalse();
Producer bProducer = pf.createProducer();
assertThat(configPassedToKafkaConsumer.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)).isEqualTo("tx.11");
bProducer.close(Duration.ofSeconds(20));
pf.setMaxAge(Duration.ofMillis(10));
Thread.sleep(50);
Producer cProducer = pf.createProducer();
assertThat(configPassedToKafkaConsumer.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)).isEqualTo("tx.11");
assertThat(pf.getCurrTransactionIdSuffix()).isEqualTo(12);
}


@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
void configUpdates() {
Expand Down

0 comments on commit 90b6ea9

Please sign in to comment.