Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PPP-502] TKMS - Add decorator for message headers #87

Merged
merged 10 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.30.0] 2024-08-06
### Added
- Added `ITkmsMessageDecorator` that kicks in before message is registered and adds custom headers

## [0.29.1] 2024-07-25
### Fixed
- Fixed the prometheus metrics cant cast metric type gauge to info issue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.transferwise.kafka.tkms.TransactionContext.ShardPartitionMessages;
import com.transferwise.kafka.tkms.api.ITkmsEventsListener;
import com.transferwise.kafka.tkms.api.ITkmsEventsListener.MessageRegisteredEvent;
import com.transferwise.kafka.tkms.api.ITkmsMessageDecorator;
import com.transferwise.kafka.tkms.api.ITransactionalKafkaMessageSender;
import com.transferwise.kafka.tkms.api.TkmsMessage;
import com.transferwise.kafka.tkms.api.TkmsMessage.Header;
Expand Down Expand Up @@ -62,6 +63,8 @@ protected void setTkmsDaoProvider(ITkmsDaoProvider tkmsDaoProvider) {
private IProblemNotifier problemNotifier;
@Autowired
private ITkmsTopicValidator tkmsTopicValidator;
@Autowired
private List<ITkmsMessageDecorator> messageDecorators;

private volatile List<ITkmsEventsListener> tkmsEventsListeners;
private RateLimiter errorLogRateLimiter = RateLimiter.create(2);
Expand Down Expand Up @@ -144,10 +147,10 @@ protected void checkActiveTransaction(int shard, boolean transactionActive, bool

@Override
public SendMessagesResult sendMessages(SendMessagesRequest request) {
var transactionActive = TransactionSynchronizationManager.isActualTransactionActive();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

making checkstyle happy
Distance between variable 'transactionActive' declaration and its first usage is 4, but allowed 3. Consider making that variable final if you still need to store its value in advance (before method calls that might have side effects on the original value).


request.getTkmsMessages().forEach(message -> messageDecorators.forEach(message::accept));
validateMessages(request);

var transactionActive = TransactionSynchronizationManager.isActualTransactionActive();
var validatedTopics = new HashSet<String>();

int seq = 0;
Expand Down Expand Up @@ -250,7 +253,7 @@ public SendMessageResult sendMessage(SendMessageRequest request) {

var message = request.getTkmsMessage();
var shardPartition = getShardPartition(message);

messageDecorators.forEach(message::accept);
try {
shardPartition.putIntoMdc();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.transferwise.kafka.tkms.api;

import com.transferwise.kafka.tkms.api.TkmsMessage.Header;
import java.util.List;

public interface ITkmsMessageDecorator {

default List<Header> getHeaders(TkmsMessage message) {
return List.of();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ public TkmsMessage addHeader(Header header) {
return this;
}

public TkmsMessage accept(ITkmsMessageDecorator decorator) {
var headers = decorator.getHeaders(this);
if (headers != null) {
headers.forEach(this::addHeader);
}
return this;
}

/**
* Forces specified compression.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
import com.transferwise.common.baseutils.meters.cache.MeterCache;
import com.transferwise.common.baseutils.transactionsmanagement.ITransactionsHelper;
import com.transferwise.common.baseutils.transactionsmanagement.TransactionsHelper;
import com.transferwise.kafka.tkms.api.ITkmsMessageDecorator;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.Collections;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
Expand Down Expand Up @@ -39,5 +42,11 @@ public IMeterCache twDefaultMeterCache(MeterRegistry meterRegistry) {
public TransactionsHelper twTransactionsHelper() {
return new TransactionsHelper();
}

@Bean
@ConditionalOnMissingBean(ITkmsMessageDecorator.class)
public List<ITkmsMessageDecorator> messageDecorators() {
return Collections.emptyList();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.transferwise.kafka.tkms;

import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.transferwise.common.baseutils.transactionsmanagement.ITransactionsHelper;
import com.transferwise.kafka.tkms.api.ITransactionalKafkaMessageSender.SendMessagesRequest;
import com.transferwise.kafka.tkms.api.TkmsMessage;
import com.transferwise.kafka.tkms.test.BaseIntTest;
import com.transferwise.kafka.tkms.test.ITkmsSentMessagesCollector.SentMessage;
import com.transferwise.kafka.tkms.test.TestMessagesInterceptor;
import com.transferwise.kafka.tkms.test.TestProperties;
import java.nio.charset.StandardCharsets;
import java.util.stream.StreamSupport;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;

@Slf4j
class MessageDecorationTest extends BaseIntTest {

@Autowired
private TestMessagesInterceptor testMessagesInterceptor;
@Autowired
private TransactionalKafkaMessageSender transactionalKafkaMessageSender;
@Autowired
private TestProperties testProperties;
@Autowired
private ITransactionsHelper transactionsHelper;

@AfterEach
void cleanupTest() {
testMessagesInterceptor.setBeforeSendingToKafkaFunction(null);
}

@Test
void messagesAreDecorateWithJambi() {
byte[] someValue = "Here from the king's mountain view, Feast like a sultan I do!".getBytes(StandardCharsets.UTF_8);

String topic = testProperties.getTestTopic();

transactionsHelper.withTransaction().run(() ->
transactionalKafkaMessageSender.sendMessages(new SendMessagesRequest()
.addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("adam-jones").setValue(someValue))
.addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("danny-carey").setValue(someValue))
));

await().until(() -> tkmsSentMessagesCollector.getSentMessages(topic).size() == 2);
var messages = tkmsSentMessagesCollector.getSentMessages(topic);

assertEquals(2, messages.size());
checkForHeader(messages.get(0), "tool", "jambi");
checkForHeader(messages.get(1), "tool", "jambi");
}

private void checkForHeader(SentMessage sentMessage, String key, String value) {
assertTrue(
StreamSupport.stream(sentMessage.getProducerRecord().headers().spliterator(), false)
.anyMatch(h -> h.key().equals(key) && value.equals(new String(h.value())))

Check warning on line 61 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessageDecorationTest.java

View workflow job for this annotation

GitHub Actions / Spotbugs Report-(3.2.2)

DM_DEFAULT_ENCODING

Found reliance on default encoding in com.transferwise.kafka.tkms.MessageDecorationTest.lambda$checkForHeader$2(String, String, Header): new String(byte[])
Raw output
Found a call to a method which will perform a byte to String (or String to byte) conversion, and will assume that the default platform encoding is suitable. This will cause the application behavior to vary between platforms. Use an alternative API and specify a charset name or Charset object explicitly.

Check warning on line 61 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessageDecorationTest.java

View workflow job for this annotation

GitHub Actions / Spotbugs Report-(3.3.1)

DM_DEFAULT_ENCODING

Found reliance on default encoding in com.transferwise.kafka.tkms.MessageDecorationTest.lambda$checkForHeader$2(String, String, Header): new String(byte[])
Raw output
Found a call to a method which will perform a byte to String (or String to byte) conversion, and will assume that the default platform encoding is suitable. This will cause the application behavior to vary between platforms. Use an alternative API and specify a charset name or Charset object explicitly.
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.transferwise.kafka.tkms.test;

import com.transferwise.kafka.tkms.api.ITkmsMessageDecorator;
import com.transferwise.kafka.tkms.api.TkmsMessage;
import com.transferwise.kafka.tkms.api.TkmsMessage.Header;
import java.util.List;
import org.springframework.stereotype.Component;

@Component
public class TestMessageDecorator implements ITkmsMessageDecorator {

@Override
public List<Header> getHeaders(TkmsMessage message) {
var h1 = new Header().setKey("tool").setValue("jambi".getBytes());

Check warning on line 14 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestMessageDecorator.java

View workflow job for this annotation

GitHub Actions / Spotbugs Report-(3.2.2)

DM_DEFAULT_ENCODING

Found reliance on default encoding in com.transferwise.kafka.tkms.test.TestMessageDecorator.getHeaders(TkmsMessage): String.getBytes()
Raw output
Found a call to a method which will perform a byte to String (or String to byte) conversion, and will assume that the default platform encoding is suitable. This will cause the application behavior to vary between platforms. Use an alternative API and specify a charset name or Charset object explicitly.

Check warning on line 14 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestMessageDecorator.java

View workflow job for this annotation

GitHub Actions / Spotbugs Report-(3.3.1)

DM_DEFAULT_ENCODING

Found reliance on default encoding in com.transferwise.kafka.tkms.test.TestMessageDecorator.getHeaders(TkmsMessage): String.getBytes()
Raw output
Found a call to a method which will perform a byte to String (or String to byte) conversion, and will assume that the default platform encoding is suitable. This will cause the application behavior to vary between platforms. Use an alternative API and specify a charset name or Charset object explicitly.
if (message.getKey() == null) {
return List.of();
}
return List.of(h1);
}

}
Loading