From aedef3e9f1067809a8dfb055af1d836acf37239f Mon Sep 17 00:00:00 2001 From: Yevhenii Voievodin Date: Sat, 5 Oct 2024 22:38:59 +0300 Subject: [PATCH] Support sending standardized uuid and priority values --- CHANGELOG.md | 27 ++ .../kafka/tkms/demoapp/MessagesProducer.java | 3 + gradle.properties | 2 +- .../kafka/tkms/StandardHeaders.java | 10 + .../tkms/TransactionalKafkaMessageSender.java | 43 ++- .../kafka/tkms/api/TkmsMessage.java | 27 ++ .../kafka/tkms/config/TkmsProperties.java | 6 + .../tkms/metrics/TkmsMetricsTemplate.java | 10 +- .../tkms/EarliestMessageTrackingIntTest.java | 8 +- .../kafka/tkms/EndToEndIntTest.java | 303 +++++++++++++----- .../kafka/tkms/ManualIntTest.java | 9 +- .../kafka/tkms/MessageDecorationTest.java | 25 +- .../kafka/tkms/MessagesInterceptionTest.java | 31 +- .../MultiServerTopicValidationIntTest.java | 6 +- .../kafka/tkms/PostgresEndToEndIntTest.java | 12 +- .../tkms/TkmsMessageValidationIntTest.java | 42 +++ 16 files changed, 453 insertions(+), 111 deletions(-) create mode 100644 tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/StandardHeaders.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 43f1af6..d1f8e52 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,33 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.0.0] - 2024-10-07 + +### Changed + +Added two new properties to the `TkmsMessage`: + +- `uuid` [required] - specifies uniqueness of the message. Will be delivered to consumers in the `x-wise-uuid` header. +- `priority` [optional] - specifies priority of the message. Lower number - higher priority. Will be delivered to consumers in the `x-wise-uuid` header. + +Consumers of messages that have UUID and priority headers can efficiently use provided values for deduplication and other processing purposes with no need to deserialize payloads. + +Best practices for setting UUID value: +- Likely the UUID value provided will be stored and indexed on consumer side. It's recommended to use sequential UUIDs in such scenarios, which proved to yield better performance. One way to generate sequential UUIDs is by using [tw-base-utils](https://github.com/transferwise/tw-base-utils/blob/master/tw-base-utils/src/main/java/com/transferwise/common/baseutils/UuidUtils.java#L37) library. +- If payload already has UUID value then assign the same value to the corresponding `TkmsMessage`. It ensures that consumers of such messages can consistently deduplicate them by depending on one of those UUIDs. It simplifies consumers migration to standard header based UUID deduplication. +- If custom message identification mechanism is used (not based on UUID), still generate and assign UUID to the messages. However, be mindful of cases when messages are sent in non-transactional environments. For example, the same message might be sent twice with different UUIDs but the same identity (according to the custom identification mechanism). + +UUID presence is required by default. This is the only breaking change in this version. The intention of such behaviour is to force producers to supply UUID value in messages sent, which will greatly benefit consumers of those messages. This requirement can be relaxed by setting the following configuration property. +```yaml +uuid-header-required: false +``` + +Added a new gauge metric that exposes value of `uuid-header-required` configuration property. +``` +# 0 - uuid header isn't required, 1 - uuid header is required +tw_tkms_configuration_uuid_header_required +``` + ## [0.30.1] - 2024-08-08 ### Changed - MeterFilter's applied by the library are no longer explicitly applied and are instead diff --git a/demoapp/src/main/java/com/transferwise/kafka/tkms/demoapp/MessagesProducer.java b/demoapp/src/main/java/com/transferwise/kafka/tkms/demoapp/MessagesProducer.java index 7929148..fcb6704 100644 --- a/demoapp/src/main/java/com/transferwise/kafka/tkms/demoapp/MessagesProducer.java +++ b/demoapp/src/main/java/com/transferwise/kafka/tkms/demoapp/MessagesProducer.java @@ -1,6 +1,7 @@ package com.transferwise.kafka.tkms.demoapp; import com.transferwise.common.baseutils.ExceptionUtils; +import com.transferwise.common.baseutils.UuidUtils; import com.transferwise.kafka.tkms.api.ITransactionalKafkaMessageSender; import com.transferwise.kafka.tkms.api.TkmsMessage; import java.nio.charset.StandardCharsets; @@ -50,6 +51,8 @@ public void produce(long threadCount, long batchCount, long batchSize) { String key = String.valueOf(finalT * batchCount * batchSize + finalI * batchSize + j); TkmsMessage message = new TkmsMessage() + .setUuid(UuidUtils.generatePrefixCombUuid()) + .setPriority(17L) .setTopic("MyTopic") .setTimestamp(Instant.now()) .setKey(key).setValue(textMessage.getBytes(StandardCharsets.UTF_8)); diff --git a/gradle.properties b/gradle.properties index fe6a7f0..aef125e 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=0.30.1 +version=1.0.0 diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/StandardHeaders.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/StandardHeaders.java new file mode 100644 index 0000000..0ba90ed --- /dev/null +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/StandardHeaders.java @@ -0,0 +1,10 @@ +package com.transferwise.kafka.tkms; + +/** + * Set of standard headers used by the library. + */ +final class StandardHeaders { + + static final String X_WISE_UUID = "x-wise-uuid"; + static final String X_WISE_PRIORITY = "x-wise-priority"; +} diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java index 5c84134..3bc0208 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java @@ -19,6 +19,7 @@ import com.transferwise.kafka.tkms.config.TkmsProperties.NotificationLevel; import com.transferwise.kafka.tkms.config.TkmsProperties.NotificationType; import com.transferwise.kafka.tkms.metrics.ITkmsMetricsTemplate; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -148,7 +149,11 @@ protected void checkActiveTransaction(int shard, boolean transactionActive, bool @Override public SendMessagesResult sendMessages(SendMessagesRequest request) { request.getTkmsMessages().forEach(message -> messageDecorators.forEach(message::accept)); - validateMessages(request); + for (int i = 0; i < request.getTkmsMessages().size(); i++) { + TkmsMessage tkmsMessage = request.getTkmsMessages().get(i); + addStandardHeaders(tkmsMessage); + validateMessage(tkmsMessage, i); + } var transactionActive = TransactionSynchronizationManager.isActualTransactionActive(); var validatedTopics = new HashSet(); @@ -263,6 +268,7 @@ public SendMessageResult sendMessage(SendMessageRequest request) { checkActiveTransaction(shardPartition.getShard(), transactionActive, deferMessageRegistrationUntilCommit); + addStandardHeaders(request.getTkmsMessage()); validateMessage(message, 0); validateMessageSize(message, 0); @@ -374,13 +380,6 @@ public void afterCompletion(int status) { } } - protected void validateMessages(SendMessagesRequest request) { - for (int i = 0; i < request.getTkmsMessages().size(); i++) { - var tkmsMessage = request.getTkmsMessages().get(i); - validateMessage(tkmsMessage, i); - } - } - protected void validateMessage(TkmsMessage message, int messageIdx) { Preconditions.checkNotNull(message, "%s: No message provided.", messageIdx); Preconditions.checkArgument(!Strings.isNullOrEmpty(message.getTopic()), "%s: No topic provided.", messageIdx); @@ -396,13 +395,23 @@ protected void validateMessage(TkmsMessage message, int messageIdx) { message.getShard(), properties.getShardsCount()); } Preconditions.checkNotNull(message.getValue(), "%s: Value can not be null.", messageIdx); + boolean uuidHeaderPresent = false; if (message.getHeaders() != null) { for (int headerIdx = 0; headerIdx < message.getHeaders().size(); headerIdx++) { Header header = message.getHeaders().get(headerIdx); Preconditions.checkNotNull(header.getValue(), "%s: Header value @{%s} can not be null.", messageIdx, headerIdx); Preconditions.checkArgument(!Strings.isNullOrEmpty(header.getKey()), "%s: Header key @{%s} can not be null.", messageIdx, headerIdx); + uuidHeaderPresent |= StandardHeaders.X_WISE_UUID.equals(header.getKey()); } } + if (properties.isUuidHeaderRequired() && !uuidHeaderPresent) { + throw new IllegalArgumentException( + "%d: Message is required to have @{%s} header.".formatted( + messageIdx, + StandardHeaders.X_WISE_UUID + ) + ); + } } /** @@ -434,6 +443,24 @@ protected void validateMessageSize(TkmsMessage message, int messageIdx) { } } + private static void addStandardHeaders(TkmsMessage tkmsMessage) { + if (tkmsMessage.getPriority() != null) { + tkmsMessage.addHeader( + new Header() + .setKey(StandardHeaders.X_WISE_PRIORITY) + .setValue(tkmsMessage.getPriority().toString().getBytes(StandardCharsets.UTF_8)) + ); + } + // uuid shall remain last header, so it can be quickly accessed using Headers#lastHeader + if (tkmsMessage.getUuid() != null) { + tkmsMessage.addHeader( + new Header() + .setKey(StandardHeaders.X_WISE_UUID) + .setValue(tkmsMessage.getUuid().toString().getBytes(StandardCharsets.UTF_8)) + ); + } + } + private int utf8Length(CharSequence s) { if (s == null) { return 0; diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/TkmsMessage.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/TkmsMessage.java index cd394b5..98e2dc9 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/TkmsMessage.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/TkmsMessage.java @@ -1,11 +1,13 @@ package com.transferwise.kafka.tkms.api; +import com.transferwise.common.baseutils.UuidUtils; import com.transferwise.kafka.tkms.CompressionAlgorithm; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.UUID; import javax.validation.constraints.NotBlank; import javax.validation.constraints.NotNull; import javax.validation.constraints.PositiveOrZero; @@ -71,6 +73,31 @@ public class TkmsMessage { */ private Map metadata; + /** + * Uniquely identifies this message for consumers. + * + *

