Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PPP-502] TKMS - Add decorator for message headers #87

Merged
merged 10 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.30.0] 2024-08-06
### Added
- Added `ITkmsMessageDecorator` that kicks in before message is registered and adds custom headers

## [0.29.1] 2024-07-25
### Fixed
- Fixed the prometheus metrics cant cast metric type gauge to info issue.
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.29.1
version=0.30.0
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.transferwise.kafka.tkms.TransactionContext.ShardPartitionMessages;
import com.transferwise.kafka.tkms.api.ITkmsEventsListener;
import com.transferwise.kafka.tkms.api.ITkmsEventsListener.MessageRegisteredEvent;
import com.transferwise.kafka.tkms.api.ITkmsMessageDecorator;
import com.transferwise.kafka.tkms.api.ITransactionalKafkaMessageSender;
import com.transferwise.kafka.tkms.api.TkmsMessage;
import com.transferwise.kafka.tkms.api.TkmsMessage.Header;
Expand Down Expand Up @@ -62,6 +63,8 @@ protected void setTkmsDaoProvider(ITkmsDaoProvider tkmsDaoProvider) {
private IProblemNotifier problemNotifier;
@Autowired
private ITkmsTopicValidator tkmsTopicValidator;
@Autowired
private List<ITkmsMessageDecorator> messageDecorators;

private volatile List<ITkmsEventsListener> tkmsEventsListeners;
private RateLimiter errorLogRateLimiter = RateLimiter.create(2);
Expand Down Expand Up @@ -144,10 +147,10 @@ protected void checkActiveTransaction(int shard, boolean transactionActive, bool

@Override
public SendMessagesResult sendMessages(SendMessagesRequest request) {
var transactionActive = TransactionSynchronizationManager.isActualTransactionActive();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

making checkstyle happy
Distance between variable 'transactionActive' declaration and its first usage is 4, but allowed 3. Consider making that variable final if you still need to store its value in advance (before method calls that might have side effects on the original value).


request.getTkmsMessages().forEach(message -> messageDecorators.forEach(message::accept));
validateMessages(request);

var transactionActive = TransactionSynchronizationManager.isActualTransactionActive();
var validatedTopics = new HashSet<String>();

int seq = 0;
Expand Down Expand Up @@ -250,7 +253,7 @@ public SendMessageResult sendMessage(SendMessageRequest request) {

var message = request.getTkmsMessage();
var shardPartition = getShardPartition(message);

messageDecorators.forEach(message::accept);
try {
shardPartition.putIntoMdc();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.transferwise.kafka.tkms.api;

import com.transferwise.kafka.tkms.api.TkmsMessage.Header;
import java.util.List;
import org.springframework.core.Ordered;

public interface ITkmsMessageDecorator extends Ordered {

default List<Header> getAdditionalHeaders(TkmsMessage message) {
return List.of();
}

default TkmsShardPartition getOverridedPartition(TkmsMessage message) {
return null;
}

default int getOrder() {
return LOWEST_PRECEDENCE;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,19 @@ public TkmsMessage addHeader(Header header) {
return this;
}

public TkmsMessage accept(ITkmsMessageDecorator decorator) {
var headers = decorator.getAdditionalHeaders(this);
if (headers != null) {
headers.forEach(this::addHeader);
}
var overridedPartition = decorator.getOverridedPartition(this);
if (overridedPartition != null) {
setShard(overridedPartition.getShard());
setPartition(overridedPartition.getPartition());
}
return this;
}

/**
* Forces specified compression.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class TkmsShardPartition {
private Tag micrometerPartitionTag;
private String stringPresentation;

private TkmsShardPartition(int shard, int partition) {
public TkmsShardPartition(int shard, int partition) {
this.shard = shard;
this.partition = partition;
this.micrometerShardTag = Tag.of("shard", String.valueOf(shard));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
import com.transferwise.common.baseutils.meters.cache.MeterCache;
import com.transferwise.common.baseutils.transactionsmanagement.ITransactionsHelper;
import com.transferwise.common.baseutils.transactionsmanagement.TransactionsHelper;
import com.transferwise.kafka.tkms.api.ITkmsMessageDecorator;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.Collections;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
Expand Down Expand Up @@ -40,4 +43,10 @@ public TransactionsHelper twTransactionsHelper() {
return new TransactionsHelper();
}

@Bean
@ConditionalOnMissingBean(ITkmsMessageDecorator.class)
public List<ITkmsMessageDecorator> messageDecorators() {
return Collections.emptyList();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.transferwise.kafka.tkms;

import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.transferwise.common.baseutils.transactionsmanagement.ITransactionsHelper;
import com.transferwise.kafka.tkms.api.ITransactionalKafkaMessageSender.SendMessagesRequest;
import com.transferwise.kafka.tkms.api.TkmsMessage;
import com.transferwise.kafka.tkms.test.BaseIntTest;
import com.transferwise.kafka.tkms.test.ITkmsSentMessagesCollector.SentMessage;
import com.transferwise.kafka.tkms.test.TestMessagesInterceptor;
import com.transferwise.kafka.tkms.test.TestProperties;
import java.nio.charset.StandardCharsets;
import java.util.stream.StreamSupport;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;

@Slf4j
class MessageDecorationTest extends BaseIntTest {

@Autowired
private TestMessagesInterceptor testMessagesInterceptor;
@Autowired
private TransactionalKafkaMessageSender transactionalKafkaMessageSender;
@Autowired
private TestProperties testProperties;
@Autowired
private ITransactionsHelper transactionsHelper;

@AfterEach
void cleanupTest() {
testMessagesInterceptor.setBeforeSendingToKafkaFunction(null);
}

@Test
void messagesAreDecorateWithJambi() {
byte[] someValue = "Here from the king's mountain view, Feast like a sultan I do!".getBytes(StandardCharsets.UTF_8);

String topic = testProperties.getTestTopic();

transactionsHelper.withTransaction().run(() ->
transactionalKafkaMessageSender.sendMessages(new SendMessagesRequest()
.addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("adam-jones").setShard(4).setValue(someValue))
.addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("danny-carey").setPartition(5).setValue(someValue))
));

await().until(() -> tkmsSentMessagesCollector.getSentMessages(topic).size() == 2);
var messages = tkmsSentMessagesCollector.getSentMessages(topic);

assertEquals(2, messages.size());
checkForHeader(messages.get(0), "tool", "jambi");
assertEquals(0, messages.get(0).getShardPartition().getShard());
assertEquals(0, messages.get(0).getShardPartition().getPartition());
checkForHeader(messages.get(1), "tool", "jambi");
assertEquals(0, messages.get(1).getShardPartition().getShard());
assertEquals(0, messages.get(1).getShardPartition().getPartition());
}

private void checkForHeader(SentMessage sentMessage, String key, String value) {
assertTrue(
StreamSupport.stream(sentMessage.getProducerRecord().headers().spliterator(), false)
.anyMatch(h -> h.key().equals(key) && value.equals(new String(h.value(), StandardCharsets.UTF_8)))
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.transferwise.kafka.tkms.test;

import com.transferwise.kafka.tkms.api.ITkmsMessageDecorator;
import com.transferwise.kafka.tkms.api.TkmsMessage;
import com.transferwise.kafka.tkms.api.TkmsMessage.Header;
import com.transferwise.kafka.tkms.api.TkmsShardPartition;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.springframework.stereotype.Component;

@Component
public class TestMessageDecorator implements ITkmsMessageDecorator {

@Override
public List<Header> getAdditionalHeaders(TkmsMessage message) {
if (message.getValue() != null && new String(message.getValue(), StandardCharsets.UTF_8).startsWith("Here from")) {
return List.of(new Header().setKey("tool").setValue("jambi".getBytes(StandardCharsets.UTF_8)));
}
return List.of();
}

@Override
public TkmsShardPartition getOverridedPartition(TkmsMessage message) {
if (message.getValue() != null && new String(message.getValue(), StandardCharsets.UTF_8).startsWith("Here from")) {
return new TkmsShardPartition(0, 0);
}
return null;
}

}
Loading