Skip to content

Commit

Permalink
Topic validation is using a kafka producer per shard.
Browse files Browse the repository at this point in the history
  • Loading branch information
onukristo committed Jan 12, 2024
1 parent 9e16673 commit e466798
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 38 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.28.1] - 2024-01-12

### Fixed

- Topic validator now uses a separate Kafka producer per shard.
This ensures that the right Kafka settings are used,
as different shards can have different Kafka servers behind them.

## [0.28.0] - 2023-12-20

### Changed
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.28.0
version=0.28.1
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
import io.micrometer.core.instrument.binder.cache.CaffeineCacheMetrics;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Phaser;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -80,44 +82,66 @@ public void afterPropertiesSet() {

@Override
public void preValidateAll() {
final var topics = tkmsProperties.getTopics();

final var semaphore = new Semaphore(tkmsProperties.getTopicValidation().getValidationConcurrencyAtInitialization());
final var failures = new AtomicInteger();
final var countDownLatch = new CountDownLatch(topics.size());
final var phaser = new Phaser(1);
final var startTimeEpochMs = System.currentTimeMillis();

for (var topic : topics) {
topicsValidatedDuringInitializationOrNotified.put(topic, Boolean.TRUE);
final var timeoutMs =
tkmsProperties.getTopicValidation().getTopicPreValidationTimeout().toMillis() - System.currentTimeMillis() + startTimeEpochMs;
if (ExceptionUtils.doUnchecked(() -> semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS))) {
/*
We are validating one by one, to get the proper error messages from Kafka.
And, we are doing it concurrently to speed things up.
*/
executor.execute(() -> {
try {
validate(TkmsShardPartition.of(tkmsProperties.getDefaultShard(), 0), topic, null);

log.info("Topic '{}' successfully pre-validated.", topic);
} catch (Throwable t) {
log.error("Topic validation for '" + topic + "' failed.", t);
failures.incrementAndGet();
} finally {
countDownLatch.countDown();
semaphore.release();
}
});
} else {
break;
for (int shard = 0; shard < tkmsProperties.getShardsCount(); shard++) {
var shardProperties = tkmsProperties.getShards().get(shard);
List<String> shardTopics = shardProperties == null ? null : shardProperties.getTopics();

if (shardTopics != null && shard == tkmsProperties.getDefaultShard()) {
throw new IllegalStateException("Topics for default shard have to be specified on 'tw-tkms.topics' property.");
}

if (shard == tkmsProperties.getDefaultShard()) {
shardTopics = tkmsProperties.getTopics();
}

if (shardTopics == null) {
continue;
}

for (var topic : shardTopics) {
topicsValidatedDuringInitializationOrNotified.put(topic, Boolean.TRUE);
final var timeoutMs =
tkmsProperties.getTopicValidation().getTopicPreValidationTimeout().toMillis() - System.currentTimeMillis() + startTimeEpochMs;
if (ExceptionUtils.doUnchecked(() -> semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS))) {

final var finalShard = shard;
/*
We are validating one by one, to get the proper error messages from Kafka.
And, we are doing it concurrently to speed things up.
*/
phaser.register();
executor.execute(() -> {
try {
validate(TkmsShardPartition.of(finalShard, 0), topic, null);

log.info("Topic '{}' successfully pre-validated.", topic);
} catch (Throwable t) {
log.error("Topic validation for '" + topic + "' failed.", t);
failures.incrementAndGet();
} finally {
phaser.arriveAndDeregister();
semaphore.release();
}
});
} else {
break;
}
}
}

final var timeoutMs =
tkmsProperties.getTopicValidation().getTopicPreValidationTimeout().toMillis() - System.currentTimeMillis() + startTimeEpochMs;

if (!ExceptionUtils.doUnchecked(() -> countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS))) {
int phase = phaser.arrive();
try {
phaser.awaitAdvanceInterruptibly(phase, timeoutMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException | TimeoutException e) {
tkmsKafkaProducerProvider.closeKafkaProducerForTopicValidation();
throw new IllegalStateException("Topic validation is taking too long.");
}
Expand All @@ -133,7 +157,7 @@ public void validate(TkmsShardPartition shardPartition, String topic, Integer pa
if (tkmsProperties.getTopicValidation().isUseAdminClient()) {
validateUsingAdmin(shardPartition, topic, partition);
} else {
validateUsingProducer(topic);
validateUsingProducer(shardPartition, topic);
}
}

Expand All @@ -146,7 +170,10 @@ protected void validateUsingAdmin(TkmsShardPartition shardPartition, String topi
return Boolean.TRUE;
});

var response = topicDescriptionsCache.get(new FetchTopicDescriptionRequest().setTopic(topic));
var response = topicDescriptionsCache.get(new FetchTopicDescriptionRequest()
.setTopic(topic)
.setShardPartition(shardPartition)
);

if (response == null) {
throw new NullPointerException("Could not fetch topic description for topic '" + topic + "'.");
Expand Down Expand Up @@ -185,8 +212,8 @@ protected void validateUsingAdmin(TkmsShardPartition shardPartition, String topi
Legacy logic.
We keep it in, in case some service would run into issues with the admin client based validation.
*/
protected void validateUsingProducer(String topic) {
tkmsKafkaProducerProvider.getKafkaProducerForTopicValidation().partitionsFor(topic);
protected void validateUsingProducer(TkmsShardPartition shardPartition, String topic) {
tkmsKafkaProducerProvider.getKafkaProducerForTopicValidation(shardPartition).partitionsFor(topic);
}

protected FetchTopicDescriptionResponse fetchTopicDescription(FetchTopicDescriptionRequest request) {
Expand All @@ -197,7 +224,7 @@ protected FetchTopicDescriptionResponse fetchTopicDescription(FetchTopicDescript
&& tkmsProperties.getTopicValidation().isTryToAutoCreateTopics()) {
final var topic = request.getTopic();
try {
validateUsingProducer(topic);
validateUsingProducer(request.getShardPartition(), topic);

log.info("Succeeded in auto creating topic `{}`", topic);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ protected int getPartition(int shard, TkmsMessage message) {
/**
 * Request object handed to the topic descriptions cache when a topic's description is looked up.
 *
 * <p>Setters are chainable, so a request can be built fluently, e.g.
 * {@code new FetchTopicDescriptionRequest().setTopic(topic).setShardPartition(shardPartition)}.
 */
@Data
@Accessors(chain = true)
protected static class FetchTopicDescriptionRequest {

// Shard partition the lookup is made for; passed on to the producer based validation when auto creating topics.
// NOTE(review): presumably this also makes cached descriptions shard-specific - confirm against the cache key handling.
private TkmsShardPartition shardPartition;
// Name of the topic whose description is requested.
private String topic;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ public interface ITkmsKafkaProducerProvider {

KafkaProducer<String, byte[]> getKafkaProducer(TkmsShardPartition tkmsShardPartition, UseCase useCase);

KafkaProducer<String, byte[]> getKafkaProducerForTopicValidation();
KafkaProducer<String, byte[]> getKafkaProducerForTopicValidation(TkmsShardPartition shardPartition);

void closeKafkaProducer(TkmsShardPartition tkmsShardPartition, UseCase useCase);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ public KafkaProducer<String, byte[]> getKafkaProducer(TkmsShardPartition shardPa
}

@Override
public KafkaProducer<String, byte[]> getKafkaProducerForTopicValidation() {
return getKafkaProducer(TkmsShardPartition.of(tkmsProperties.getDefaultShard(), 0), UseCase.TOPIC_VALIDATION);
public KafkaProducer<String, byte[]> getKafkaProducerForTopicValidation(TkmsShardPartition shardPartition) {
return getKafkaProducer(TkmsShardPartition.of(shardPartition.getShard(), 0), UseCase.TOPIC_VALIDATION);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ public void afterPropertiesSet() {
* <p>It is not mandatory, but it allows some pre-validation to be done and prevents the service from starting when something is wrong.
*
* <p>Also, so we can warm up their metadata, avoiding elevated latencies at the start of the service.
*
* <p>You may want to list some topics under a specific shard's configuration, if those topics use a different Kafka server, different ACLs or other shard-specific settings.
*/
@ResolvedValue
@LegacyResolvedValue
Expand Down Expand Up @@ -315,6 +317,10 @@ public static class ShardProperties {
@ResolvedValue
@LegacyResolvedValue
private Map<String, String> kafka = new HashMap<>();

@ResolvedValue
@LegacyResolvedValue
private List<String> topics = new ArrayList<>();
}

public boolean isValidateSerialization(int shard) {
Expand Down
4 changes: 4 additions & 0 deletions tw-tkms-starter/src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ tw-tkms:
ms: 7
earliest-visible-messages:
enabled: true
topics:
- TestTopic
compression:
algorithm: random
min-size: 10
Expand Down Expand Up @@ -81,6 +83,8 @@ tw-tkms:
delete-batch-sizes: "51, 11, 5, 1"
topic-validation:
use-admin-client: true
topics:
- TestTopicPostgres

# We will not pre-create the topic in order to test the
tw-tkms-test:
Expand Down

0 comments on commit e466798

Please sign in to comment.