diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java index be84834..a76f1b8 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java @@ -14,7 +14,6 @@ import com.transferwise.kafka.tkms.api.TkmsShardPartition; import com.transferwise.kafka.tkms.config.ITkmsDaoProvider; import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider; -import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider.UseCase; import com.transferwise.kafka.tkms.config.TkmsProperties; import com.transferwise.kafka.tkms.config.TkmsProperties.DatabaseDialect; import com.transferwise.kafka.tkms.config.TkmsProperties.NotificationLevel; @@ -72,7 +71,7 @@ public void afterPropertiesSet() { environmentValidator.validate(); for (String topic : properties.getTopics()) { - validateTopic(TkmsShardPartition.of(properties.getDefaultShard(), 0), topic); + validateTopic(topic); } validateDeleteBatchSizes(); @@ -169,7 +168,7 @@ public SendMessagesResult sendMessages(SendMessagesRequest request) { var topic = tkmsMessage.getTopic(); if (!validatedTopics.contains(topic)) { - validateTopic(shardPartition, topic); + validateTopic(topic); validatedTopics.add(topic); } @@ -265,7 +264,7 @@ public SendMessageResult sendMessage(SendMessageRequest request) { validateMessageSize(message, 0); var topic = message.getTopic(); - validateTopic(shardPartition, topic); + validateTopic(topic); if (deferMessageRegistrationUntilCommit) { // Transaction is guaranteed to be active here. @@ -375,8 +374,8 @@ public void afterCompletion(int status) { /** * Every call to normal `KafkaProducer.send()` uses metadata for a topic as well, so should be very fast. */ - protected void validateTopic(TkmsShardPartition shardPartition, String topic) { - kafkaProducerProvider.getKafkaProducer(shardPartition, UseCase.TOPIC_VALIDATION).partitionsFor(topic); + protected void validateTopic(String topic) { + kafkaProducerProvider.getKafkaProducerForTopicValidation().partitionsFor(topic); } protected void validateMessages(SendMessagesRequest request) { @@ -393,7 +392,7 @@ protected void validateMessage(TkmsMessage message, int messageIdx) { Preconditions.checkArgument(message.getPartition() >= 0, "%s: Partition number can not be negative: %s", messageIdx, message.getPartition()); } if (message.getKey() != null) { - Preconditions.checkArgument(message.getKey().length() > 0, "%s: Key can not be an empty string.", messageIdx); + Preconditions.checkArgument(!message.getKey().isEmpty(), "%s: Key can not be an empty string.", messageIdx); } if (message.getShard() != null) { Preconditions.checkArgument(message.getShard() >= 0, "%s: Shard number can not be negative :%s", messageIdx, message.getShard()); @@ -463,7 +462,7 @@ private int utf8Length(CharSequence s) { protected void fireMessageRegisteredEvent(TkmsShardPartition shardPartition, Long id, TkmsMessage message) { var listeners = getTkmsEventsListeners(); if (log.isDebugEnabled()) { - log.debug("Message was registered for {} with storage id {}. Listeners count: ", shardPartition, id, listeners.size()); + log.debug("Message was registered for {} with storage id {}. Listeners count: {}.", shardPartition, id, listeners.size()); } if (tkmsEventsListeners.isEmpty()) { diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java index 449e183..b4c0717 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java @@ -7,8 +7,12 @@ public interface ITkmsKafkaProducerProvider { KafkaProducer getKafkaProducer(TkmsShardPartition tkmsShardPartition, UseCase useCase); + KafkaProducer getKafkaProducerForTopicValidation(); + void closeKafkaProducer(TkmsShardPartition tkmsShardPartition, UseCase useCase); + void closeKafkaProducerForTopicValidation(); + enum UseCase { PROXY, TEST, diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java index dc7f98c..8b283c1 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java @@ -40,7 +40,12 @@ public KafkaProducer getKafkaProducer(TkmsShardPartition shardPa configs.put(ProducerConfig.ACKS_CONFIG, "all"); configs.put(ProducerConfig.BATCH_SIZE_CONFIG, "163840"); configs.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, String.valueOf(tkmsProperties.getMaximumMessageBytes())); - configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); + + // The following block is to guarantee the messages order. + configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); + configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); + configs.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); + configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Please specify 'tw-tkms.kafka.bootstrap.servers'."); configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "5000"); configs.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000"); @@ -69,6 +74,11 @@ public KafkaProducer getKafkaProducer(TkmsShardPartition shardPa }).getProducer(); } + @Override + public KafkaProducer getKafkaProducerForTopicValidation() { + return getKafkaProducer(TkmsShardPartition.of(tkmsProperties.getDefaultShard(), 0), UseCase.TOPIC_VALIDATION); + } + @Override public void closeKafkaProducer(TkmsShardPartition shardPartition, UseCase useCase) { var producerEntry = producers.remove(Pair.of(shardPartition, useCase)); @@ -86,6 +96,11 @@ public void closeKafkaProducer(TkmsShardPartition shardPartition, UseCase useCas } } + @Override + public void closeKafkaProducerForTopicValidation() { + closeKafkaProducer(TkmsShardPartition.of(tkmsProperties.getDefaultShard(), 0), UseCase.TOPIC_VALIDATION); + } + @Override public void applicationTerminating() { producers.keySet().forEach(key -> closeKafkaProducer(key.getLeft(), key.getRight())); diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java index 6fb41f7..ddd029e 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java @@ -16,6 +16,7 @@ import com.transferwise.kafka.tkms.api.TkmsMessage.Header; import com.transferwise.kafka.tkms.api.TkmsShardPartition; import com.transferwise.kafka.tkms.config.ITkmsDaoProvider; +import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider; import com.transferwise.kafka.tkms.dao.FaultInjectedTkmsDao; import com.transferwise.kafka.tkms.metrics.TkmsMetricsTemplate; import com.transferwise.kafka.tkms.test.BaseIntTest; @@ -65,6 +66,8 @@ abstract class EndToEndIntTest extends BaseIntTest { private ITkmsDaoProvider tkmsDaoProvider; @Autowired private TkmsStorageToKafkaProxy tkmsStorageToKafkaProxy; + @Autowired + private ITkmsKafkaProducerProvider tkmsKafkaProducerProvider; private FaultInjectedTkmsDao faultInjectedTkmsDao; @@ -168,7 +171,8 @@ void testExactlyOnceDelivery(int scenario) throws Exception { setupConfig(deferUntilCommit); tkmsProperties.setValidateSerialization(validateSerialization); - String message = "Hello World!"; + // For producer to create more batches and spread messages around different partitions. + String message = StringUtils.repeat("Hello World!", 100); int threadsCount = 20; int batchesCount = 20; int batchSize = 20; @@ -462,11 +466,17 @@ void testThatMessagesOrderForAnEntityIsPreservedWithBatches(boolean deferUntilCo @ValueSource(booleans = {false, true}) @SneakyThrows void sendingToUnknownTopicWillBePreventedWhenTopicAutoCreationIsDisabled(boolean deferUntilCommit) { - setupConfig(deferUntilCommit); + try { + setupConfig(deferUntilCommit); - assertThatThrownBy(() -> transactionsHelper.withTransaction().run(() -> transactionalKafkaMessageSender - .sendMessage(new TkmsMessage().setTopic("NotExistingTopic").setValue("Stuff".getBytes(StandardCharsets.UTF_8))))) - .hasMessageContaining("Topic NotExistingTopic not present in metadata"); + assertThatThrownBy(() -> transactionsHelper.withTransaction().run(() -> transactionalKafkaMessageSender + .sendMessage(new TkmsMessage().setTopic("NotExistingTopic").setValue("Stuff".getBytes(StandardCharsets.UTF_8))))) + .hasMessageContaining("Topic NotExistingTopic not present in metadata"); + } + finally { + // Stop logs spam about not existing topic in metadata. + tkmsKafkaProducerProvider.closeKafkaProducerForTopicValidation(); + } } @ParameterizedTest