diff --git a/CHANGELOG.md b/CHANGELOG.md index a152613..6861dec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.27.0] - 2023-12-20 + +### Changed + +- Topic validation can now be done via Kafka Admin, instead of Kafka Producer. + The new logic is under a feature flag, until it gets more battle tested. + ## [0.26.0] - 2023-12-14 ### Removed diff --git a/build.libraries.gradle b/build.libraries.gradle index 9406fd0..f48afef 100644 --- a/build.libraries.gradle +++ b/build.libraries.gradle @@ -27,6 +27,7 @@ ext { zstdJni : 'com.github.luben:zstd-jni:1.5.2-1', // versions managed by spring-boot-dependencies platform + caffeine : "com.github.ben-manes.caffeine:caffeine", commonsLang3 : 'org.apache.commons:commons-lang3', flywayCore : 'org.flywaydb:flyway-core', flywayMysql : 'org.flywaydb:flyway-mysql', diff --git a/gradle.properties b/gradle.properties index 6f6dd85..e80baea 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=0.26.0 +version=0.27.0 diff --git a/tw-tkms-starter/build.gradle b/tw-tkms-starter/build.gradle index 689ceee..7b70c90 100644 --- a/tw-tkms-starter/build.gradle +++ b/tw-tkms-starter/build.gradle @@ -24,6 +24,7 @@ dependencies { compileOnly libraries.javaxValidationApi compileOnly libraries.jakartaValidationApi + implementation libraries.caffeine implementation libraries.commonsLang3 implementation libraries.curatorRecipes implementation libraries.guava @@ -121,7 +122,7 @@ shadowJar { attributes 'Implementation-Version': "$project.version" } relocate('com.google.protobuf', 'com.transferwise.kafka.tkms.shadow.com.google.protobuf') - + // Minimize does not reduce the jar much (1.9->1.5 MB), so let's not risk/mess with that. /* minimize {} @@ -206,7 +207,7 @@ publishing { sign publishing.publications.twTkmsStarter } } - + repositories { maven { url System.getenv("MAVEN_URL") diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/ITkmsTopicValidator.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/ITkmsTopicValidator.java new file mode 100644 index 0000000..b770fa5 --- /dev/null +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/ITkmsTopicValidator.java @@ -0,0 +1,10 @@ +package com.transferwise.kafka.tkms; + +import com.transferwise.kafka.tkms.api.TkmsShardPartition; + +public interface ITkmsTopicValidator { + + void preValidateAll(); + + void validate(TkmsShardPartition tkmsShardPartition, String topic, Integer partition); +} diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsTopicValidator.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsTopicValidator.java new file mode 100644 index 0000000..c94d1f2 --- /dev/null +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsTopicValidator.java @@ -0,0 +1,214 @@ +package com.transferwise.kafka.tkms; + +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.transferwise.common.baseutils.ExceptionUtils; +import com.transferwise.common.baseutils.concurrency.IExecutorServicesProvider; +import com.transferwise.common.baseutils.concurrency.ThreadNamingExecutorServiceWrapper; +import com.transferwise.kafka.tkms.TransactionalKafkaMessageSender.FetchTopicDescriptionRequest; +import com.transferwise.kafka.tkms.TransactionalKafkaMessageSender.FetchTopicDescriptionResponse; +import com.transferwise.kafka.tkms.api.TkmsShardPartition; +import com.transferwise.kafka.tkms.config.ITkmsKafkaAdminProvider; +import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider; +import com.transferwise.kafka.tkms.config.TkmsProperties; +import com.transferwise.kafka.tkms.config.TkmsProperties.NotificationLevel; +import com.transferwise.kafka.tkms.config.TkmsProperties.NotificationType; +import com.transferwise.kafka.tkms.metrics.ITkmsMetricsTemplate; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.binder.cache.CaffeineCacheMetrics; +import java.time.Duration; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.admin.DescribeTopicsOptions; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; + +@Slf4j +public class TkmsTopicValidator implements ITkmsTopicValidator, InitializingBean { + + @Autowired + private IExecutorServicesProvider executorServicesProvider; + + @Autowired + private MeterRegistry meterRegistry; + + @Autowired + private TkmsProperties tkmsProperties; + + @Autowired + protected ITkmsKafkaAdminProvider tkmsKafkaAdminProvider; + + @Autowired + protected IProblemNotifier problemNotifier; + + @Autowired + protected ITkmsKafkaProducerProvider tkmsKafkaProducerProvider; + + @Autowired + protected ITkmsMetricsTemplate tkmsMetricsTemplate; + + private LoadingCache topicDescriptionsCache; + + private final Map topicsValidatedDuringInitializationOrNotified = new ConcurrentHashMap<>(); + + private ExecutorService executor; + + public void afterPropertiesSet() { + this.executor = new ThreadNamingExecutorServiceWrapper("tw-tkms-td-cache", executorServicesProvider.getGlobalExecutorService()); + topicDescriptionsCache = Caffeine.newBuilder() + .maximumSize(10_000) + .executor(executor) + .expireAfterWrite(Duration.ofMinutes(5)) + .refreshAfterWrite(Duration.ofSeconds(30)) + .recordStats() + .build(this::fetchTopicDescription); + + CaffeineCacheMetrics.monitor(meterRegistry, topicDescriptionsCache, "tkmsTopicDescriptions"); + } + + @Override + public void preValidateAll() { + final var topics = tkmsProperties.getTopics(); + + final var semaphore = new Semaphore(tkmsProperties.getAdminClientTopicsValidationConcurrency()); + final var failures = new AtomicInteger(); + final var countDownLatch = new CountDownLatch(topics.size()); + final var startTimeEpochMs = System.currentTimeMillis(); + + for (var topic : topics) { + topicsValidatedDuringInitializationOrNotified.put(topic, Boolean.TRUE); + final var timeoutMs = tkmsProperties.getInternals().getTopicPreValidationTimeout().toMillis() - System.currentTimeMillis() + startTimeEpochMs; + if (ExceptionUtils.doUnchecked(() -> semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS))) { + /* + We are validating one by one, to get the proper error messages from Kafka. + And, we are doing it concurrently to speed things up. + */ + executor.execute(() -> { + try { + validate(TkmsShardPartition.of(tkmsProperties.getDefaultShard(), 0), topic, null); + + log.info("Topic '{}' successfully pre-validated.", topic); + } catch (Throwable t) { + log.error("Topic validation for '" + topic + "' failed.", t); + failures.incrementAndGet(); + } finally { + countDownLatch.countDown(); + semaphore.release(); + } + }); + } else { + break; + } + } + + final var timeoutMs = tkmsProperties.getInternals().getTopicPreValidationTimeout().toMillis() - System.currentTimeMillis() + startTimeEpochMs; + + if (!ExceptionUtils.doUnchecked(() -> countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS))) { + tkmsKafkaProducerProvider.closeKafkaProducerForTopicValidation(); + throw new IllegalStateException("Topic validation is taking too long."); + } + + if (failures.get() > 0) { + tkmsKafkaProducerProvider.closeKafkaProducerForTopicValidation(); + throw new IllegalStateException("There were failures with topics validations. Refusing to start."); + } + } + + @Override + public void validate(TkmsShardPartition shardPartition, String topic, Integer partition) { + if (tkmsProperties.isUseAdminClientForTopicsValidation()) { + validateUsingAdmin(shardPartition, topic, partition); + } else { + validateUsingProducer(topic); + } + } + + protected void validateUsingAdmin(TkmsShardPartition shardPartition, String topic, Integer partition) { + topicsValidatedDuringInitializationOrNotified.computeIfAbsent(topic, k -> { + problemNotifier.notify(shardPartition.getShard(), NotificationType.TOPIC_NOT_VALIDATED_AT_INIT, NotificationLevel.WARN, () -> + "Topic '" + topic + "' was not validated during initialization. This can introduce some lag." + + " Please specify all the topics this service is using in the Tkms property of 'topics'." + ); + return Boolean.TRUE; + }); + + var response = topicDescriptionsCache.get(new FetchTopicDescriptionRequest().setTopic(topic)); + + if (response == null) { + throw new NullPointerException("Could not fetch topic description for topic '" + topic + "'."); + } + + if (response.getThrowable() != null) { + String message; + if (response.getThrowable() instanceof UnknownTopicOrPartitionException) { + message = "Topic '" + topic + "' does not exist."; + } else { + message = "Topic validation for '" + topic + "' failed."; + } + throw new IllegalStateException(message, response.getThrowable()); + } + + final var topicDescription = response.getTopicDescription(); + + final var aclOperations = topicDescription.authorizedOperations(); + if (aclOperations == null || aclOperations.isEmpty()) { + tkmsMetricsTemplate.registerNoAclOperationsFetched(shardPartition, topic); + } else if (!aclOperations.contains(AclOperation.ALL) + && !aclOperations.contains(AclOperation.WRITE) + ) { + throw new IllegalStateException("The service does not have any ACLs of ALL/WRITE/IDEMPOTENT_WRITE on topic '" + topic + "'." + + " The ACLs available are '" + StringUtils.join(aclOperations, ",") + "'."); + } + + if (partition != null) { + if (topicDescription.partitions().size() < partition - 1) { + throw new IllegalStateException("Kafka partition " + partition + " does not exist for topic '" + topic + "'."); + } + } + } + + /* + Legacy logic. + We keep it in, in case some service would run into issues with the admin client based validation. + */ + protected void validateUsingProducer(String topic) { + tkmsKafkaProducerProvider.getKafkaProducerForTopicValidation().partitionsFor(topic); + } + + protected FetchTopicDescriptionResponse fetchTopicDescription(FetchTopicDescriptionRequest request) { + final var topic = request.getTopic(); + TopicDescription topicDescription = null; + + Throwable throwable = null; + + try { + topicDescription = tkmsKafkaAdminProvider.getKafkaAdmin().describeTopics(Collections.singleton(topic), + new DescribeTopicsOptions().includeAuthorizedOperations(true)) + .allTopicNames().get(30, TimeUnit.SECONDS).get(topic); + } catch (Throwable t) { + if (t instanceof ExecutionException) { + throwable = t.getCause(); + } else { + throwable = t; + } + } + + if (throwable != null) { + return new FetchTopicDescriptionResponse().setThrowable(throwable); + } + + return new FetchTopicDescriptionResponse().setTopicDescription(topicDescription); + } +} diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java index a76f1b8..067b8b0 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java @@ -13,7 +13,6 @@ import com.transferwise.kafka.tkms.api.TkmsMessage.Header; import com.transferwise.kafka.tkms.api.TkmsShardPartition; import com.transferwise.kafka.tkms.config.ITkmsDaoProvider; -import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider; import com.transferwise.kafka.tkms.config.TkmsProperties; import com.transferwise.kafka.tkms.config.TkmsProperties.DatabaseDialect; import com.transferwise.kafka.tkms.config.TkmsProperties.NotificationLevel; @@ -25,7 +24,10 @@ import java.util.HashSet; import java.util.List; import java.util.concurrent.ThreadLocalRandom; +import lombok.Data; +import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.TopicDescription; import org.slf4j.MDC; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; @@ -53,26 +55,22 @@ protected void setTkmsDaoProvider(ITkmsDaoProvider tkmsDaoProvider) { @Autowired private ApplicationContext applicationContext; @Autowired - private ITkmsKafkaProducerProvider kafkaProducerProvider; - @Autowired private IEnvironmentValidator environmentValidator; @Autowired private ITransactionsHelper transactionsHelper; @Autowired private IProblemNotifier problemNotifier; + @Autowired + private ITkmsTopicValidator tkmsTopicValidator; private volatile List tkmsEventsListeners; private RateLimiter errorLogRateLimiter = RateLimiter.create(2); @Override public void afterPropertiesSet() { - Assertions.setLevel(properties.getInternals().getAssertionLevel()); - environmentValidator.validate(); - for (String topic : properties.getTopics()) { - validateTopic(topic); - } + tkmsTopicValidator.preValidateAll(); validateDeleteBatchSizes(); @@ -168,7 +166,9 @@ public SendMessagesResult sendMessages(SendMessagesRequest request) { var topic = tkmsMessage.getTopic(); if (!validatedTopics.contains(topic)) { - validateTopic(topic); + tkmsTopicValidator.validate(shardPartition, topic, tkmsMessage.getPartition()); + + // TODO: Remove when we leave only admin client based validations in. validatedTopics.add(topic); } @@ -264,7 +264,7 @@ public SendMessageResult sendMessage(SendMessageRequest request) { validateMessageSize(message, 0); var topic = message.getTopic(); - validateTopic(topic); + tkmsTopicValidator.validate(shardPartition, topic, message.getPartition()); if (deferMessageRegistrationUntilCommit) { // Transaction is guaranteed to be active here. @@ -371,13 +371,6 @@ public void afterCompletion(int status) { } } - /** - * Every call to normal `KafkaProducer.send()` uses metadata for a topic as well, so should be very fast. - */ - protected void validateTopic(String topic) { - kafkaProducerProvider.getKafkaProducerForTopicValidation().partitionsFor(topic); - } - protected void validateMessages(SendMessagesRequest request) { for (int i = 0; i < request.getTkmsMessages().size(); i++) { var tkmsMessage = request.getTkmsMessages().get(i); @@ -522,4 +515,20 @@ protected int getPartition(int shard, TkmsMessage message) { return ThreadLocalRandom.current().nextInt(tablesCount); } + + @Data + @Accessors(chain = true) + protected static class FetchTopicDescriptionRequest { + + private String topic; + } + + @Data + @Accessors(chain = true) + protected static class FetchTopicDescriptionResponse { + + private Throwable throwable; + + private TopicDescription topicDescription; + } } diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaAdminProvider.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaAdminProvider.java new file mode 100644 index 0000000..e533be4 --- /dev/null +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaAdminProvider.java @@ -0,0 +1,11 @@ +package com.transferwise.kafka.tkms.config; + +import org.apache.kafka.clients.admin.Admin; + +public interface ITkmsKafkaAdminProvider { + + Admin getKafkaAdmin(); + + void closeKafkaAdmin(); + +} diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsConfiguration.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsConfiguration.java index 08978ed..dfa87d2 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsConfiguration.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsConfiguration.java @@ -7,6 +7,7 @@ import com.transferwise.kafka.tkms.ITkmsInterrupterService; import com.transferwise.kafka.tkms.ITkmsPaceMaker; import com.transferwise.kafka.tkms.ITkmsStorageToKafkaProxy; +import com.transferwise.kafka.tkms.ITkmsTopicValidator; import com.transferwise.kafka.tkms.ITkmsZookeeperOperations; import com.transferwise.kafka.tkms.JavaxValidationEnvironmentValidator; import com.transferwise.kafka.tkms.ProblemNotifier; @@ -14,6 +15,7 @@ import com.transferwise.kafka.tkms.TkmsMessageInterceptors; import com.transferwise.kafka.tkms.TkmsPaceMaker; import com.transferwise.kafka.tkms.TkmsStorageToKafkaProxy; +import com.transferwise.kafka.tkms.TkmsTopicValidator; import com.transferwise.kafka.tkms.TkmsZookeeperOperations; import com.transferwise.kafka.tkms.TransactionalKafkaMessageSender; import com.transferwise.kafka.tkms.api.ITkmsMessageInterceptors; @@ -178,4 +180,16 @@ public ProblemNotifier tkmsProblemNotifier() { public TkmsInterrupterService tkmsInterrupterService() { return new TkmsInterrupterService(); } + + @Bean + @ConditionalOnMissingBean(ITkmsKafkaAdminProvider.class) + public ITkmsKafkaAdminProvider tkmsKafkaAdminProvider() { + return new TkmsKafkaAdminProvider(); + } + + @Bean + @ConditionalOnMissingBean(ITkmsTopicValidator.class) + public ITkmsTopicValidator tkmsTopicValidator() { + return new TkmsTopicValidator(); + } } diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaAdminProvider.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaAdminProvider.java new file mode 100644 index 0000000..e0f8451 --- /dev/null +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaAdminProvider.java @@ -0,0 +1,96 @@ +package com.transferwise.kafka.tkms.config; + +import com.transferwise.common.gracefulshutdown.GracefulShutdownStrategy; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.Data; +import lombok.experimental.Accessors; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.KafkaAdminClient; +import org.springframework.beans.factory.annotation.Autowired; + +@Slf4j +public class TkmsKafkaAdminProvider implements ITkmsKafkaAdminProvider, GracefulShutdownStrategy { + + /** + * Keep the kafka-clients' MBean registration happy. + */ + private static final AtomicInteger sequence = new AtomicInteger(); + + @Autowired + private TkmsProperties tkmsProperties; + + @Autowired + private MeterRegistry meterRegistry; + + private Map admins = new ConcurrentHashMap<>(); + + @Override + public Admin getKafkaAdmin() { + return admins.computeIfAbsent(0L, key -> { + var configs = new HashMap(); + + configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "Please specify 'tw-tkms.kafka.bootstrap.servers'."); + configs.put(AdminClientConfig.CLIENT_ID_CONFIG, "tw-tkms-topic-validation-" + sequence.incrementAndGet()); + configs.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000); + configs.put(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG, 100); + configs.put(AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 5000); + + var configNames = AdminClientConfig.configNames(); + for (var e : tkmsProperties.getKafka().entrySet()) { + if (configNames.contains(e.getKey())) { + configs.put(e.getKey(), e.getValue()); + } + } + + final var admin = KafkaAdminClient.create(configs); + final var kafkaClientMetrics = new KafkaClientMetrics(admin); + kafkaClientMetrics.bindTo(meterRegistry); + + return new AdminEntry().setAdmin(admin).setKafkaClientMetric(kafkaClientMetrics); + }).getAdmin(); + } + + @Override + public void closeKafkaAdmin() { + var adminEntry = admins.remove(0L); + + if (adminEntry == null) { + return; + } + + adminEntry.getKafkaClientMetric().close(); + + try { + adminEntry.getAdmin().close(Duration.ofSeconds(5)); + } catch (Throwable t) { + log.error("Closing Kafka admin failed.", t); + } + } + + @Override + public void applicationTerminating() { + closeKafkaAdmin(); + } + + @Override + public boolean canShutdown() { + return true; + } + + @Data + @Accessors(chain = true) + protected static class AdminEntry { + + private Admin admin; + + private KafkaClientMetrics kafkaClientMetric; + } +} diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java index 8b283c1..101fdcb 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java @@ -9,6 +9,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import lombok.Data; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; @@ -22,6 +23,11 @@ @Slf4j public class TkmsKafkaProducerProvider implements ITkmsKafkaProducerProvider, GracefulShutdownStrategy { + /** + * Keep the kafka-clients' MBean registration happy. + */ + private static final AtomicInteger sequence = new AtomicInteger(); + @Autowired private TkmsProperties tkmsProperties; @@ -51,7 +57,8 @@ public KafkaProducer getKafkaProducer(TkmsShardPartition shardPa configs.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000"); configs.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "10000"); configs.put(ProducerConfig.CLIENT_ID_CONFIG, - "tw-tkms-" + shardPartition.getShard() + "-" + shardPartition.getPartition() + "-" + useCase.name().toLowerCase()); + "tw-tkms-" + shardPartition.getShard() + "-" + shardPartition.getPartition() + "-" + useCase.name().toLowerCase() + + "-" + sequence.incrementAndGet()); if (useCase == UseCase.PROXY) { // We use large lingering time, because we are calling the `.flush()` anyway. diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsProperties.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsProperties.java index cd49239..7d3ba7e 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsProperties.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsProperties.java @@ -206,6 +206,22 @@ public void afterPropertiesSet() { @LegacyResolvedValue private List topics = new ArrayList<>(); + /** + * Uses AdminClient to validate topics. + * + *