If value is set then it will be added to headers with {@code x-wise-uuid} key. + * + *

Having UUID in header allows consumer to run deduplication check on this value without need to deserialize payload. + * If payload provides uuid it must be the same as this value so that consumers that depend on either of these values can have consistent + * deduplication. + * + *

Prefer using sequential uuids (e.g. {@link UuidUtils#generatePrefixCombUuid()}) which are proved to yield better performance. + */ + private UUID uuid; + + /** + * Defines priority of this message for consumers. + * + *

Lower value means higher priority. For example, 0 is higher priority than 10. + * + *

If value is set then it will be added to headers with {@code x-wise-priority} key. + * + *

Having priority in header allows consumer to derive priority without need to deserialize payload. For example, it can be useful + * when consumers filter messages based on priority before deciding how to process those. + */ + private Long priority; + public TkmsMessage addHeader(Header header) { if (headers == null) { headers = new ArrayList<>(); diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsProperties.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsProperties.java index a2c7468..4a1b523 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsProperties.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsProperties.java @@ -3,6 +3,7 @@ import com.transferwise.common.baseutils.validation.LegacyResolvedValue; import com.transferwise.common.baseutils.validation.ResolvedValue; import com.transferwise.kafka.tkms.CompressionAlgorithm; +import com.transferwise.kafka.tkms.api.TkmsMessage; import com.transferwise.kafka.tkms.api.TkmsShardPartition; import java.time.Duration; import java.util.ArrayList; @@ -238,6 +239,11 @@ public void afterPropertiesSet() { */ private boolean deferMessageRegistrationUntilCommit = false; + /** + * Whether every message sent is required to have {@code x-wise-uuid} header. See {@link TkmsMessage#getUuid()} for more details. + */ + private boolean uuidHeaderRequired = true; + @Valid @jakarta.validation.Valid private Compression compression = new Compression(); diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/metrics/TkmsMetricsTemplate.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/metrics/TkmsMetricsTemplate.java index 9963190..6cae9fb 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/metrics/TkmsMetricsTemplate.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/metrics/TkmsMetricsTemplate.java @@ -8,15 +8,9 @@ import com.transferwise.kafka.tkms.config.TkmsProperties; import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.Meter; -import io.micrometer.core.instrument.Meter.Type; import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Tags; -import io.micrometer.core.instrument.config.MeterFilter; -import io.micrometer.core.instrument.distribution.DistributionStatisticConfig; import java.time.Instant; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import lombok.Data; @@ -37,6 +31,7 @@ public class TkmsMetricsTemplate implements ITkmsMetricsTemplate, InitializingBe // Miccrometer 1.13 (which comes with Spring boot 3.3) doesn't properly convert gauge metrics with info suffix when using underscore, // using dot here as a workaround public static final String GAUGE_LIBRARY_INFO = "tw.library.info"; + public static final String GAUGE_CONFIGURATION_UUID_HEADER_REQUIRED = "tw_tkms_configuration_uuid_header_required"; public static final String TIMER_PROXY_POLL = "tw_tkms_proxy_poll"; public static final String GAUGE_PROXY_POLL_IN_PROGRESS = "tw_tkms_proxy_poll_in_progress"; public static final String TIMER_PROXY_CYCLE = "tw_tkms_proxy_cycle"; @@ -288,6 +283,9 @@ public void registerLibrary() { Gauge.builder(GAUGE_LIBRARY_INFO, () -> 1d).tags("version", version, "library", "tw-tkms") .description("Provides metadata about the library, for example the version.") .register(meterCache.getMeterRegistry()); + Gauge.builder(GAUGE_CONFIGURATION_UUID_HEADER_REQUIRED, tkmsProperties, props -> props.isUuidHeaderRequired() ? 1d : 0d) + .description("0 - uuid header isn't required, 1 - uuid header is required") + .register(meterCache.getMeterRegistry()); } @Override diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EarliestMessageTrackingIntTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EarliestMessageTrackingIntTest.java index 3a6796b..95b7397 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EarliestMessageTrackingIntTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EarliestMessageTrackingIntTest.java @@ -3,6 +3,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import com.transferwise.common.baseutils.UuidUtils; import com.transferwise.common.baseutils.clock.TestClock; import com.transferwise.common.baseutils.transactionsmanagement.ITransactionsHelper; import com.transferwise.kafka.tkms.api.TkmsMessage; @@ -106,7 +107,12 @@ void testIfEarliestMessageTrackerBehavesAsExpected() { protected void sendMessageAndWaitForArrival() { transactionsHelper.withTransaction().run(() -> { - var result = tkms.sendMessage(new TkmsMessage().setTopic(testTopic).setValue("Hello Kristo!".getBytes(StandardCharsets.UTF_8))); + var result = tkms.sendMessage( + new TkmsMessage() + .setUuid(UuidUtils.generatePrefixCombUuid()) + .setTopic(testTopic) + .setValue("Hello Kristo!".getBytes(StandardCharsets.UTF_8)) + ); log.info("Registered a message with storage id " + result.getStorageId()); } ); diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java index 5747db8..9b41f21 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java @@ -1,5 +1,6 @@ package com.transferwise.kafka.tkms; +import static com.transferwise.common.baseutils.UuidUtils.generatePrefixCombUuid; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.awaitility.Awaitility.await; @@ -28,6 +29,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ThreadLocalRandom; @@ -107,6 +109,8 @@ void testThatJsonStringMessageCanBeSentAndRetrieved(boolean deferUntilCommit) { for (int i = 0; i < messageMultiplier; i++) { sb.append(messagePart); } + var uuid = generatePrefixCombUuid(); + var priority = 17L; tkmsStorageToKafkaProxy.pause(); @@ -116,10 +120,20 @@ void testThatJsonStringMessageCanBeSentAndRetrieved(boolean deferUntilCommit) { Consumer> messageCounter = cr -> ExceptionUtils.doUnchecked(() -> { TestEvent receivedEvent = objectMapper.readValue(cr.value(), TestEvent.class); if (receivedEvent.getMessage().equals(message)) { - assertThat(cr.headers().toArray()).hasSize(1); - org.apache.kafka.common.header.Header header = cr.headers().toArray()[0]; - assertThat(header.key()).isEqualTo("x-tw-criticality"); - assertThat(new String(header.value(), StandardCharsets.UTF_8)).isEqualTo("PrettyLowLol"); + assertThat(cr.headers().toArray()).hasSize(3); + + org.apache.kafka.common.header.Header criticalityHeader = cr.headers().toArray()[0]; + assertThat(criticalityHeader.key()).isEqualTo("x-tw-criticality"); + assertThat(new String(criticalityHeader.value(), StandardCharsets.UTF_8)).isEqualTo("PrettyLowLol"); + + org.apache.kafka.common.header.Header priorityHeader = cr.headers().toArray()[1]; + assertThat(priorityHeader.key()).isEqualTo(StandardHeaders.X_WISE_PRIORITY); + assertThat(Long.parseLong(new String(priorityHeader.value(), StandardCharsets.UTF_8))).isEqualTo(priority); + + org.apache.kafka.common.header.Header uuidHeader = cr.headers().toArray()[2]; + assertThat(uuidHeader.key()).isEqualTo(StandardHeaders.X_WISE_UUID); + assertThat(UUID.fromString(new String(uuidHeader.value(), StandardCharsets.UTF_8))).isEqualTo(uuid); + receivedCount.incrementAndGet(); } else { throw new IllegalStateException("Wrong message receive: " + receivedEvent.getMessage()); @@ -133,9 +147,18 @@ void testThatJsonStringMessageCanBeSentAndRetrieved(boolean deferUntilCommit) { await().until(() -> tkmsStorageToKafkaProxy.isPaused()); transactionsHelper.withTransaction().run(() -> { - var result = transactionalKafkaMessageSender - .sendMessage(new TkmsMessage().setTopic(testProperties.getTestTopic()).setValue(toJsonBytes(testEvent)) - .addHeader(new Header().setKey("x-tw-criticality").setValue("PrettyLowLol".getBytes(StandardCharsets.UTF_8)))); + var result = transactionalKafkaMessageSender.sendMessage( + new TkmsMessage() + .setUuid(uuid) + .setPriority(priority) + .setTopic(testProperties.getTestTopic()) + .setValue(toJsonBytes(testEvent)) + .addHeader( + new Header() + .setKey("x-tw-criticality") + .setValue("PrettyLowLol".getBytes(StandardCharsets.UTF_8)) + ) + ); var messagesCount = tkmsTestDao.getMessagesCount(result.getShardPartition()); if (deferUntilCommit) { @@ -206,8 +229,12 @@ void testExactlyOnceDelivery(int scenario) throws Exception { for (long i = 0; i < batchSize; i++) { long id = finalT * threadsCount * batchesCount + finalB * batchesCount + i; TestEvent testEvent = new TestEvent().setId(id).setMessage(message); - transactionalKafkaMessageSender - .sendMessage(new TkmsMessage().setTopic(testProperties.getTestTopic()).setValue(objectMapper.writeValueAsBytes(testEvent))); + transactionalKafkaMessageSender.sendMessage( + new TkmsMessage() + .setUuid(generatePrefixCombUuid()) + .setTopic(testProperties.getTestTopic()) + .setValue(objectMapper.writeValueAsBytes(testEvent)) + ); } return null; }); @@ -275,8 +302,14 @@ void testThatMessagesWithSameKeyEndUpInOnePartition(boolean deferUntilCommit) { TestEvent testEvent = new TestEvent().setId(1L).setMessage(message); transactionsHelper.withTransaction().run(() -> - transactionalKafkaMessageSender - .sendMessage(new TkmsMessage().setKey(key).setTopic(testProperties.getTestTopic()).setValue(toJsonBytes(testEvent)))); + transactionalKafkaMessageSender.sendMessage( + new TkmsMessage() + .setUuid(generatePrefixCombUuid()) + .setKey(key) + .setTopic(testProperties.getTestTopic()) + .setValue(toJsonBytes(testEvent)) + ) + ); } await().until(() -> receivedCount.get() >= n); @@ -312,10 +345,14 @@ void testThatMessagesWithSamePartitionEndUpInOnePartition(boolean deferUntilComm TestEvent testEvent = new TestEvent().setId(1L).setMessage(message); transactionsHelper.withTransaction().run(() -> - transactionalKafkaMessageSender - .sendMessage( - new TkmsMessage().setPartition(partition).setTopic(testProperties.getTestTopic()) - .setValue(toJsonBytes(testEvent)))); + transactionalKafkaMessageSender.sendMessage( + new TkmsMessage() + .setUuid(generatePrefixCombUuid()) + .setPartition(partition) + .setTopic(testProperties.getTestTopic()) + .setValue(toJsonBytes(testEvent)) + ) + ); } await().until(() -> receivedCount.get() >= n); @@ -359,9 +396,14 @@ void testThatMessagesOrderForAnEntityIsPreserved(boolean deferUntilCommit) throw long id = finalEntityId * entityEventsCount + i; TestEvent testEvent = new TestEvent().setId(id).setEntityId(finalEntityId).setMessage(message); transactionsHelper.withTransaction().run(() -> - transactionalKafkaMessageSender - .sendMessage(new TkmsMessage().setKey(String.valueOf(finalEntityId)).setTopic(testProperties.getTestTopic()) - .setValue(toJsonBytes(testEvent)))); + transactionalKafkaMessageSender.sendMessage( + new TkmsMessage() + .setUuid(generatePrefixCombUuid()) + .setKey(String.valueOf(finalEntityId)) + .setTopic(testProperties.getTestTopic()) + .setValue(toJsonBytes(testEvent)) + ) + ); checkIfTransactionContextsHaveBeenCleared(); } @@ -429,8 +471,13 @@ void testThatMessagesOrderForAnEntityIsPreservedWithBatches(boolean deferUntilCo for (int j = 0; j < batchSize; j++) { var id = finalEntityId * entityEventsCount + i; var testEvent = new TestEvent().setId(id).setEntityId(finalEntityId).setMessage(message); - sendMessagesRequest.addTkmsMessage(new TkmsMessage().setKey(String.valueOf(finalEntityId)).setTopic(testProperties.getTestTopic()) - .setValue(toJsonBytes(testEvent))); + sendMessagesRequest.addTkmsMessage( + new TkmsMessage() + .setUuid(generatePrefixCombUuid()) + .setKey(String.valueOf(finalEntityId)) + .setTopic(testProperties.getTestTopic()) + .setValue(toJsonBytes(testEvent)) + ); i++; } @@ -489,9 +536,16 @@ void sendingToUnknownTopicWillBePreventedWhenTopicAutoCreationIsDisabled(boolean var expectedMessage = useAdminClient ? "Topic 'NotExistingTopic' does not exist." : "Topic NotExistingTopic not present in metadata after"; - assertThatThrownBy(() -> transactionsHelper.withTransaction().run(() -> transactionalKafkaMessageSender - .sendMessage(new TkmsMessage().setTopic("NotExistingTopic").setValue("Stuff".getBytes(StandardCharsets.UTF_8))))) - .hasMessageContaining(expectedMessage); + assertThatThrownBy( + () -> transactionsHelper.withTransaction().run( + () -> transactionalKafkaMessageSender.sendMessage( + new TkmsMessage() + .setUuid(generatePrefixCombUuid()) + .setTopic("NotExistingTopic") + .setValue("Stuff".getBytes(StandardCharsets.UTF_8)) + ) + ) + ).hasMessageContaining(expectedMessage); } finally { // Stop logs spam about not existing topic in metadata. tkmsKafkaProducerProvider.closeKafkaProducersForTopicValidation(); @@ -503,9 +557,14 @@ void sendingToUnknownTopicWillBePreventedWhenTopicAutoCreationIsDisabled(boolean void sendingOutMessagesWithoutActiveTransactionsWillFail(boolean deferUntilCommit) { setupConfig(deferUntilCommit); - assertThatThrownBy(() -> transactionalKafkaMessageSender - .sendMessage(new TkmsMessage().setTopic("NotExistingTopic").setValue("Stuff".getBytes(StandardCharsets.UTF_8)))) - .isInstanceOf(IllegalStateException.class) + assertThatThrownBy( + () -> transactionalKafkaMessageSender.sendMessage( + new TkmsMessage() + .setUuid(generatePrefixCombUuid()) + .setTopic("NotExistingTopic") + .setValue("Stuff".getBytes(StandardCharsets.UTF_8)) + ) + ).isInstanceOf(IllegalStateException.class) .hasMessageContaining("No active transaction detected."); } @@ -526,10 +585,10 @@ void sendingMultipleMessagesWorks(boolean deferUntilCommit) { SendMessagesResult sendMessagesResult = transactionsHelper.withTransaction().call(() -> transactionalKafkaMessageSender.sendMessages(new SendMessagesRequest() - .addTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value)) - .addTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value).setShard(1)) - .addTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value).setShard(0).setPartition(0)) - .addTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value).setShard(0).setPartition(1)) + .addTkmsMessage(new TkmsMessage().setUuid(generatePrefixCombUuid()).setTopic(topic).setValue(value)) + .addTkmsMessage(new TkmsMessage().setUuid(generatePrefixCombUuid()).setTopic(topic).setValue(value).setShard(1)) + .addTkmsMessage(new TkmsMessage().setUuid(generatePrefixCombUuid()).setTopic(topic).setValue(value).setShard(0).setPartition(0)) + .addTkmsMessage(new TkmsMessage().setUuid(generatePrefixCombUuid()).setTopic(topic).setValue(value).setShard(0).setPartition(1)) )); assertThat(sendMessagesResult.getResults().size()).isEqualTo(4); @@ -577,21 +636,21 @@ void sendingMultipleMessagesWithMultipleStatementsWorks(boolean deferUntilCommit transactionsHelper.withTransaction().run(() -> { transactionalKafkaMessageSender.sendMessages(new SendMessagesRequest() - .addTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value)) - .addTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value).setShard(1)) - .addTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value).setShard(0).setPartition(0)) - .addTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value).setShard(0).setPartition(1)) + .addTkmsMessage(new TkmsMessage().setUuid(generatePrefixCombUuid()).setTopic(topic).setValue(value)) + .addTkmsMessage(new TkmsMessage().setUuid(generatePrefixCombUuid()).setTopic(topic).setValue(value).setShard(0).setPartition(0)) + .addTkmsMessage(new TkmsMessage().setUuid(generatePrefixCombUuid()).setTopic(topic).setValue(value).setShard(0).setPartition(1)) + .addTkmsMessage(new TkmsMessage().setUuid(generatePrefixCombUuid()).setTopic(topic).setValue(value).setShard(1)) ); transactionalKafkaMessageSender.sendMessage(new SendMessageRequest().setDeferMessageRegistrationUntilCommit(deferUntilCommit) - .setTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value))); + .setTkmsMessage(new TkmsMessage().setUuid(generatePrefixCombUuid()).setTopic(topic).setValue(value))); transactionalKafkaMessageSender.sendMessage(new SendMessageRequest() - .setTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value))); + .setTkmsMessage(new TkmsMessage().setUuid(generatePrefixCombUuid()).setTopic(topic).setValue(value))); transactionalKafkaMessageSender.sendMessages(new SendMessagesRequest() - .addTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value)) - .addTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value)) + .addTkmsMessage(new TkmsMessage().setUuid(generatePrefixCombUuid()).setTopic(topic).setValue(value)) + .addTkmsMessage(new TkmsMessage().setUuid(generatePrefixCombUuid()).setTopic(topic).setValue(value)) ); var messagesCount = tkmsTestDao.getMessagesCount(TkmsShardPartition.of(0, 0)) @@ -637,46 +696,98 @@ void mixingDeferredAndNotDeferredMessagesIsPrevented(boolean deferUntilCommit) { assertThatThrownBy(() -> { transactionsHelper.withTransaction().run(() -> { - transactionalKafkaMessageSender.sendMessages(new SendMessagesRequest() - .addTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value)) + transactionalKafkaMessageSender.sendMessages( + new SendMessagesRequest().addTkmsMessage( + new TkmsMessage() + .setUuid(generatePrefixCombUuid()) + .setTopic(topic) + .setValue(value) + ) ); - transactionalKafkaMessageSender.sendMessages(new SendMessagesRequest().setDeferMessageRegistrationUntilCommit(!deferUntilCommit) - .addTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value)) + transactionalKafkaMessageSender.sendMessages( + new SendMessagesRequest() + .setDeferMessageRegistrationUntilCommit(!deferUntilCommit) + .addTkmsMessage( + new TkmsMessage() + .setUuid(generatePrefixCombUuid()) + .setTopic(topic) + .setValue(value) + ) ); }); }).hasMessage("You can not mix deferred and not deferred messages in the same transaction, as it will break the ordering guarantees."); assertThatThrownBy(() -> { transactionsHelper.withTransaction().run(() -> { - transactionalKafkaMessageSender.sendMessage(new SendMessageRequest() - .setTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value)) + transactionalKafkaMessageSender.sendMessage( + new SendMessageRequest().setTkmsMessage( + new TkmsMessage() + .setUuid(generatePrefixCombUuid()) + .setTopic(topic) + .setValue(value) + ) ); - transactionalKafkaMessageSender.sendMessage(new SendMessageRequest().setDeferMessageRegistrationUntilCommit(!deferUntilCommit) - .setTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value)) + transactionalKafkaMessageSender.sendMessage( + new SendMessageRequest() + .setDeferMessageRegistrationUntilCommit(!deferUntilCommit) + .setTkmsMessage( + new TkmsMessage() + .setUuid(generatePrefixCombUuid()) + .setTopic(topic) + .setValue(value) + ) ); }); }).hasMessage("You can not mix deferred and not deferred messages in the same transaction, as it will break the ordering guarantees."); // We can mix it between shard-partitions, because between those the order is not important. transactionsHelper.withTransaction().run(() -> { - transactionalKafkaMessageSender.sendMessages(new SendMessagesRequest() - .addTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value).setShard(0)) + transactionalKafkaMessageSender.sendMessages( + new SendMessagesRequest().addTkmsMessage( + new TkmsMessage() + .setUuid(generatePrefixCombUuid()) + .setTopic(topic) + .setValue(value) + .setShard(0) + ) ); - transactionalKafkaMessageSender.sendMessages(new SendMessagesRequest().setDeferMessageRegistrationUntilCommit(!deferUntilCommit) - .addTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value).setShard(1)) + transactionalKafkaMessageSender.sendMessages( + new SendMessagesRequest() + .setDeferMessageRegistrationUntilCommit(!deferUntilCommit) + .addTkmsMessage( + new TkmsMessage() + .setUuid(generatePrefixCombUuid()) + .setTopic(topic) + .setValue(value) + .setShard(1) + ) ); }); transactionsHelper.withTransaction().run(() -> { - transactionalKafkaMessageSender.sendMessage(new SendMessageRequest() - .setTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value).setShard(0)) + transactionalKafkaMessageSender.sendMessage( + new SendMessageRequest().setTkmsMessage( + new TkmsMessage() + .setUuid(generatePrefixCombUuid()) + .setTopic(topic) + .setValue(value) + .setShard(0) + ) ); - transactionalKafkaMessageSender.sendMessage(new SendMessageRequest().setDeferMessageRegistrationUntilCommit(!deferUntilCommit) - .setTkmsMessage(new TkmsMessage().setTopic(topic).setValue(value).setShard(1)) + transactionalKafkaMessageSender.sendMessage( + new SendMessageRequest() + .setDeferMessageRegistrationUntilCommit(!deferUntilCommit) + .setTkmsMessage( + new TkmsMessage() + .setUuid(generatePrefixCombUuid()) + .setTopic(topic) + .setValue(value) + .setShard(1) + ) ); }); @@ -712,23 +823,39 @@ void testThatSendingLargeMessagesWillNotCauseAnIssue(boolean deferUntilCommit) { transactionsHelper.withTransaction().call(() -> transactionalKafkaMessageSender .sendMessage( - new TkmsMessage().setTopic(testProperties.getTestTopic()).setValue(message.getValue().getBytes(StandardCharsets.US_ASCII))))) + new TkmsMessage() + .setUuid(generatePrefixCombUuid()) + .setTopic(testProperties.getTestTopic()) + .setValue(message.getValue().getBytes(StandardCharsets.US_ASCII)) + ) + )) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("0: Estimated message size is 10485878, which is larger than maximum of 10485760."); + .hasMessage("0: Estimated message size is 10485937, which is larger than maximum of 10485760."); assertThatThrownBy(() -> transactionsHelper.withTransaction().call(() -> - transactionalKafkaMessageSender - .sendMessages(new SendMessagesRequest().addTkmsMessage( - new TkmsMessage().setTopic(testProperties.getTestTopic()).setValue(message.getValue().getBytes(StandardCharsets.US_ASCII)))))) + transactionalKafkaMessageSender.sendMessages( + new SendMessagesRequest().addTkmsMessage( + new TkmsMessage() + .setUuid(generatePrefixCombUuid()) + .setTopic(testProperties.getTestTopic()) + .setValue(message.getValue().getBytes(StandardCharsets.US_ASCII)) + ) + ) + ) + ) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("0: Estimated message size is 10485878, which is larger than maximum of 10485760."); + .hasMessage("0: Estimated message size is 10485937, which is larger than maximum of 10485760."); message.setValue(message.getValue().substring(0, 10484000)); transactionsHelper.withTransaction().run(() -> - transactionalKafkaMessageSender - .sendMessage( - new TkmsMessage().setTopic(testProperties.getTestTopic()).setValue(message.getValue().getBytes(StandardCharsets.US_ASCII)))); + transactionalKafkaMessageSender.sendMessage( + new TkmsMessage() + .setUuid(generatePrefixCombUuid()) + .setTopic(testProperties.getTestTopic()) + .setValue(message.getValue().getBytes(StandardCharsets.US_ASCII)) + ) + ); await().until(() -> receivedCount.get() > 0); @@ -768,8 +895,13 @@ void testThatTemporaryDeleteFailureDoesNotLeaveTrashBehind(boolean deferUntilCom try { for (int i = 0; i < messagesCount; i++) { TestEvent testEvent = new TestEvent().setId((long) i).setMessage(message); - transactionsHelper.withTransaction().run(() -> transactionalKafkaMessageSender - .sendMessage(new TkmsMessage().setTopic(testProperties.getTestTopic()).setValue(toJsonBytes(testEvent)))); + transactionsHelper.withTransaction().run( + () -> transactionalKafkaMessageSender.sendMessage( + new TkmsMessage() + .setUuid(generatePrefixCombUuid()) + .setTopic(testProperties.getTestTopic()) + .setValue(toJsonBytes(testEvent))) + ); } await().until(() -> receivedCount.get() >= messagesCount); @@ -827,10 +959,20 @@ void testThatInsertFailureDoesNotLeaveTrashBehind(boolean deferUntilCommit) { var testEvent1 = new TestEvent().setId(1L).setMessage(message); assertThatThrownBy(() -> transactionsHelper.withTransaction().run(() -> { - transactionalKafkaMessageSender - .sendMessage(new TkmsMessage().setTopic(testProperties.getTestTopic()).setValue(toJsonBytes(testEvent0)).setShard(0)); - transactionalKafkaMessageSender - .sendMessage(new TkmsMessage().setTopic(testProperties.getTestTopic()).setValue(toJsonBytes(testEvent1)).setShard(1)); + transactionalKafkaMessageSender.sendMessage( + new TkmsMessage() + .setUuid(generatePrefixCombUuid()) + .setTopic(testProperties.getTestTopic()) + .setValue(toJsonBytes(testEvent0)) + .setShard(0) + ); + transactionalKafkaMessageSender.sendMessage( + new TkmsMessage() + .setUuid(generatePrefixCombUuid()) + .setTopic(testProperties.getTestTopic()) + .setValue(toJsonBytes(testEvent1)) + .setShard(1) + ); })).hasMessage("Haha, inserts are failing lol."); assertThat(receivedCount.get()).isEqualTo(0); @@ -854,12 +996,12 @@ private static Stream compressionInput() { var arguments = new ArrayList(); for (var deferUntilCommit : deferUntilCommits) { - arguments.add(Arguments.of(CompressionAlgorithm.GZIP, 102, 103, deferUntilCommit)); - arguments.add(Arguments.of(CompressionAlgorithm.NONE, 1163, 1163, deferUntilCommit)); - arguments.add(Arguments.of(CompressionAlgorithm.LZ4, 126, 126, deferUntilCommit)); - arguments.add(Arguments.of(CompressionAlgorithm.SNAPPY, 158, 158, deferUntilCommit)); - arguments.add(Arguments.of(CompressionAlgorithm.SNAPPY_FRAMED, 156, 156, deferUntilCommit)); - arguments.add(Arguments.of(CompressionAlgorithm.ZSTD, 92, 92, deferUntilCommit)); + arguments.add(Arguments.of(CompressionAlgorithm.GZIP, 156, 157, deferUntilCommit)); + arguments.add(Arguments.of(CompressionAlgorithm.NONE, 1218, 1218, deferUntilCommit)); + arguments.add(Arguments.of(CompressionAlgorithm.LZ4, 182, 182, deferUntilCommit)); + arguments.add(Arguments.of(CompressionAlgorithm.SNAPPY, 214, 214, deferUntilCommit)); + arguments.add(Arguments.of(CompressionAlgorithm.SNAPPY_FRAMED, 212, 212, deferUntilCommit)); + arguments.add(Arguments.of(CompressionAlgorithm.ZSTD, 147, 147, deferUntilCommit)); } return arguments.stream(); @@ -897,9 +1039,14 @@ void testMessageIsCompressed(CompressionAlgorithm algorithm, int expectedSeriali TestEvent testEvent = new TestEvent().setId(1L).setMessage(message); transactionsHelper.withTransaction().run(() -> - transactionalKafkaMessageSender - .sendMessage(new TkmsMessage().setTopic(testProperties.getTestTopic()).setValue(toJsonBytes(testEvent)) - .setCompression(new Compression().setAlgorithm(algorithm)))); + transactionalKafkaMessageSender.sendMessage( + new TkmsMessage() + .setUuid(UUID.fromString("7554ffe0-4da2-4de9-bebf-f3131fd7a84a")) + .setTopic(testProperties.getTestTopic()) + .setValue(toJsonBytes(testEvent)) + .setCompression(new Compression().setAlgorithm(algorithm)) + ) + ); await().until(() -> receivedCount.get() > 0); waitUntilTablesAreEmpty(); diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/ManualIntTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/ManualIntTest.java index 59d04e1..e49e426 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/ManualIntTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/ManualIntTest.java @@ -2,6 +2,7 @@ import static org.awaitility.Awaitility.await; +import com.transferwise.common.baseutils.UuidUtils; import com.transferwise.common.baseutils.transactionsmanagement.ITransactionsHelper; import com.transferwise.kafka.tkms.api.ITransactionalKafkaMessageSender; import com.transferwise.kafka.tkms.api.TkmsMessage; @@ -74,7 +75,13 @@ void fillTkmsTable() { try { transactionsHelper.withTransaction().run(() -> { for (int i = 0; i < batchSize; i++) { - transactionalKafkaMessageSender.sendMessage(new TkmsMessage().setTopic("TestTopicPostgres").setKey("a").setValue(contentBytes)); + transactionalKafkaMessageSender.sendMessage( + new TkmsMessage() + .setUuid(UuidUtils.generatePrefixCombUuid()) + .setTopic("TestTopicPostgres") + .setKey("a") + .setValue(contentBytes) + ); } }); log.info("Inserted batch #" + finalT); diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessageDecorationTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessageDecorationTest.java index 8a24b17..3d70ac2 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessageDecorationTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessageDecorationTest.java @@ -4,6 +4,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import com.transferwise.common.baseutils.UuidUtils; import com.transferwise.common.baseutils.transactionsmanagement.ITransactionsHelper; import com.transferwise.kafka.tkms.api.ITransactionalKafkaMessageSender.SendMessagesRequest; import com.transferwise.kafka.tkms.api.TkmsMessage; @@ -42,10 +43,26 @@ void messagesAreDecorateWithJambi() { String topic = testProperties.getTestTopic(); transactionsHelper.withTransaction().run(() -> - transactionalKafkaMessageSender.sendMessages(new SendMessagesRequest() - .addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("adam-jones").setShard(4).setValue(someValue)) - .addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("danny-carey").setPartition(5).setValue(someValue)) - )); + transactionalKafkaMessageSender.sendMessages( + new SendMessagesRequest() + .addTkmsMessage( + new TkmsMessage() + .setUuid(UuidUtils.generatePrefixCombUuid()) + .setTopic(topic) + .setKey("adam-jones") + .setShard(4) + .setValue(someValue) + ) + .addTkmsMessage( + new TkmsMessage() + .setUuid(UuidUtils.generatePrefixCombUuid()) + .setTopic(topic) + .setKey("danny-carey") + .setPartition(5) + .setValue(someValue) + ) + ) + ); await().until(() -> tkmsSentMessagesCollector.getSentMessages(topic).size() == 2); var messages = tkmsSentMessagesCollector.getSentMessages(topic); diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagesInterceptionTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagesInterceptionTest.java index 29b2214..352597b 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagesInterceptionTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagesInterceptionTest.java @@ -3,6 +3,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import com.transferwise.common.baseutils.UuidUtils; import com.transferwise.common.baseutils.transactionsmanagement.ITransactionsHelper; import com.transferwise.kafka.tkms.api.ITkmsMessageInterceptor.MessageInterceptionDecision; import com.transferwise.kafka.tkms.api.ITransactionalKafkaMessageSender.SendMessagesRequest; @@ -67,11 +68,31 @@ void messagesCanBeIntercepted() { String topic = testProperties.getTestTopic(); transactionsHelper.withTransaction().run(() -> - transactionalKafkaMessageSender.sendMessages(new SendMessagesRequest() - .addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("A").setValue(someValue)) - .addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("B").setValue(someValue)) - .addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("C").setValue(someValue)) - )); + transactionalKafkaMessageSender.sendMessages( + new SendMessagesRequest() + .addTkmsMessage( + new TkmsMessage() + .setUuid(UuidUtils.generatePrefixCombUuid()) + .setTopic(topic) + .setKey("A") + .setValue(someValue) + ) + .addTkmsMessage( + new TkmsMessage() + .setUuid(UuidUtils.generatePrefixCombUuid()) + .setTopic(topic) + .setKey("B") + .setValue(someValue) + ) + .addTkmsMessage( + new TkmsMessage() + .setUuid(UuidUtils.generatePrefixCombUuid()) + .setTopic(topic) + .setKey("C") + .setValue(someValue) + ) + ) + ); await().until(() -> tkmsSentMessagesCollector.getSentMessages(topic).size() == 2); diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MultiServerTopicValidationIntTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MultiServerTopicValidationIntTest.java index 0773705..112c77d 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MultiServerTopicValidationIntTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MultiServerTopicValidationIntTest.java @@ -2,6 +2,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; +import com.transferwise.common.baseutils.UuidUtils; import com.transferwise.common.baseutils.transactionsmanagement.ITransactionsHelper; import com.transferwise.kafka.tkms.api.ITransactionalKafkaMessageSender; import com.transferwise.kafka.tkms.api.TkmsMessage; @@ -62,7 +63,10 @@ void sendingToUnknownTopicWillBePreventedWhenTopicAutoCreationIsDisabled(boolean protected void sendMessage(String topic, Integer shard) { try { transactionsHelper.withTransaction().run(() -> { - var message = new TkmsMessage().setTopic(topic).setValue("Stuff".getBytes(StandardCharsets.UTF_8)); + var message = new TkmsMessage() + .setUuid(UuidUtils.generatePrefixCombUuid()) + .setTopic(topic) + .setValue("Stuff".getBytes(StandardCharsets.UTF_8)); if (shard != null) { message.setShard(shard); } diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/PostgresEndToEndIntTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/PostgresEndToEndIntTest.java index c1e9a71..c6721b3 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/PostgresEndToEndIntTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/PostgresEndToEndIntTest.java @@ -16,12 +16,12 @@ private static Stream compressionInput() { var arguments = new ArrayList(); for (var deferUntilCommit : deferUntilCommits) { - arguments.add(Arguments.of(CompressionAlgorithm.GZIP, 111, 110, deferUntilCommit)); - arguments.add(Arguments.of(CompressionAlgorithm.NONE, 1172, 1171, deferUntilCommit)); - arguments.add(Arguments.of(CompressionAlgorithm.LZ4, 135, 134, deferUntilCommit)); - arguments.add(Arguments.of(CompressionAlgorithm.SNAPPY, 164, 160, deferUntilCommit)); - arguments.add(Arguments.of(CompressionAlgorithm.SNAPPY_FRAMED, 162, 158, deferUntilCommit)); - arguments.add(Arguments.of(CompressionAlgorithm.ZSTD, 101, 100, deferUntilCommit)); + arguments.add(Arguments.of(CompressionAlgorithm.GZIP, 164, 165, deferUntilCommit)); + arguments.add(Arguments.of(CompressionAlgorithm.NONE, 1226, 1226, deferUntilCommit)); + arguments.add(Arguments.of(CompressionAlgorithm.LZ4, 190, 190, deferUntilCommit)); + arguments.add(Arguments.of(CompressionAlgorithm.SNAPPY, 216, 216, deferUntilCommit)); + arguments.add(Arguments.of(CompressionAlgorithm.SNAPPY_FRAMED, 214, 214, deferUntilCommit)); + arguments.add(Arguments.of(CompressionAlgorithm.ZSTD, 155, 155, deferUntilCommit)); } return arguments.stream(); diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/TkmsMessageValidationIntTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/TkmsMessageValidationIntTest.java index 8791613..29a5449 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/TkmsMessageValidationIntTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/TkmsMessageValidationIntTest.java @@ -7,7 +7,10 @@ import com.transferwise.kafka.tkms.api.TkmsMessage; import com.transferwise.kafka.tkms.test.BaseIntTest; import com.transferwise.kafka.tkms.test.BaseTestEnvironment; +import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.springframework.beans.factory.annotation.Autowired; @@ -20,9 +23,17 @@ class TkmsMessageValidationIntTest extends BaseIntTest { @Autowired private ITransactionsHelper transactionsHelper; + private boolean uuidHeaderRequired; + + @BeforeEach + public void saveConfig() { + uuidHeaderRequired = tkmsProperties.isUuidHeaderRequired(); + } + @AfterEach public void cleanup() { tkmsProperties.setDeferMessageRegistrationUntilCommit(false); + tkmsProperties.setUuidHeaderRequired(uuidHeaderRequired); } protected void setupConfig(boolean deferUntilCommit) { @@ -40,4 +51,35 @@ void invalidMessagesDoNotPassValidation(boolean deferUntilCommit) { .isInstanceOf(IllegalArgumentException.class) .hasMessage("0: No topic provided."); } + + @Test + void failsToSendMessageThatHasNoUuidWhenUuidHeaderIsRequired() { + tkmsProperties.setUuidHeaderRequired(true); + + assertThatThrownBy( + () -> transactionsHelper.withTransaction().run( + () -> transactionalKafkaMessageSender.sendMessage( + new TkmsMessage() + .setTopic(testProperties.getTestTopic()) + .setValue(new byte[1]) + ) + ) + ).isInstanceOf(IllegalArgumentException.class) + .hasMessage("0: Message is required to have @{x-wise-uuid} header."); + } + + @Test + void doesNotRequireUuidPresentIfItIsNotRequiredByConfiguration() { + tkmsProperties.setUuidHeaderRequired(false); + + transactionsHelper.withTransaction().run( + () -> transactionalKafkaMessageSender.sendMessage( + new TkmsMessage() + .setTopic(testProperties.getTestTopic()) + .setValue(new byte[1]) + ) + ); + + Awaitility.await().until(() -> tkmsSentMessagesCollector.getSentMessages(testProperties.getTestTopic()).size() == 1); + } }