From e466798b89aa64a3a4d572261fcb5b904727f5ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristo=20Kuusk=C3=BCll?= Date: Fri, 12 Jan 2024 16:13:26 +0200 Subject: [PATCH] Topic validation is using a kafka producer per shard. --- CHANGELOG.md | 8 ++ gradle.properties | 2 +- .../kafka/tkms/TkmsTopicValidator.java | 93 ++++++++++++------- .../tkms/TransactionalKafkaMessageSender.java | 2 +- .../config/ITkmsKafkaProducerProvider.java | 2 +- .../config/TkmsKafkaProducerProvider.java | 4 +- .../kafka/tkms/config/TkmsProperties.java | 6 ++ .../src/test/resources/application.yml | 4 + 8 files changed, 83 insertions(+), 38 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c92c916..2db0d36 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,14 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.28.1] - 2024-01-12 + +### Fixed + +- Topic validator now uses a Kafka producer per shard. + This ensures that the right Kafka settings are used, + as different shards can have different Kafka servers behind them. 
+ ## [0.28.0] - 2023-12-20 ### Changed diff --git a/gradle.properties b/gradle.properties index 0f84cfd..1f3888a 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=0.28.0 +version=0.28.1 diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsTopicValidator.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsTopicValidator.java index af0442a..ce7cea8 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsTopicValidator.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsTopicValidator.java @@ -18,13 +18,15 @@ import io.micrometer.core.instrument.binder.cache.CaffeineCacheMetrics; import java.time.Duration; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Phaser; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -80,44 +82,66 @@ public void afterPropertiesSet() { @Override public void preValidateAll() { - final var topics = tkmsProperties.getTopics(); final var semaphore = new Semaphore(tkmsProperties.getTopicValidation().getValidationConcurrencyAtInitialization()); final var failures = new AtomicInteger(); - final var countDownLatch = new CountDownLatch(topics.size()); + final var phaser = new Phaser(1); final var startTimeEpochMs = System.currentTimeMillis(); - for (var topic : topics) { - topicsValidatedDuringInitializationOrNotified.put(topic, Boolean.TRUE); - final var timeoutMs = - tkmsProperties.getTopicValidation().getTopicPreValidationTimeout().toMillis() - System.currentTimeMillis() + startTimeEpochMs; - if 
(ExceptionUtils.doUnchecked(() -> semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS))) { - /* - We are validating one by one, to get the proper error messages from Kafka. - And, we are doing it concurrently to speed things up. - */ - executor.execute(() -> { - try { - validate(TkmsShardPartition.of(tkmsProperties.getDefaultShard(), 0), topic, null); - - log.info("Topic '{}' successfully pre-validated.", topic); - } catch (Throwable t) { - log.error("Topic validation for '" + topic + "' failed.", t); - failures.incrementAndGet(); - } finally { - countDownLatch.countDown(); - semaphore.release(); - } - }); - } else { - break; + for (int shard = 0; shard < tkmsProperties.getShardsCount(); shard++) { + var shardProperties = tkmsProperties.getShards().get(shard); + List shardTopics = shardProperties == null ? null : shardProperties.getTopics(); + + if (shardTopics != null && shard == tkmsProperties.getDefaultShard()) { + throw new IllegalStateException("Topics for default shard have to be specified on 'tw-tkms.topics' property."); + } + + if (shard == tkmsProperties.getDefaultShard()) { + shardTopics = tkmsProperties.getTopics(); + } + + if (shardTopics == null) { + continue; + } + + for (var topic : shardTopics) { + topicsValidatedDuringInitializationOrNotified.put(topic, Boolean.TRUE); + final var timeoutMs = + tkmsProperties.getTopicValidation().getTopicPreValidationTimeout().toMillis() - System.currentTimeMillis() + startTimeEpochMs; + if (ExceptionUtils.doUnchecked(() -> semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS))) { + + final var finalShard = shard; + /* + We are validating one by one, to get the proper error messages from Kafka. + And, we are doing it concurrently to speed things up. 
+ */ + phaser.register(); + executor.execute(() -> { + try { + validate(TkmsShardPartition.of(finalShard, 0), topic, null); + + log.info("Topic '{}' successfully pre-validated.", topic); + } catch (Throwable t) { + log.error("Topic validation for '" + topic + "' failed.", t); + failures.incrementAndGet(); + } finally { + phaser.arriveAndDeregister(); + semaphore.release(); + } + }); + } else { + break; + } } } final var timeoutMs = tkmsProperties.getTopicValidation().getTopicPreValidationTimeout().toMillis() - System.currentTimeMillis() + startTimeEpochMs; - if (!ExceptionUtils.doUnchecked(() -> countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS))) { + int phase = phaser.arrive(); + try { + phaser.awaitAdvanceInterruptibly(phase, timeoutMs, TimeUnit.MILLISECONDS); + } catch (InterruptedException | TimeoutException e) { tkmsKafkaProducerProvider.closeKafkaProducerForTopicValidation(); throw new IllegalStateException("Topic validation is taking too long."); } @@ -133,7 +157,7 @@ public void validate(TkmsShardPartition shardPartition, String topic, Integer pa if (tkmsProperties.getTopicValidation().isUseAdminClient()) { validateUsingAdmin(shardPartition, topic, partition); } else { - validateUsingProducer(topic); + validateUsingProducer(shardPartition, topic); } } @@ -146,7 +170,10 @@ protected void validateUsingAdmin(TkmsShardPartition shardPartition, String topi return Boolean.TRUE; }); - var response = topicDescriptionsCache.get(new FetchTopicDescriptionRequest().setTopic(topic)); + var response = topicDescriptionsCache.get(new FetchTopicDescriptionRequest() + .setTopic(topic) + .setShardPartition(shardPartition) + ); if (response == null) { throw new NullPointerException("Could not fetch topic description for topic '" + topic + "'."); @@ -185,8 +212,8 @@ protected void validateUsingAdmin(TkmsShardPartition shardPartition, String topi Legacy logic. We keep it in, in case some service would run into issues with the admin client based validation. 
*/ - protected void validateUsingProducer(String topic) { - tkmsKafkaProducerProvider.getKafkaProducerForTopicValidation().partitionsFor(topic); + protected void validateUsingProducer(TkmsShardPartition shardPartition, String topic) { + tkmsKafkaProducerProvider.getKafkaProducerForTopicValidation(shardPartition).partitionsFor(topic); } protected FetchTopicDescriptionResponse fetchTopicDescription(FetchTopicDescriptionRequest request) { @@ -197,7 +224,7 @@ protected FetchTopicDescriptionResponse fetchTopicDescription(FetchTopicDescript && tkmsProperties.getTopicValidation().isTryToAutoCreateTopics()) { final var topic = request.getTopic(); try { - validateUsingProducer(topic); + validateUsingProducer(request.getShardPartition(), topic); log.info("Succeeded in auto creating topic `{}`", topic); diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java index 067b8b0..131351a 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java @@ -519,7 +519,7 @@ protected int getPartition(int shard, TkmsMessage message) { @Data @Accessors(chain = true) protected static class FetchTopicDescriptionRequest { - + private TkmsShardPartition shardPartition; private String topic; } diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java index b4c0717..d98aa53 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java @@ -7,7 +7,7 @@ public interface ITkmsKafkaProducerProvider { KafkaProducer 
getKafkaProducer(TkmsShardPartition tkmsShardPartition, UseCase useCase); - KafkaProducer getKafkaProducerForTopicValidation(); + KafkaProducer getKafkaProducerForTopicValidation(TkmsShardPartition shardPartition); void closeKafkaProducer(TkmsShardPartition tkmsShardPartition, UseCase useCase); diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java index 101fdcb..7662c56 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java @@ -82,8 +82,8 @@ public KafkaProducer getKafkaProducer(TkmsShardPartition shardPa } @Override - public KafkaProducer getKafkaProducerForTopicValidation() { - return getKafkaProducer(TkmsShardPartition.of(tkmsProperties.getDefaultShard(), 0), UseCase.TOPIC_VALIDATION); + public KafkaProducer getKafkaProducerForTopicValidation(TkmsShardPartition shardPartition) { + return getKafkaProducer(TkmsShardPartition.of(shardPartition.getShard(), 0), UseCase.TOPIC_VALIDATION); } @Override diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsProperties.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsProperties.java index 65cf8fc..a2c7468 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsProperties.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsProperties.java @@ -196,6 +196,8 @@ public void afterPropertiesSet() { *

It is not mandatory, but it allows to do some pre validation and prevent the service starting when something is wrong. * *

Also, so we can warm up their metadata, avoiding elevated latencies at the start of the service. + * + *

You may want to list some topics under a specific shard configuration, for example when they use a different Kafka server or different ACLs. */ @ResolvedValue @LegacyResolvedValue @@ -315,6 +317,10 @@ public static class ShardProperties { @ResolvedValue @LegacyResolvedValue private Map kafka = new HashMap<>(); + + @ResolvedValue + @LegacyResolvedValue + private List topics = new ArrayList<>(); } public boolean isValidateSerialization(int shard) { diff --git a/tw-tkms-starter/src/test/resources/application.yml b/tw-tkms-starter/src/test/resources/application.yml index 1cc2ad3..ab2b794 100644 --- a/tw-tkms-starter/src/test/resources/application.yml +++ b/tw-tkms-starter/src/test/resources/application.yml @@ -43,6 +43,8 @@ tw-tkms: ms: 7 earliest-visible-messages: enabled: true + topics: + - TestTopic compression: algorithm: random min-size: 10 @@ -81,6 +83,8 @@ tw-tkms: delete-batch-sizes: "51, 11, 5, 1" topic-validation: use-admin-client: true + topics: + - TestTopicPostgres # We will not pre-create the topic in order to test the tw-tkms-test: