From 7067b2c00a6c2b3879fa54a5c06dc10ffbbfa664 Mon Sep 17 00:00:00 2001 From: Hussain Kara Fallah <35973295+hussainkarafallah@users.noreply.github.com> Date: Tue, 6 Aug 2024 16:28:19 +0300 Subject: [PATCH] [PPP-502] TKMS - Add decorator for message headers (#87) --- CHANGELOG.md | 4 ++ gradle.properties | 2 +- .../tkms/TransactionalKafkaMessageSender.java | 9 ++- .../kafka/tkms/api/ITkmsMessageDecorator.java | 21 ++++++ .../kafka/tkms/api/TkmsMessage.java | 13 ++++ .../kafka/tkms/api/TkmsShardPartition.java | 2 +- .../tkms/config/TkmsAutoConfiguration.java | 9 +++ .../kafka/tkms/MessageDecorationTest.java | 68 +++++++++++++++++++ .../kafka/tkms/test/TestMessageDecorator.java | 30 ++++++++ 9 files changed, 153 insertions(+), 5 deletions(-) create mode 100644 tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/ITkmsMessageDecorator.java create mode 100644 tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessageDecorationTest.java create mode 100644 tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestMessageDecorator.java diff --git a/CHANGELOG.md b/CHANGELOG.md index c466b13..25d59b4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.30.0] 2024-08-06 +### Added +- Added `ITkmsMessageDecorator` that kicks in before message is registered and adds custom headers + ## [0.29.1] 2024-07-25 ### Fixed - Fixed the prometheus metrics cant cast metric type gauge to info issue. diff --git a/gradle.properties b/gradle.properties index 0de6718..902c55f 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=0.29.1 +version=0.30.0 diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java index b681507..5c84134 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java @@ -8,6 +8,7 @@ import com.transferwise.kafka.tkms.TransactionContext.ShardPartitionMessages; import com.transferwise.kafka.tkms.api.ITkmsEventsListener; import com.transferwise.kafka.tkms.api.ITkmsEventsListener.MessageRegisteredEvent; +import com.transferwise.kafka.tkms.api.ITkmsMessageDecorator; import com.transferwise.kafka.tkms.api.ITransactionalKafkaMessageSender; import com.transferwise.kafka.tkms.api.TkmsMessage; import com.transferwise.kafka.tkms.api.TkmsMessage.Header; @@ -62,6 +63,8 @@ protected void setTkmsDaoProvider(ITkmsDaoProvider tkmsDaoProvider) { private IProblemNotifier problemNotifier; @Autowired private ITkmsTopicValidator tkmsTopicValidator; + @Autowired + private List messageDecorators; private volatile List tkmsEventsListeners; private RateLimiter errorLogRateLimiter = RateLimiter.create(2); @@ -144,10 +147,10 @@ protected void checkActiveTransaction(int shard, boolean transactionActive, bool @Override public SendMessagesResult sendMessages(SendMessagesRequest request) { - var transactionActive = TransactionSynchronizationManager.isActualTransactionActive(); - + request.getTkmsMessages().forEach(message -> messageDecorators.forEach(message::accept)); validateMessages(request); + var transactionActive = TransactionSynchronizationManager.isActualTransactionActive(); var validatedTopics = new HashSet(); int seq = 0; @@ -250,7 +253,7 @@ public SendMessageResult sendMessage(SendMessageRequest request) { var message = request.getTkmsMessage(); var shardPartition = getShardPartition(message); - + messageDecorators.forEach(message::accept); try { shardPartition.putIntoMdc(); diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/ITkmsMessageDecorator.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/ITkmsMessageDecorator.java new file mode 100644 index 0000000..982cc3e --- /dev/null +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/ITkmsMessageDecorator.java @@ -0,0 +1,21 @@ +package com.transferwise.kafka.tkms.api; + +import com.transferwise.kafka.tkms.api.TkmsMessage.Header; +import java.util.List; +import org.springframework.core.Ordered; + +public interface ITkmsMessageDecorator extends Ordered { + + default List
getAdditionalHeaders(TkmsMessage message) { + return List.of(); + } + + default TkmsShardPartition getOverridedPartition(TkmsMessage message) { + return null; + } + + default int getOrder() { + return LOWEST_PRECEDENCE; + } + +} diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/TkmsMessage.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/TkmsMessage.java index a4ce7ac..cd394b5 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/TkmsMessage.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/TkmsMessage.java @@ -79,6 +79,19 @@ public TkmsMessage addHeader(Header header) { return this; } + public TkmsMessage accept(ITkmsMessageDecorator decorator) { + var headers = decorator.getAdditionalHeaders(this); + if (headers != null) { + headers.forEach(this::addHeader); + } + var overridedPartition = decorator.getOverridedPartition(this); + if (overridedPartition != null) { + setShard(overridedPartition.getShard()); + setPartition(overridedPartition.getPartition()); + } + return this; + } + /** * Forces specified compression. */ diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/TkmsShardPartition.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/TkmsShardPartition.java index 8da6f52..a418f19 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/TkmsShardPartition.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/TkmsShardPartition.java @@ -17,7 +17,7 @@ public class TkmsShardPartition { private Tag micrometerPartitionTag; private String stringPresentation; - private TkmsShardPartition(int shard, int partition) { + public TkmsShardPartition(int shard, int partition) { this.shard = shard; this.partition = partition; this.micrometerShardTag = Tag.of("shard", String.valueOf(shard)); diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsAutoConfiguration.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsAutoConfiguration.java index 5104543..987a80c 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsAutoConfiguration.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsAutoConfiguration.java @@ -6,7 +6,10 @@ import com.transferwise.common.baseutils.meters.cache.MeterCache; import com.transferwise.common.baseutils.transactionsmanagement.ITransactionsHelper; import com.transferwise.common.baseutils.transactionsmanagement.TransactionsHelper; +import com.transferwise.kafka.tkms.api.ITkmsMessageDecorator; import io.micrometer.core.instrument.MeterRegistry; +import java.util.Collections; +import java.util.List; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; @@ -40,4 +43,10 @@ public TransactionsHelper twTransactionsHelper() { return new TransactionsHelper(); } + @Bean + @ConditionalOnMissingBean(ITkmsMessageDecorator.class) + public List messageDecorators() { + return Collections.emptyList(); + } + } diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessageDecorationTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessageDecorationTest.java new file mode 100644 index 0000000..8a24b17 --- /dev/null +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessageDecorationTest.java @@ -0,0 +1,68 @@ +package com.transferwise.kafka.tkms; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.transferwise.common.baseutils.transactionsmanagement.ITransactionsHelper; +import com.transferwise.kafka.tkms.api.ITransactionalKafkaMessageSender.SendMessagesRequest; +import com.transferwise.kafka.tkms.api.TkmsMessage; +import com.transferwise.kafka.tkms.test.BaseIntTest; +import com.transferwise.kafka.tkms.test.ITkmsSentMessagesCollector.SentMessage; +import com.transferwise.kafka.tkms.test.TestMessagesInterceptor; +import com.transferwise.kafka.tkms.test.TestProperties; +import java.nio.charset.StandardCharsets; +import java.util.stream.StreamSupport; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; + +@Slf4j +class MessageDecorationTest extends BaseIntTest { + + @Autowired + private TestMessagesInterceptor testMessagesInterceptor; + @Autowired + private TransactionalKafkaMessageSender transactionalKafkaMessageSender; + @Autowired + private TestProperties testProperties; + @Autowired + private ITransactionsHelper transactionsHelper; + + @AfterEach + void cleanupTest() { + testMessagesInterceptor.setBeforeSendingToKafkaFunction(null); + } + + @Test + void messagesAreDecorateWithJambi() { + byte[] someValue = "Here from the king's mountain view, Feast like a sultan I do!".getBytes(StandardCharsets.UTF_8); + + String topic = testProperties.getTestTopic(); + + transactionsHelper.withTransaction().run(() -> + transactionalKafkaMessageSender.sendMessages(new SendMessagesRequest() + .addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("adam-jones").setShard(4).setValue(someValue)) + .addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("danny-carey").setPartition(5).setValue(someValue)) + )); + + await().until(() -> tkmsSentMessagesCollector.getSentMessages(topic).size() == 2); + var messages = tkmsSentMessagesCollector.getSentMessages(topic); + + assertEquals(2, messages.size()); + checkForHeader(messages.get(0), "tool", "jambi"); + assertEquals(0, messages.get(0).getShardPartition().getShard()); + assertEquals(0, messages.get(0).getShardPartition().getPartition()); + checkForHeader(messages.get(1), "tool", "jambi"); + assertEquals(0, messages.get(1).getShardPartition().getShard()); + assertEquals(0, messages.get(1).getShardPartition().getPartition()); + } + + private void checkForHeader(SentMessage sentMessage, String key, String value) { + assertTrue( + StreamSupport.stream(sentMessage.getProducerRecord().headers().spliterator(), false) + .anyMatch(h -> h.key().equals(key) && value.equals(new String(h.value(), StandardCharsets.UTF_8))) + ); + } +} diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestMessageDecorator.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestMessageDecorator.java new file mode 100644 index 0000000..cf41233 --- /dev/null +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestMessageDecorator.java @@ -0,0 +1,30 @@ +package com.transferwise.kafka.tkms.test; + +import com.transferwise.kafka.tkms.api.ITkmsMessageDecorator; +import com.transferwise.kafka.tkms.api.TkmsMessage; +import com.transferwise.kafka.tkms.api.TkmsMessage.Header; +import com.transferwise.kafka.tkms.api.TkmsShardPartition; +import java.nio.charset.StandardCharsets; +import java.util.List; +import org.springframework.stereotype.Component; + +@Component +public class TestMessageDecorator implements ITkmsMessageDecorator { + + @Override + public List
getAdditionalHeaders(TkmsMessage message) { + if (message.getValue() != null && new String(message.getValue(), StandardCharsets.UTF_8).startsWith("Here from")) { + return List.of(new Header().setKey("tool").setValue("jambi".getBytes(StandardCharsets.UTF_8))); + } + return List.of(); + } + + @Override + public TkmsShardPartition getOverridedPartition(TkmsMessage message) { + if (message.getValue() != null && new String(message.getValue(), StandardCharsets.UTF_8).startsWith("Here from")) { + return new TkmsShardPartition(0, 0); + } + return null; + } + +}