diff --git a/CHANGELOG.md b/CHANGELOG.md index 1410d0c..c766f42 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,13 +5,15 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [0.31.1] - 2024-10-09 +## [0.32.0] - 2024-10-11 +### Added +- Added a postprocessor to the `TkmsKafkaProducerProvider` to allow features like tracing attached to the Kafka Producer. +## [0.31.1] - 2024-10-09 ### Fixed - Override the lombok generated `TkmsMessage#setHeaders` to copy the passed headers into a mutable `ArrayList` in order to avoid `UnsupportedOperationException` ## [0.31.0] - 2024-10-07 - ### Changed Added two new methods to the `TkmsMessage` that allow to conveniently use standard uuid and priority headers: diff --git a/gradle.properties b/gradle.properties index 516a2d2..a730574 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=0.31.1 +version=0.32.0 diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsStorageToKafkaProxy.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsStorageToKafkaProxy.java index fb72a50..0621942 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsStorageToKafkaProxy.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsStorageToKafkaProxy.java @@ -42,7 +42,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.mutable.MutableLong; import org.apache.commons.lang3.mutable.MutableObject; -import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.errors.InterruptException; @@ -166,7 +166,7 @@ private void poll(Control control, TkmsShardPartition shardPartition) { } } - private void poll0(Control control, TkmsShardPartition shardPartition, KafkaProducer kafkaProducer) { + private void poll0(Control control, TkmsShardPartition shardPartition, Producer kafkaProducer) { int pollerBatchSize = properties.getPollerBatchSize(shardPartition.getShard()); long startTimeMs = System.currentTimeMillis(); diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerPostProcessor.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerPostProcessor.java new file mode 100644 index 0000000..0b4b0ed --- /dev/null +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerPostProcessor.java @@ -0,0 +1,7 @@ +package com.transferwise.kafka.tkms.config; + +import java.util.function.Function; +import org.apache.kafka.clients.producer.Producer; + +public interface ITkmsKafkaProducerPostProcessor extends Function, Producer> { +} diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java index 3473305..dbddf13 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java @@ -1,13 +1,13 @@ package com.transferwise.kafka.tkms.config; import com.transferwise.kafka.tkms.api.TkmsShardPartition; -import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; public interface ITkmsKafkaProducerProvider { - KafkaProducer getKafkaProducer(TkmsShardPartition tkmsShardPartition, UseCase useCase); + Producer getKafkaProducer(TkmsShardPartition tkmsShardPartition, UseCase useCase); - KafkaProducer getKafkaProducerForTopicValidation(TkmsShardPartition shardPartition); + Producer getKafkaProducerForTopicValidation(TkmsShardPartition shardPartition); void closeKafkaProducer(TkmsShardPartition tkmsShardPartition, UseCase useCase); diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsAutoConfiguration.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsAutoConfiguration.java index b9f8b61..987a80c 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsAutoConfiguration.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsAutoConfiguration.java @@ -37,8 +37,6 @@ public IMeterCache twDefaultMeterCache(MeterRegistry meterRegistry) { return new MeterCache(meterRegistry); } - - @Bean @ConditionalOnMissingBean(ITransactionsHelper.class) public TransactionsHelper twTransactionsHelper() { diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java index 21c98c4..7072890 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java @@ -6,7 +6,9 @@ import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics; import java.time.Duration; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -16,6 +18,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.StringSerializer; @@ -39,8 +42,12 @@ public class TkmsKafkaProducerProvider implements ITkmsKafkaProducerProvider, Gr private Map, ProducerEntry> producers = new ConcurrentHashMap<>(); + @Autowired(required = false) + private List postProcessors = new ArrayList<>(); + + @Override - public KafkaProducer getKafkaProducer(TkmsShardPartition shardPartition, UseCase useCase) { + public Producer getKafkaProducer(TkmsShardPartition shardPartition, UseCase useCase) { return producers.computeIfAbsent(Pair.of(shardPartition, useCase), key -> { Map configs = new HashMap<>(); @@ -84,7 +91,7 @@ public KafkaProducer getKafkaProducer(TkmsShardPartition shardPa } } - final var producer = new KafkaProducer(configs); + final var producer = getKafkaProducer(configs); final var kafkaClientMetrics = new KafkaClientMetrics(producer); kafkaClientMetrics.bindTo(meterRegistry); @@ -92,8 +99,16 @@ public KafkaProducer getKafkaProducer(TkmsShardPartition shardPa }).getProducer(); } + private Producer getKafkaProducer(Map configs) { + Producer producer = new KafkaProducer<>(configs); + for (ITkmsKafkaProducerPostProcessor pp : this.postProcessors) { + producer = pp.apply(producer); + } + return producer; + } + @Override - public KafkaProducer getKafkaProducerForTopicValidation(TkmsShardPartition shardPartition) { + public Producer getKafkaProducerForTopicValidation(TkmsShardPartition shardPartition) { return getKafkaProducer(TkmsShardPartition.of(shardPartition.getShard(), 0), UseCase.TOPIC_VALIDATION); } @@ -139,7 +154,7 @@ public boolean canShutdown() { @Accessors(chain = true) protected static class ProducerEntry { - private KafkaProducer producer; + private Producer producer; private KafkaClientMetrics kafkaClientMetric; } diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java new file mode 100644 index 0000000..d1fa8d5 --- /dev/null +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java @@ -0,0 +1,63 @@ +package com.transferwise.kafka.tkms; + +import static com.transferwise.kafka.tkms.test.TestKafkaProducerPostProcessor.TEST_MESSAGE; +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.transferwise.common.baseutils.transactionsmanagement.ITransactionsHelper; +import com.transferwise.kafka.tkms.api.ITransactionalKafkaMessageSender; +import com.transferwise.kafka.tkms.api.TkmsMessage; +import com.transferwise.kafka.tkms.test.BaseIntTest; +import com.transferwise.kafka.tkms.test.ITkmsSentMessagesCollector; +import com.transferwise.kafka.tkms.test.TestMessagesInterceptor; +import com.transferwise.kafka.tkms.test.TestProperties; +import java.nio.charset.StandardCharsets; +import java.util.stream.StreamSupport; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; + +class MessagePostProcessingTest extends BaseIntTest { + + @Autowired + private TestMessagesInterceptor testMessagesInterceptor; + @Autowired + private TransactionalKafkaMessageSender transactionalKafkaMessageSender; + @Autowired + private TestProperties testProperties; + @Autowired + private ITransactionsHelper transactionsHelper; + + @AfterEach + void cleanupTest() { + testMessagesInterceptor.setBeforeSendingToKafkaFunction(null); + } + + @Test + void messagesAreInstrumentedWithProducerPostProcessor() { + byte[] someValue = TEST_MESSAGE; + + String topic = testProperties.getTestTopic(); + + transactionsHelper.withTransaction().run(() -> + transactionalKafkaMessageSender.sendMessages(new ITransactionalKafkaMessageSender.SendMessagesRequest() + .addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("1").setValue(someValue)) + .addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("2").setValue(someValue)) + )); + + await().until(() -> tkmsSentMessagesCollector.getSentMessages(topic).size() == 2); + var messages = tkmsSentMessagesCollector.getSentMessages(topic); + + assertEquals(2, messages.size()); + checkForHeader(messages.get(0), "wrapTest", "wrapped"); + checkForHeader(messages.get(1), "wrapTest", "wrapped"); + } + + private void checkForHeader(ITkmsSentMessagesCollector.SentMessage sentMessage, String key, String value) { + assertTrue( + StreamSupport.stream(sentMessage.getProducerRecord().headers().spliterator(), false) + .anyMatch(h -> h.key().equals(key) && value.equals(new String(h.value(), StandardCharsets.UTF_8))) + ); + } +} diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProviderTestServer.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProviderTestServer.java index cdbf3b2..ca20b78 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProviderTestServer.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProviderTestServer.java @@ -6,7 +6,9 @@ import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider.UseCase; import com.transferwise.kafka.tkms.test.BaseIntTest; import java.lang.reflect.Field; -import org.apache.kafka.clients.producer.KafkaProducer; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Proxy; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -16,13 +18,20 @@ class TkmsKafkaProducerProviderTestServer extends BaseIntTest { @Autowired private ITkmsKafkaProducerProvider tkmsKafkaProducerProvider; + @Test void shardKafkaPropertiesAreApplied() throws Exception { - KafkaProducer kafkaProducer = tkmsKafkaProducerProvider.getKafkaProducer(TkmsShardPartition.of(1, 0), UseCase.PROXY); + Producer kafkaProducer = tkmsKafkaProducerProvider.getKafkaProducer(TkmsShardPartition.of(1, 0), UseCase.PROXY); + + InvocationHandler handler = Proxy.getInvocationHandler(kafkaProducer); + + Field originalProducerFiled = handler.getClass().getDeclaredField("producer"); + originalProducerFiled.setAccessible(true); + Object originalProducer = originalProducerFiled.get(handler); - Field producerConfigField = kafkaProducer.getClass().getDeclaredField("producerConfig"); + Field producerConfigField = originalProducer.getClass().getDeclaredField("producerConfig"); producerConfigField.setAccessible(true); - ProducerConfig producerConfig = (ProducerConfig) producerConfigField.get(kafkaProducer); + ProducerConfig producerConfig = (ProducerConfig) producerConfigField.get(originalProducer); assertThat(producerConfig.getLong("linger.ms")).isEqualTo(7L); } diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestKafkaProducerPostProcessor.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestKafkaProducerPostProcessor.java new file mode 100644 index 0000000..b101a5b --- /dev/null +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestKafkaProducerPostProcessor.java @@ -0,0 +1,65 @@ +package com.transferwise.kafka.tkms.test; + +import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerPostProcessor; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.stereotype.Component; + +@Component +public class TestKafkaProducerPostProcessor implements ITkmsKafkaProducerPostProcessor { + + public static final byte[] TEST_MESSAGE = "Testing ProducerPostProcessing".getBytes(StandardCharsets.UTF_8); + + private ProxyInvocationHandler handler; + + @SuppressWarnings("unchecked") + @Override + public Producer apply(Producer producer) { + handler = new ProxyInvocationHandler(producer); + return (Producer) + Proxy.newProxyInstance( + TestKafkaProducerPostProcessor.class.getClassLoader(), + new Class[] {Producer.class}, + handler); + } + + private static class ProxyInvocationHandler implements InvocationHandler { + + private final Producer producer; + + public ProxyInvocationHandler(Producer producer) { + this.producer = producer; + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + if ("send".equals(method.getName()) + && method.getParameterCount() >= 1 + && method.getParameterTypes()[0] == ProducerRecord.class) { + ProducerRecord record = (ProducerRecord) args[0]; + if (Arrays.equals(TEST_MESSAGE, record.value())) { + record.headers().add("wrapTest", "wrapped".getBytes(StandardCharsets.UTF_8)); + } + Callback callback = + method.getParameterCount() >= 2 + && method.getParameterTypes()[1] == Callback.class + ? (Callback) args[1] + : null; + return producer.send(record, callback); + } else { + try { + return method.invoke(producer, args); + } catch (InvocationTargetException exception) { + throw exception.getCause(); + } + } + } + } +} \ No newline at end of file