From cc9303af0f87e0b1e3041220901d05df1cb0d422 Mon Sep 17 00:00:00 2001 From: Hussain Kara Fallah Date: Tue, 6 Aug 2024 11:29:15 +0300 Subject: [PATCH 01/10] good --- CHANGELOG.md | 4 ++ .../tkms/TransactionalKafkaMessageSender.java | 7 +- .../kafka/tkms/api/ITkmsMessageDecorator.java | 11 +++ .../kafka/tkms/api/TkmsMessage.java | 8 +++ .../tkms/config/TkmsAutoConfiguration.java | 9 +++ .../kafka/tkms/MessageDecorationTest.java | 70 +++++++++++++++++++ .../kafka/tkms/test/TestMessageDecorator.java | 21 ++++++ 7 files changed, 128 insertions(+), 2 deletions(-) create mode 100644 tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/ITkmsMessageDecorator.java create mode 100644 tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessageDecorationTest.java create mode 100644 tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestMessageDecorator.java diff --git a/CHANGELOG.md b/CHANGELOG.md index c466b13..c11de0c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.30.0] 2024-07-25 +### Added +- Added `ITkmsMessageDecorator` that kicks in before message is registered and adds custom headers + ## [0.29.1] 2024-07-25 ### Fixed - Fixed the prometheus metrics cant cast metric type gauge to info issue. diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java index b681507..74d5079 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java @@ -8,6 +8,7 @@ import com.transferwise.kafka.tkms.TransactionContext.ShardPartitionMessages; import com.transferwise.kafka.tkms.api.ITkmsEventsListener; import com.transferwise.kafka.tkms.api.ITkmsEventsListener.MessageRegisteredEvent; +import com.transferwise.kafka.tkms.api.ITkmsMessageDecorator; import com.transferwise.kafka.tkms.api.ITransactionalKafkaMessageSender; import com.transferwise.kafka.tkms.api.TkmsMessage; import com.transferwise.kafka.tkms.api.TkmsMessage.Header; @@ -62,6 +63,8 @@ protected void setTkmsDaoProvider(ITkmsDaoProvider tkmsDaoProvider) { private IProblemNotifier problemNotifier; @Autowired private ITkmsTopicValidator tkmsTopicValidator; + @Autowired + private List messageDecorators; private volatile List tkmsEventsListeners; private RateLimiter errorLogRateLimiter = RateLimiter.create(2); @@ -145,7 +148,7 @@ protected void checkActiveTransaction(int shard, boolean transactionActive, bool @Override public SendMessagesResult sendMessages(SendMessagesRequest request) { var transactionActive = TransactionSynchronizationManager.isActualTransactionActive(); - + request.getTkmsMessages().forEach(message -> messageDecorators.forEach(message::accept)); validateMessages(request); var validatedTopics = new HashSet(); @@ -250,7 +253,7 @@ public SendMessageResult sendMessage(SendMessageRequest request) { var message = request.getTkmsMessage(); var shardPartition = getShardPartition(message); - + messageDecorators.forEach(message::accept); try { shardPartition.putIntoMdc(); diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/ITkmsMessageDecorator.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/ITkmsMessageDecorator.java new file mode 100644 index 0000000..92c6467 --- /dev/null +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/ITkmsMessageDecorator.java @@ -0,0 +1,11 @@ +package com.transferwise.kafka.tkms.api; + +import com.transferwise.kafka.tkms.api.TkmsMessage.Header; +import java.util.List; +import java.util.Map; + +public interface ITkmsMessageDecorator { + default List
getHeaders(TkmsMessage message){ + return List.of(); + } +} diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/TkmsMessage.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/TkmsMessage.java index a4ce7ac..ce0376d 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/TkmsMessage.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/TkmsMessage.java @@ -79,6 +79,14 @@ public TkmsMessage addHeader(Header header) { return this; } + public TkmsMessage accept(ITkmsMessageDecorator decorator){ + var headers = decorator.getHeaders(this); + if(headers != null){ + headers.forEach(this::addHeader); + } + return this; + } + /** * Forces specified compression. */ diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsAutoConfiguration.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsAutoConfiguration.java index 5104543..dda6d8d 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsAutoConfiguration.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsAutoConfiguration.java @@ -6,6 +6,7 @@ import com.transferwise.common.baseutils.meters.cache.MeterCache; import com.transferwise.common.baseutils.transactionsmanagement.ITransactionsHelper; import com.transferwise.common.baseutils.transactionsmanagement.TransactionsHelper; +import com.transferwise.kafka.tkms.api.ITkmsMessageDecorator; import io.micrometer.core.instrument.MeterRegistry; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.AutoConfigureAfter; @@ -15,6 +16,8 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; +import java.util.Collections; +import java.util.List; @Configuration @AutoConfigureAfter({FlywayAutoConfiguration.class, ValidationAutoConfiguration.class}) @@ -40,4 +43,10 @@ public TransactionsHelper twTransactionsHelper() { return new TransactionsHelper(); } + @Bean + @ConditionalOnMissingBean + public List messageDecorators() { + return Collections.emptyList(); + } + } diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessageDecorationTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessageDecorationTest.java new file mode 100644 index 0000000..5422e9c --- /dev/null +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessageDecorationTest.java @@ -0,0 +1,70 @@ +package com.transferwise.kafka.tkms; + +import com.transferwise.common.baseutils.transactionsmanagement.ITransactionsHelper; +import com.transferwise.kafka.tkms.api.ITkmsMessageInterceptor.MessageInterceptionDecision; +import com.transferwise.kafka.tkms.api.ITransactionalKafkaMessageSender.SendMessagesRequest; +import com.transferwise.kafka.tkms.api.TkmsMessage; +import com.transferwise.kafka.tkms.test.BaseIntTest; +import com.transferwise.kafka.tkms.test.ITkmsSentMessagesCollector.SentMessage; +import com.transferwise.kafka.tkms.test.TestMessagesInterceptor; +import com.transferwise.kafka.tkms.test.TestProperties; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@Slf4j +class MessageDecorationTest extends BaseIntTest { + + @Autowired + private TestMessagesInterceptor testMessagesInterceptor; + @Autowired + private TransactionalKafkaMessageSender transactionalKafkaMessageSender; + @Autowired + private TestProperties testProperties; + @Autowired + private ITransactionsHelper transactionsHelper; + + @AfterEach + void cleanupTest() { + testMessagesInterceptor.setBeforeSendingToKafkaFunction(null); + } + + @Test + void messagesAreDecorateWithJambi() { + byte[] someValue = "Here from the king's mountain view, Feast like a sultan I do!".getBytes(StandardCharsets.UTF_8); + + String topic = testProperties.getTestTopic(); + + transactionsHelper.withTransaction().run(() -> + transactionalKafkaMessageSender.sendMessages(new SendMessagesRequest() + .addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("A").setValue(someValue)) + .addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("B").setValue(someValue)) + )); + + await().until(() -> tkmsSentMessagesCollector.getSentMessages(topic).size() == 2); + + var messages = tkmsSentMessagesCollector.getSentMessages(topic); + assertThat(messages.size()).isEqualTo(2); + checkForHeader(messages.get(0) , "adam-jones" , "jambi"); + checkForHeader(messages.get(1) , "danny-carey" , "the-grudge"); + } + + private void checkForHeader(SentMessage sentMessage, String key, String value) { + assertTrue( + StreamSupport.stream(sentMessage.getProducerRecord().headers().spliterator() , false) + .anyMatch(h -> h.key().equals(key) && value.equals(new String(h.value()))) + ); + } +} diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestMessageDecorator.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestMessageDecorator.java new file mode 100644 index 0000000..285420c --- /dev/null +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestMessageDecorator.java @@ -0,0 +1,21 @@ +package com.transferwise.kafka.tkms.test; + +import com.transferwise.kafka.tkms.api.ITkmsMessageDecorator; +import com.transferwise.kafka.tkms.api.TkmsMessage; +import com.transferwise.kafka.tkms.api.TkmsMessage.Header; +import org.springframework.stereotype.Component; +import java.util.List; + +@Component +public class TestMessageDecorator implements ITkmsMessageDecorator { + + @Override + public List
getHeaders(TkmsMessage message){ + var h1 = new Header().setKey("adam-jones").setValue("jambi".getBytes()); + if (message.getValue() != null && new String(message.getValue()).startsWith("Here from the king")) { + return List.of(h1); + } + return List.of(); + } + +} From 2cf168ab5a475773d2f103952a6b7e87491e3edc Mon Sep 17 00:00:00 2001 From: Hussain Kara Fallah Date: Tue, 6 Aug 2024 11:33:33 +0300 Subject: [PATCH 02/10] hey --- .../transferwise/kafka/tkms/config/TkmsAutoConfiguration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsAutoConfiguration.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsAutoConfiguration.java index dda6d8d..0874f8c 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsAutoConfiguration.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsAutoConfiguration.java @@ -44,7 +44,7 @@ public TransactionsHelper twTransactionsHelper() { } @Bean - @ConditionalOnMissingBean + @ConditionalOnMissingBean(ITkmsMessageDecorator.class) public List messageDecorators() { return Collections.emptyList(); } From e6d5a6d9559090b0f463c443177ce5978fb05c7a Mon Sep 17 00:00:00 2001 From: Hussain Kara Fallah Date: Tue, 6 Aug 2024 11:36:06 +0300 Subject: [PATCH 03/10] hey --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c11de0c..25d59b4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,7 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [0.30.0] 2024-07-25 +## [0.30.0] 2024-08-06 ### Added - Added `ITkmsMessageDecorator` that kicks in before message is registered and adds custom headers From 36842591cbcd40bd7fe8ebba0bbd639980d7f81d Mon Sep 17 00:00:00 2001 From: Hussain Kara Fallah Date: Tue, 6 Aug 2024 13:17:47 +0300 Subject: [PATCH 04/10] happy checkstyle --- .../tkms/TransactionalKafkaMessageSender.java | 2 +- .../kafka/tkms/api/ITkmsMessageDecorator.java | 4 +-- .../kafka/tkms/api/TkmsMessage.java | 4 +-- .../tkms/config/TkmsAutoConfiguration.java | 6 ++-- .../kafka/tkms/MessageDecorationTest.java | 34 ++++++++----------- .../kafka/tkms/test/TestMessageDecorator.java | 12 +++---- 6 files changed, 28 insertions(+), 34 deletions(-) diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java index 74d5079..5c84134 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java @@ -147,10 +147,10 @@ protected void checkActiveTransaction(int shard, boolean transactionActive, bool @Override public SendMessagesResult sendMessages(SendMessagesRequest request) { - var transactionActive = TransactionSynchronizationManager.isActualTransactionActive(); request.getTkmsMessages().forEach(message -> messageDecorators.forEach(message::accept)); validateMessages(request); + var transactionActive = TransactionSynchronizationManager.isActualTransactionActive(); var validatedTopics = new HashSet(); int seq = 0; diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/ITkmsMessageDecorator.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/ITkmsMessageDecorator.java index 92c6467..e39c9a6 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/ITkmsMessageDecorator.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/ITkmsMessageDecorator.java @@ -2,10 +2,10 @@ import com.transferwise.kafka.tkms.api.TkmsMessage.Header; import java.util.List; -import java.util.Map; public interface ITkmsMessageDecorator { - default List
getHeaders(TkmsMessage message){ + + default List
getHeaders(TkmsMessage message) { return List.of(); } } diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/TkmsMessage.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/TkmsMessage.java index ce0376d..1a3380b 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/TkmsMessage.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/TkmsMessage.java @@ -79,9 +79,9 @@ public TkmsMessage addHeader(Header header) { return this; } - public TkmsMessage accept(ITkmsMessageDecorator decorator){ + public TkmsMessage accept(ITkmsMessageDecorator decorator) { var headers = decorator.getHeaders(this); - if(headers != null){ + if (headers != null) { headers.forEach(this::addHeader); } return this; diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsAutoConfiguration.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsAutoConfiguration.java index 0874f8c..2cdef1e 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsAutoConfiguration.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsAutoConfiguration.java @@ -8,6 +8,8 @@ import com.transferwise.common.baseutils.transactionsmanagement.TransactionsHelper; import com.transferwise.kafka.tkms.api.ITkmsMessageDecorator; import io.micrometer.core.instrument.MeterRegistry; +import java.util.Collections; +import java.util.List; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; @@ -16,8 +18,6 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; -import java.util.Collections; -import java.util.List; @Configuration @AutoConfigureAfter({FlywayAutoConfiguration.class, ValidationAutoConfiguration.class}) @@ -42,7 +42,7 @@ public IMeterCache twDefaultMeterCache(MeterRegistry meterRegistry) { public TransactionsHelper twTransactionsHelper() { return new TransactionsHelper(); } - + @Bean @ConditionalOnMissingBean(ITkmsMessageDecorator.class) public List messageDecorators() { diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessageDecorationTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessageDecorationTest.java index 5422e9c..27fc2ac 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessageDecorationTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessageDecorationTest.java @@ -1,28 +1,22 @@ package com.transferwise.kafka.tkms; +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + import com.transferwise.common.baseutils.transactionsmanagement.ITransactionsHelper; -import com.transferwise.kafka.tkms.api.ITkmsMessageInterceptor.MessageInterceptionDecision; import com.transferwise.kafka.tkms.api.ITransactionalKafkaMessageSender.SendMessagesRequest; import com.transferwise.kafka.tkms.api.TkmsMessage; import com.transferwise.kafka.tkms.test.BaseIntTest; import com.transferwise.kafka.tkms.test.ITkmsSentMessagesCollector.SentMessage; import com.transferwise.kafka.tkms.test.TestMessagesInterceptor; import com.transferwise.kafka.tkms.test.TestProperties; +import java.nio.charset.StandardCharsets; +import java.util.stream.StreamSupport; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; -import java.nio.charset.StandardCharsets; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; -import static org.junit.jupiter.api.Assertions.assertTrue; @Slf4j class MessageDecorationTest extends BaseIntTest { @@ -49,22 +43,22 @@ void messagesAreDecorateWithJambi() { transactionsHelper.withTransaction().run(() -> transactionalKafkaMessageSender.sendMessages(new SendMessagesRequest() - .addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("A").setValue(someValue)) - .addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("B").setValue(someValue)) + .addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("adam-jones").setValue(someValue)) + .addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("danny-carey").setValue(someValue)) )); await().until(() -> tkmsSentMessagesCollector.getSentMessages(topic).size() == 2); - var messages = tkmsSentMessagesCollector.getSentMessages(topic); - assertThat(messages.size()).isEqualTo(2); - checkForHeader(messages.get(0) , "adam-jones" , "jambi"); - checkForHeader(messages.get(1) , "danny-carey" , "the-grudge"); + + assertEquals(2, messages.size()); + checkForHeader(messages.get(0), "tool", "jambi"); + checkForHeader(messages.get(1), "tool", "jambi"); } private void checkForHeader(SentMessage sentMessage, String key, String value) { assertTrue( - StreamSupport.stream(sentMessage.getProducerRecord().headers().spliterator() , false) - .anyMatch(h -> h.key().equals(key) && value.equals(new String(h.value()))) + StreamSupport.stream(sentMessage.getProducerRecord().headers().spliterator(), false) + .anyMatch(h -> h.key().equals(key) && value.equals(new String(h.value()))) ); } } diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestMessageDecorator.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestMessageDecorator.java index 285420c..6353b3b 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestMessageDecorator.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestMessageDecorator.java @@ -3,19 +3,19 @@ import com.transferwise.kafka.tkms.api.ITkmsMessageDecorator; import com.transferwise.kafka.tkms.api.TkmsMessage; import com.transferwise.kafka.tkms.api.TkmsMessage.Header; -import org.springframework.stereotype.Component; import java.util.List; +import org.springframework.stereotype.Component; @Component public class TestMessageDecorator implements ITkmsMessageDecorator { @Override - public List
getHeaders(TkmsMessage message){ - var h1 = new Header().setKey("adam-jones").setValue("jambi".getBytes()); - if (message.getValue() != null && new String(message.getValue()).startsWith("Here from the king")) { - return List.of(h1); + public List
getHeaders(TkmsMessage message) { + var h1 = new Header().setKey("tool").setValue("jambi".getBytes()); + if (message.getKey() == null) { + return List.of(); } - return List.of(); + return List.of(h1); } } From e4dc33809c445ddde71747a8ada1fdab891f3db8 Mon Sep 17 00:00:00 2001 From: Hussain Kara Fallah Date: Tue, 6 Aug 2024 13:23:09 +0300 Subject: [PATCH 05/10] spotbugs happy --- .../com/transferwise/kafka/tkms/MessageDecorationTest.java | 2 +- .../com/transferwise/kafka/tkms/test/TestMessageDecorator.java | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessageDecorationTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessageDecorationTest.java index 27fc2ac..6768f08 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessageDecorationTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessageDecorationTest.java @@ -58,7 +58,7 @@ void messagesAreDecorateWithJambi() { private void checkForHeader(SentMessage sentMessage, String key, String value) { assertTrue( StreamSupport.stream(sentMessage.getProducerRecord().headers().spliterator(), false) - .anyMatch(h -> h.key().equals(key) && value.equals(new String(h.value()))) + .anyMatch(h -> h.key().equals(key) && value.equals(new String(h.value(), StandardCharsets.UTF_8))) ); } } diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestMessageDecorator.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestMessageDecorator.java index 6353b3b..10147e3 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestMessageDecorator.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestMessageDecorator.java @@ -3,6 +3,7 @@ import com.transferwise.kafka.tkms.api.ITkmsMessageDecorator; import com.transferwise.kafka.tkms.api.TkmsMessage; import com.transferwise.kafka.tkms.api.TkmsMessage.Header; +import java.nio.charset.StandardCharsets; import java.util.List; import org.springframework.stereotype.Component; @@ -11,7 +12,7 @@ public class TestMessageDecorator implements ITkmsMessageDecorator { @Override public List
getHeaders(TkmsMessage message) { - var h1 = new Header().setKey("tool").setValue("jambi".getBytes()); + var h1 = new Header().setKey("tool").setValue("jambi".getBytes(StandardCharsets.UTF_8)); if (message.getKey() == null) { return List.of(); } From e8846b85f036a366754070cc6551a4e452e8d51f Mon Sep 17 00:00:00 2001 From: Hussain Kara Fallah Date: Tue, 6 Aug 2024 13:30:30 +0300 Subject: [PATCH 06/10] test good --- .../transferwise/kafka/tkms/test/TestMessageDecorator.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestMessageDecorator.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestMessageDecorator.java index 10147e3..478af31 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestMessageDecorator.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestMessageDecorator.java @@ -13,10 +13,10 @@ public class TestMessageDecorator implements ITkmsMessageDecorator { @Override public List
getHeaders(TkmsMessage message) { var h1 = new Header().setKey("tool").setValue("jambi".getBytes(StandardCharsets.UTF_8)); - if (message.getKey() == null) { - return List.of(); + if (message.getValue() != null && new String(message.getValue(), StandardCharsets.UTF_8).startsWith("Here from")) { + return List.of(h1); } - return List.of(h1); + return List.of(); } } From 7eab357f4d4cbc973fbf209415fedd356ed0f00a Mon Sep 17 00:00:00 2001 From: Hussain Kara Fallah Date: Tue, 6 Aug 2024 14:40:10 +0300 Subject: [PATCH 07/10] add extra stuff --- .../kafka/tkms/api/ITkmsMessageDecorator.java | 14 ++++++++++++-- .../transferwise/kafka/tkms/api/TkmsMessage.java | 7 ++++++- .../kafka/tkms/api/TkmsShardPartition.java | 2 +- .../kafka/tkms/config/TkmsAutoConfiguration.java | 2 +- .../kafka/tkms/MessageDecorationTest.java | 8 ++++++-- .../kafka/tkms/test/TestMessageDecorator.java | 8 +++++++- 6 files changed, 33 insertions(+), 8 deletions(-) diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/ITkmsMessageDecorator.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/ITkmsMessageDecorator.java index e39c9a6..982cc3e 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/ITkmsMessageDecorator.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/ITkmsMessageDecorator.java @@ -2,10 +2,20 @@ import com.transferwise.kafka.tkms.api.TkmsMessage.Header; import java.util.List; +import org.springframework.core.Ordered; -public interface ITkmsMessageDecorator { +public interface ITkmsMessageDecorator extends Ordered { - default List
getHeaders(TkmsMessage message) { + default List
getAdditionalHeaders(TkmsMessage message) { return List.of(); } + + default TkmsShardPartition getOverridedPartition(TkmsMessage message) { + return null; + } + + default int getOrder() { + return LOWEST_PRECEDENCE; + } + } diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/TkmsMessage.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/TkmsMessage.java index 1a3380b..cd394b5 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/TkmsMessage.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/TkmsMessage.java @@ -80,10 +80,15 @@ public TkmsMessage addHeader(Header header) { } public TkmsMessage accept(ITkmsMessageDecorator decorator) { - var headers = decorator.getHeaders(this); + var headers = decorator.getAdditionalHeaders(this); if (headers != null) { headers.forEach(this::addHeader); } + var overridedPartition = decorator.getOverridedPartition(this); + if (overridedPartition != null) { + setShard(overridedPartition.getShard()); + setPartition(overridedPartition.getPartition()); + } return this; } diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/TkmsShardPartition.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/TkmsShardPartition.java index 8da6f52..a418f19 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/TkmsShardPartition.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/api/TkmsShardPartition.java @@ -17,7 +17,7 @@ public class TkmsShardPartition { private Tag micrometerPartitionTag; private String stringPresentation; - private TkmsShardPartition(int shard, int partition) { + public TkmsShardPartition(int shard, int partition) { this.shard = shard; this.partition = partition; this.micrometerShardTag = Tag.of("shard", String.valueOf(shard)); diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsAutoConfiguration.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsAutoConfiguration.java index 2cdef1e..987a80c 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsAutoConfiguration.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsAutoConfiguration.java @@ -42,7 +42,7 @@ public IMeterCache twDefaultMeterCache(MeterRegistry meterRegistry) { public TransactionsHelper twTransactionsHelper() { return new TransactionsHelper(); } - + @Bean @ConditionalOnMissingBean(ITkmsMessageDecorator.class) public List messageDecorators() { diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessageDecorationTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessageDecorationTest.java index 6768f08..8a24b17 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessageDecorationTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessageDecorationTest.java @@ -43,8 +43,8 @@ void messagesAreDecorateWithJambi() { transactionsHelper.withTransaction().run(() -> transactionalKafkaMessageSender.sendMessages(new SendMessagesRequest() - .addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("adam-jones").setValue(someValue)) - .addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("danny-carey").setValue(someValue)) + .addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("adam-jones").setShard(4).setValue(someValue)) + .addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("danny-carey").setPartition(5).setValue(someValue)) )); await().until(() -> tkmsSentMessagesCollector.getSentMessages(topic).size() == 2); @@ -52,7 +52,11 @@ void messagesAreDecorateWithJambi() { assertEquals(2, messages.size()); checkForHeader(messages.get(0), "tool", "jambi"); + assertEquals(0, messages.get(0).getShardPartition().getShard()); + assertEquals(0, messages.get(0).getShardPartition().getPartition()); checkForHeader(messages.get(1), "tool", "jambi"); + assertEquals(0, messages.get(1).getShardPartition().getShard()); + assertEquals(0, messages.get(1).getShardPartition().getPartition()); } private void checkForHeader(SentMessage sentMessage, String key, String value) { diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestMessageDecorator.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestMessageDecorator.java index 478af31..44dec08 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestMessageDecorator.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestMessageDecorator.java @@ -3,6 +3,7 @@ import com.transferwise.kafka.tkms.api.ITkmsMessageDecorator; import com.transferwise.kafka.tkms.api.TkmsMessage; import com.transferwise.kafka.tkms.api.TkmsMessage.Header; +import com.transferwise.kafka.tkms.api.TkmsShardPartition; import java.nio.charset.StandardCharsets; import java.util.List; import org.springframework.stereotype.Component; @@ -11,7 +12,7 @@ public class TestMessageDecorator implements ITkmsMessageDecorator { @Override - public List
getHeaders(TkmsMessage message) { + public List
getAdditionalHeaders(TkmsMessage message) { var h1 = new Header().setKey("tool").setValue("jambi".getBytes(StandardCharsets.UTF_8)); if (message.getValue() != null && new String(message.getValue(), StandardCharsets.UTF_8).startsWith("Here from")) { return List.of(h1); @@ -19,4 +20,9 @@ public List
getHeaders(TkmsMessage message) { return List.of(); } + @Override + public TkmsShardPartition getOverridedPartition(TkmsMessage message) { + return new TkmsShardPartition(0, 0); + } + } From 52a348d321dc905479fcea3da9bbc08aed17b321 Mon Sep 17 00:00:00 2001 From: Hussain Kara Fallah Date: Tue, 6 Aug 2024 14:48:03 +0300 Subject: [PATCH 08/10] ok test --- .../kafka/tkms/test/TestMessageDecorator.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestMessageDecorator.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestMessageDecorator.java index 44dec08..0459c1d 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestMessageDecorator.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestMessageDecorator.java @@ -13,16 +13,18 @@ public class TestMessageDecorator implements ITkmsMessageDecorator { @Override public List
getAdditionalHeaders(TkmsMessage message) { - var h1 = new Header().setKey("tool").setValue("jambi".getBytes(StandardCharsets.UTF_8)); if (message.getValue() != null && new String(message.getValue(), StandardCharsets.UTF_8).startsWith("Here from")) { - return List.of(h1); + return List.of(new Header().setKey("tool").setValue("jambi".getBytes(StandardCharsets.UTF_8))); } return List.of(); } @Override public TkmsShardPartition getOverridedPartition(TkmsMessage message) { - return new TkmsShardPartition(0, 0); + if (message.getValue() != null && new String(message.getValue(), StandardCharsets.UTF_8).startsWith("Here from")) { + return new TkmsShardPartition(0, 0); + } + return new TkmsShardPartition(message.getShard(), message.getPartition()); } } From f8cb5223fb92ab827c5ba8e8b607203ab59cc202 Mon Sep 17 00:00:00 2001 From: Hussain Kara Fallah Date: Tue, 6 Aug 2024 14:53:56 +0300 Subject: [PATCH 09/10] test happy again --- .../com/transferwise/kafka/tkms/test/TestMessageDecorator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestMessageDecorator.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestMessageDecorator.java index 0459c1d..cf41233 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestMessageDecorator.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestMessageDecorator.java @@ -24,7 +24,7 @@ public TkmsShardPartition getOverridedPartition(TkmsMessage message) { if (message.getValue() != null && new String(message.getValue(), StandardCharsets.UTF_8).startsWith("Here from")) { return new TkmsShardPartition(0, 0); } - return new TkmsShardPartition(message.getShard(), message.getPartition()); + return null; } } From 86ced0346a5bcd6aff5fffc22be852c34bf6ba31 Mon Sep 17 00:00:00 2001 From: Hussain Kara Fallah Date: Tue, 6 Aug 2024 15:02:30 +0300 Subject: [PATCH 10/10] bump gradle props --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index 0de6718..902c55f 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=0.29.1 +version=0.30.0