AdminClient allows us to also check if topics have suitable ACLs. + * + *

Experimental option. + * + *

May be the default in the future. + */ + private boolean useAdminClientForTopicsValidation = false; + + /** + * How many topics validations are we doing in parallel, during the initialization of Tkms. + */ + private int adminClientTopicsValidationConcurrency = 10; + @Valid @jakarta.validation.Valid private EarliestVisibleMessages earliestVisibleMessages = new EarliestVisibleMessages(); @@ -532,6 +548,11 @@ public static class Internals { *

Too low values may cause very large batches to fail. */ private Duration flushInterruptionDuration = Duration.ofSeconds(30); + + /** + * How long do we wait for topics to get pre-validated. + */ + private Duration topicPreValidationTimeout = Duration.ofMinutes(1); } public enum NotificationLevel { @@ -555,6 +576,7 @@ public enum NotificationType { TOO_MANY_DELETE_BATCHES, EARLIEST_MESSAGES_SYSTEM_DISABLED, ENGINE_INDEPENDENT_STATS_NOT_ENABLED, - NO_ACTIVE_TRANSACTION + NO_ACTIVE_TRANSACTION, + TOPIC_NOT_VALIDATED_AT_INIT } } diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/metrics/ITkmsMetricsTemplate.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/metrics/ITkmsMetricsTemplate.java index cfe924d..0ccce91 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/metrics/ITkmsMetricsTemplate.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/metrics/ITkmsMetricsTemplate.java @@ -57,4 +57,9 @@ void recordMessageSerialization(TkmsShardPartition shardPartition, CompressionAl Object registerPollingInProgressGauge(TkmsShardPartition shardPartition); void registerMessagesInTransactionCount(long registeredMessagesCount, boolean success); + + /* + * Allows to debug if something is misconfigured and the ACLs are not returned by the server. + */ + void registerNoAclOperationsFetched(TkmsShardPartition shardPartition, String topic); } diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/metrics/TkmsMetricsTemplate.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/metrics/TkmsMetricsTemplate.java index 6df791c..5b889c9 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/metrics/TkmsMetricsTemplate.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/metrics/TkmsMetricsTemplate.java @@ -63,6 +63,8 @@ public class TkmsMetricsTemplate implements ITkmsMetricsTemplate, InitializingBe public static final String GAUGE_DAO_EARLIEST_MESSAGE_ID = "tw_tkms_dao_earliest_message_id"; public static final String COUNTER_DAO_EARLIEST_MESSAGE_ID_COMMIT = "tw_tkms_dao_earliest_message_id_commit"; + public static final String COUNTER_NO_ACL_OPERATIONS_FETCHED = "tw_tkms_no_acl_operations_fetched"; + public static final Tag NA_SHARD_TAG = Tag.of("shard", "N/A"); public static final Tag NA_PARTITION_TAG = Tag.of("partition", "N/A"); public static final Tag TAG_SUCCESS_TRUE = Tag.of("success", "true"); @@ -432,6 +434,11 @@ public void registerMessagesInTransactionCount(long registeredMessagesCount, boo meterCache.summary(SUMMARY_MESSAGES_IN_TRANSACTION, TagsSet.of(successTag(success))).record(registeredMessagesCount); } + @Override + public void registerNoAclOperationsFetched(TkmsShardPartition shardPartition, String topic) { + meterCache.counter(COUNTER_NO_ACL_OPERATIONS_FETCHED, TagsSet.of(shardTag(shardPartition), topicTag(topic))).increment(); + } + protected MetricHandle registerGauge(String name, Supplier supplier, Tag... tags) { return new MetricHandle().setMeter(Gauge.builder(name, supplier) .tags(Tags.of(tags)).register(meterCache.getMeterRegistry())); diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java index 010d3ff..7f974c7 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java @@ -88,6 +88,7 @@ public void cleanup() { ((TransactionalKafkaMessageSender) transactionalKafkaMessageSender).setTkmsDaoProvider(tkmsDaoProvider); tkmsProperties.setDeferMessageRegistrationUntilCommit(false); tkmsProperties.setValidateSerialization(false); + tkmsProperties.setUseAdminClientForTopicsValidation(false); } protected void setupConfig(boolean deferUntilCommit) { @@ -468,16 +469,29 @@ void testThatMessagesOrderForAnEntityIsPreservedWithBatches(boolean deferUntilCo } } + private static Stream unknownTopicsMatrix() { + return Stream.of( + Arguments.of(true, true), + Arguments.of(true, false), + Arguments.of(false, true), + Arguments.of(false, false) + ); + } + @ParameterizedTest - @ValueSource(booleans = {false, true}) + @MethodSource("unknownTopicsMatrix") @SneakyThrows - void sendingToUnknownTopicWillBePreventedWhenTopicAutoCreationIsDisabled(boolean deferUntilCommit) { + void sendingToUnknownTopicWillBePreventedWhenTopicAutoCreationIsDisabled(boolean deferUntilCommit, boolean useAdminClient) { try { setupConfig(deferUntilCommit); + tkmsProperties.setUseAdminClientForTopicsValidation(useAdminClient); + + var expectedMessage = + useAdminClient ? "Topic 'NotExistingTopic' does not exist." : "Topic NotExistingTopic not present in metadata after"; assertThatThrownBy(() -> transactionsHelper.withTransaction().run(() -> transactionalKafkaMessageSender .sendMessage(new TkmsMessage().setTopic("NotExistingTopic").setValue("Stuff".getBytes(StandardCharsets.UTF_8))))) - .hasMessageContaining("Topic NotExistingTopic not present in metadata"); + .hasMessageContaining(expectedMessage); } finally { // Stop logs spam about not existing topic in metadata. tkmsKafkaProducerProvider.closeKafkaProducerForTopicValidation(); diff --git a/tw-tkms-starter/src/test/resources/application.yml b/tw-tkms-starter/src/test/resources/application.yml index 6d36008..eb25838 100644 --- a/tw-tkms-starter/src/test/resources/application.yml +++ b/tw-tkms-starter/src/test/resources/application.yml @@ -31,6 +31,8 @@ tw-tkms: polling-interval: 5ms shards-count: 2 insert-batch-size: 2 + topics: + - TestTopic kafka: bootstrap.servers: "${TW_TKMS_KAFKA_TCP_HOST:localhost}:${TW_TKMS_KAFKA_TCP_9092}" shards: @@ -78,6 +80,7 @@ spring: tw-tkms: database-dialect: POSTGRES delete-batch-sizes: "51, 11, 5, 1" + use-admin-client-for-topics-validation: true tw-tkms-test: test-topic: TestTopicPostgres