Skip to content

Commit

Permalink
[PPP-502] TKMS - Add decorator for message headers (#87)
Browse files Browse the repository at this point in the history
  • Loading branch information
hussainkarafallah authored Aug 6, 2024
1 parent 0e34b7d commit 7067b2c
Show file tree
Hide file tree
Showing 9 changed files with 153 additions and 5 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.30.0] 2024-08-06
### Added
- Added `ITkmsMessageDecorator` that kicks in before message is registered and adds custom headers

## [0.29.1] 2024-07-25
### Fixed
- Fixed the prometheus metrics cant cast metric type gauge to info issue.
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.29.1
version=0.30.0
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.transferwise.kafka.tkms.TransactionContext.ShardPartitionMessages;
import com.transferwise.kafka.tkms.api.ITkmsEventsListener;
import com.transferwise.kafka.tkms.api.ITkmsEventsListener.MessageRegisteredEvent;
import com.transferwise.kafka.tkms.api.ITkmsMessageDecorator;
import com.transferwise.kafka.tkms.api.ITransactionalKafkaMessageSender;
import com.transferwise.kafka.tkms.api.TkmsMessage;
import com.transferwise.kafka.tkms.api.TkmsMessage.Header;
Expand Down Expand Up @@ -62,6 +63,8 @@ protected void setTkmsDaoProvider(ITkmsDaoProvider tkmsDaoProvider) {
private IProblemNotifier problemNotifier;
@Autowired
private ITkmsTopicValidator tkmsTopicValidator;
@Autowired
private List<ITkmsMessageDecorator> messageDecorators;

private volatile List<ITkmsEventsListener> tkmsEventsListeners;
private RateLimiter errorLogRateLimiter = RateLimiter.create(2);
Expand Down Expand Up @@ -144,10 +147,10 @@ protected void checkActiveTransaction(int shard, boolean transactionActive, bool

@Override
public SendMessagesResult sendMessages(SendMessagesRequest request) {
var transactionActive = TransactionSynchronizationManager.isActualTransactionActive();

request.getTkmsMessages().forEach(message -> messageDecorators.forEach(message::accept));
validateMessages(request);

var transactionActive = TransactionSynchronizationManager.isActualTransactionActive();
var validatedTopics = new HashSet<String>();

int seq = 0;
Expand Down Expand Up @@ -250,7 +253,7 @@ public SendMessageResult sendMessage(SendMessageRequest request) {

var message = request.getTkmsMessage();
var shardPartition = getShardPartition(message);

messageDecorators.forEach(message::accept);
try {
shardPartition.putIntoMdc();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.transferwise.kafka.tkms.api;

import com.transferwise.kafka.tkms.api.TkmsMessage.Header;
import java.util.List;
import org.springframework.core.Ordered;

public interface ITkmsMessageDecorator extends Ordered {

default List<Header> getAdditionalHeaders(TkmsMessage message) {
return List.of();
}

default TkmsShardPartition getOverridedPartition(TkmsMessage message) {
return null;
}

default int getOrder() {
return LOWEST_PRECEDENCE;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,19 @@ public TkmsMessage addHeader(Header header) {
return this;
}

public TkmsMessage accept(ITkmsMessageDecorator decorator) {
var headers = decorator.getAdditionalHeaders(this);
if (headers != null) {
headers.forEach(this::addHeader);
}
var overridedPartition = decorator.getOverridedPartition(this);
if (overridedPartition != null) {
setShard(overridedPartition.getShard());
setPartition(overridedPartition.getPartition());
}
return this;
}

/**
* Forces specified compression.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class TkmsShardPartition {
private Tag micrometerPartitionTag;
private String stringPresentation;

private TkmsShardPartition(int shard, int partition) {
public TkmsShardPartition(int shard, int partition) {
this.shard = shard;
this.partition = partition;
this.micrometerShardTag = Tag.of("shard", String.valueOf(shard));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
import com.transferwise.common.baseutils.meters.cache.MeterCache;
import com.transferwise.common.baseutils.transactionsmanagement.ITransactionsHelper;
import com.transferwise.common.baseutils.transactionsmanagement.TransactionsHelper;
import com.transferwise.kafka.tkms.api.ITkmsMessageDecorator;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.Collections;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
Expand Down Expand Up @@ -40,4 +43,10 @@ public TransactionsHelper twTransactionsHelper() {
return new TransactionsHelper();
}

@Bean
@ConditionalOnMissingBean(ITkmsMessageDecorator.class)
public List<ITkmsMessageDecorator> messageDecorators() {
return Collections.emptyList();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.transferwise.kafka.tkms;

import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.transferwise.common.baseutils.transactionsmanagement.ITransactionsHelper;
import com.transferwise.kafka.tkms.api.ITransactionalKafkaMessageSender.SendMessagesRequest;
import com.transferwise.kafka.tkms.api.TkmsMessage;
import com.transferwise.kafka.tkms.test.BaseIntTest;
import com.transferwise.kafka.tkms.test.ITkmsSentMessagesCollector.SentMessage;
import com.transferwise.kafka.tkms.test.TestMessagesInterceptor;
import com.transferwise.kafka.tkms.test.TestProperties;
import java.nio.charset.StandardCharsets;
import java.util.stream.StreamSupport;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;

@Slf4j
class MessageDecorationTest extends BaseIntTest {

@Autowired
private TestMessagesInterceptor testMessagesInterceptor;
@Autowired
private TransactionalKafkaMessageSender transactionalKafkaMessageSender;
@Autowired
private TestProperties testProperties;
@Autowired
private ITransactionsHelper transactionsHelper;

@AfterEach
void cleanupTest() {
testMessagesInterceptor.setBeforeSendingToKafkaFunction(null);
}

@Test
void messagesAreDecorateWithJambi() {
byte[] someValue = "Here from the king's mountain view, Feast like a sultan I do!".getBytes(StandardCharsets.UTF_8);

String topic = testProperties.getTestTopic();

transactionsHelper.withTransaction().run(() ->
transactionalKafkaMessageSender.sendMessages(new SendMessagesRequest()
.addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("adam-jones").setShard(4).setValue(someValue))
.addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("danny-carey").setPartition(5).setValue(someValue))
));

await().until(() -> tkmsSentMessagesCollector.getSentMessages(topic).size() == 2);
var messages = tkmsSentMessagesCollector.getSentMessages(topic);

assertEquals(2, messages.size());
checkForHeader(messages.get(0), "tool", "jambi");
assertEquals(0, messages.get(0).getShardPartition().getShard());
assertEquals(0, messages.get(0).getShardPartition().getPartition());
checkForHeader(messages.get(1), "tool", "jambi");
assertEquals(0, messages.get(1).getShardPartition().getShard());
assertEquals(0, messages.get(1).getShardPartition().getPartition());
}

private void checkForHeader(SentMessage sentMessage, String key, String value) {
assertTrue(
StreamSupport.stream(sentMessage.getProducerRecord().headers().spliterator(), false)
.anyMatch(h -> h.key().equals(key) && value.equals(new String(h.value(), StandardCharsets.UTF_8)))
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.transferwise.kafka.tkms.test;

import com.transferwise.kafka.tkms.api.ITkmsMessageDecorator;
import com.transferwise.kafka.tkms.api.TkmsMessage;
import com.transferwise.kafka.tkms.api.TkmsMessage.Header;
import com.transferwise.kafka.tkms.api.TkmsShardPartition;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.springframework.stereotype.Component;

@Component
public class TestMessageDecorator implements ITkmsMessageDecorator {

@Override
public List<Header> getAdditionalHeaders(TkmsMessage message) {
if (message.getValue() != null && new String(message.getValue(), StandardCharsets.UTF_8).startsWith("Here from")) {
return List.of(new Header().setKey("tool").setValue("jambi".getBytes(StandardCharsets.UTF_8)));
}
return List.of();
}

@Override
public TkmsShardPartition getOverridedPartition(TkmsMessage message) {
if (message.getValue() != null && new String(message.getValue(), StandardCharsets.UTF_8).startsWith("Here from")) {
return new TkmsShardPartition(0, 0);
}
return null;
}

}

0 comments on commit 7067b2c

Please sign in to comment.