From e376f5d31a4b7e378d7372b425f6ee6ac6c1a195 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Sin=C3=B3ros-Szab=C3=B3?= Date: Mon, 2 Sep 2024 11:32:00 +0200 Subject: [PATCH 1/5] add KafkaProducer post-processor --- .../kafka/tkms/TkmsStorageToKafkaProxy.java | 6 +- .../ITkmsKafkaProducerPostProcessor.java | 7 ++ .../config/ITkmsKafkaProducerProvider.java | 12 ++- .../config/TkmsKafkaProducerProvider.java | 33 +++++++- .../kafka/tkms/MessagePostProcessingTest.java | 70 +++++++++++++++++ .../TkmsKafkaProducerProviderTestServer.java | 17 ++++- .../test/TestKafkaProducerPostProcessor.java | 76 +++++++++++++++++++ 7 files changed, 209 insertions(+), 12 deletions(-) create mode 100644 tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerPostProcessor.java create mode 100644 tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java create mode 100644 tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestKafkaProducerPostProcessor.java diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsStorageToKafkaProxy.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsStorageToKafkaProxy.java index fb72a50..380197c 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsStorageToKafkaProxy.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsStorageToKafkaProxy.java @@ -16,6 +16,7 @@ import com.transferwise.kafka.tkms.api.ITkmsMessageInterceptors; import com.transferwise.kafka.tkms.api.TkmsShardPartition; import com.transferwise.kafka.tkms.config.ITkmsDaoProvider; +import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerPostProcessor; import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider; import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider.UseCase; import com.transferwise.kafka.tkms.config.TkmsProperties; @@ -43,6 +44,7 @@ import org.apache.commons.lang3.mutable.MutableLong; import org.apache.commons.lang3.mutable.MutableObject; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.errors.InterruptException; @@ -79,6 +81,8 @@ public class TkmsStorageToKafkaProxy implements GracefulShutdownStrategy, ITkmsS @Autowired private ITkmsMessageInterceptors messageIntereceptors; @Autowired + private ITkmsKafkaProducerPostProcessor tkmsKafkaProducerPostProcessor; + @Autowired private SharedReentrantLockBuilderFactory lockBuilderFactory; @Autowired private ITkmsInterrupterService tkmsInterrupterService; @@ -166,7 +170,7 @@ private void poll(Control control, TkmsShardPartition shardPartition) { } } - private void poll0(Control control, TkmsShardPartition shardPartition, KafkaProducer kafkaProducer) { + private void poll0(Control control, TkmsShardPartition shardPartition, Producer kafkaProducer) { int pollerBatchSize = properties.getPollerBatchSize(shardPartition.getShard()); long startTimeMs = System.currentTimeMillis(); diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerPostProcessor.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerPostProcessor.java new file mode 100644 index 0000000..0b4b0ed --- /dev/null +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerPostProcessor.java @@ -0,0 +1,7 @@ +package com.transferwise.kafka.tkms.config; + +import java.util.function.Function; +import org.apache.kafka.clients.producer.Producer; + +public interface ITkmsKafkaProducerPostProcessor extends Function, Producer> { +} diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java index 3473305..3402902 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java @@ -1,13 +1,19 @@ package com.transferwise.kafka.tkms.config; import com.transferwise.kafka.tkms.api.TkmsShardPartition; -import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; public interface ITkmsKafkaProducerProvider { - KafkaProducer getKafkaProducer(TkmsShardPartition tkmsShardPartition, UseCase useCase); + Producer getKafkaProducer(TkmsShardPartition tkmsShardPartition, UseCase useCase); - KafkaProducer getKafkaProducerForTopicValidation(TkmsShardPartition shardPartition); + Producer getKafkaProducerForTopicValidation(TkmsShardPartition shardPartition); + + default void addPostProcessor(ITkmsKafkaProducerPostProcessor postProcessor) { + } + + default void removePostProcessors() { + } void closeKafkaProducer(TkmsShardPartition tkmsShardPartition, UseCase useCase); diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java index 21c98c4..193e4a2 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java @@ -6,7 +6,9 @@ import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics; import java.time.Duration; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -16,10 +18,12 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.util.Assert; @Slf4j public class TkmsKafkaProducerProvider implements ITkmsKafkaProducerProvider, GracefulShutdownStrategy { @@ -39,8 +43,21 @@ public class TkmsKafkaProducerProvider implements ITkmsKafkaProducerProvider, Gr private Map, ProducerEntry> producers = new ConcurrentHashMap<>(); + private List postProcessors = new ArrayList<>(); + + @Override + public void addPostProcessor(ITkmsKafkaProducerPostProcessor postProcessor) { + Assert.notNull(postProcessor, "'postProcessor' cannot be null"); + this.postProcessors.add(postProcessor); + } + + @Override + public void removePostProcessors() { + this.postProcessors.clear(); + } + @Override - public KafkaProducer getKafkaProducer(TkmsShardPartition shardPartition, UseCase useCase) { + public Producer getKafkaProducer(TkmsShardPartition shardPartition, UseCase useCase) { return producers.computeIfAbsent(Pair.of(shardPartition, useCase), key -> { Map configs = new HashMap<>(); @@ -84,7 +101,7 @@ public KafkaProducer getKafkaProducer(TkmsShardPartition shardPa } } - final var producer = new KafkaProducer(configs); + final var producer = getKafkaProducer(configs); final var kafkaClientMetrics = new KafkaClientMetrics(producer); kafkaClientMetrics.bindTo(meterRegistry); @@ -92,8 +109,16 @@ public KafkaProducer getKafkaProducer(TkmsShardPartition shardPa }).getProducer(); } + private Producer getKafkaProducer(Map configs) { + Producer producer = new KafkaProducer<>(configs); + for (ITkmsKafkaProducerPostProcessor pp : this.postProcessors) { + producer = pp.apply(producer); + } + return producer; + } + @Override - public KafkaProducer getKafkaProducerForTopicValidation(TkmsShardPartition shardPartition) { + public Producer getKafkaProducerForTopicValidation(TkmsShardPartition shardPartition) { return getKafkaProducer(TkmsShardPartition.of(shardPartition.getShard(), 0), UseCase.TOPIC_VALIDATION); } @@ -139,7 +164,7 @@ public boolean canShutdown() { @Accessors(chain = true) protected static class ProducerEntry { - private KafkaProducer producer; + private Producer producer; private KafkaClientMetrics kafkaClientMetric; } diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java new file mode 100644 index 0000000..756c7b2 --- /dev/null +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java @@ -0,0 +1,70 @@ +package com.transferwise.kafka.tkms; + +import static com.transferwise.kafka.tkms.test.TestKafkaProducerPostProcessor.TEST_MESSAGE; +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.transferwise.common.baseutils.transactionsmanagement.ITransactionsHelper; +import com.transferwise.kafka.tkms.api.ITransactionalKafkaMessageSender; +import com.transferwise.kafka.tkms.api.TkmsMessage; +import com.transferwise.kafka.tkms.test.BaseIntTest; +import com.transferwise.kafka.tkms.test.ITkmsSentMessagesCollector; +import com.transferwise.kafka.tkms.test.TestProperties; +import java.nio.charset.StandardCharsets; +import java.util.stream.StreamSupport; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.springframework.beans.factory.annotation.Autowired; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class MessagePostProcessingTest extends BaseIntTest { + + @Autowired + private TransactionalKafkaMessageSender transactionalKafkaMessageSender; + + @Autowired + private TestProperties testProperties; + + @Autowired + private ITransactionsHelper transactionsHelper; + + @BeforeEach + void setupTest() { + tkmsSentMessagesCollector.clear(); + } + + @AfterEach + void cleanupTest() { + tkmsSentMessagesCollector.clear(); + } + + @Test + void messagesAreInstrumentedWithProducerPostProcessor() { + byte[] someValue = TEST_MESSAGE; + + String topic = testProperties.getTestTopic(); + + transactionsHelper.withTransaction().run(() -> + transactionalKafkaMessageSender.sendMessages(new ITransactionalKafkaMessageSender.SendMessagesRequest() + .addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("1").setValue(someValue)) + .addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("2").setValue(someValue)) + )); + + await().until(() -> tkmsSentMessagesCollector.getSentMessages(topic).size() == 2); + var messages = tkmsSentMessagesCollector.getSentMessages(topic); + + assertEquals(2, messages.size()); + checkForHeader(messages.get(0), "wrapTest", "wrapped"); + checkForHeader(messages.get(1), "wrapTest", "wrapped"); + } + + private void checkForHeader(ITkmsSentMessagesCollector.SentMessage sentMessage, String key, String value) { + assertTrue( + StreamSupport.stream(sentMessage.getProducerRecord().headers().spliterator(), false) + .anyMatch(h -> h.key().equals(key) && value.equals(new String(h.value(), StandardCharsets.UTF_8))) + ); + } +} diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProviderTestServer.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProviderTestServer.java index cdbf3b2..ca20b78 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProviderTestServer.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProviderTestServer.java @@ -6,7 +6,9 @@ import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider.UseCase; import com.transferwise.kafka.tkms.test.BaseIntTest; import java.lang.reflect.Field; -import org.apache.kafka.clients.producer.KafkaProducer; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Proxy; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -16,13 +18,20 @@ class TkmsKafkaProducerProviderTestServer extends BaseIntTest { @Autowired private ITkmsKafkaProducerProvider tkmsKafkaProducerProvider; + @Test void shardKafkaPropertiesAreApplied() throws Exception { - KafkaProducer kafkaProducer = tkmsKafkaProducerProvider.getKafkaProducer(TkmsShardPartition.of(1, 0), UseCase.PROXY); + Producer kafkaProducer = tkmsKafkaProducerProvider.getKafkaProducer(TkmsShardPartition.of(1, 0), UseCase.PROXY); + + InvocationHandler handler = Proxy.getInvocationHandler(kafkaProducer); + + Field originalProducerFiled = handler.getClass().getDeclaredField("producer"); + originalProducerFiled.setAccessible(true); + Object originalProducer = originalProducerFiled.get(handler); - Field producerConfigField = kafkaProducer.getClass().getDeclaredField("producerConfig"); + Field producerConfigField = originalProducer.getClass().getDeclaredField("producerConfig"); producerConfigField.setAccessible(true); - ProducerConfig producerConfig = (ProducerConfig) producerConfigField.get(kafkaProducer); + ProducerConfig producerConfig = (ProducerConfig) producerConfigField.get(originalProducer); assertThat(producerConfig.getLong("linger.ms")).isEqualTo(7L); } diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestKafkaProducerPostProcessor.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestKafkaProducerPostProcessor.java new file mode 100644 index 0000000..6288500 --- /dev/null +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestKafkaProducerPostProcessor.java @@ -0,0 +1,76 @@ +package com.transferwise.kafka.tkms.test; + +import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerPostProcessor; +import com.transferwise.kafka.tkms.config.TkmsKafkaProducerProvider; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class TestKafkaProducerPostProcessor implements ITkmsKafkaProducerPostProcessor, InitializingBean { + + public static final byte[] TEST_MESSAGE = "Testing ProducerPostProcessing".getBytes(StandardCharsets.UTF_8); + + private ProxyInvocationHandler handler; + + @Autowired + TkmsKafkaProducerProvider tkmsKafkaProducerProvider; + + @SuppressWarnings("unchecked") + @Override + public Producer apply(Producer producer) { + handler = new ProxyInvocationHandler(producer); + return (Producer) + Proxy.newProxyInstance( + TestKafkaProducerPostProcessor.class.getClassLoader(), + new Class[] {Producer.class}, + handler); + } + + @Override + public void afterPropertiesSet() throws Exception { + tkmsKafkaProducerProvider.addPostProcessor(this); + } + + private static class ProxyInvocationHandler implements InvocationHandler { + + private final Producer producer; + + public ProxyInvocationHandler(Producer producer) { + this.producer = producer; + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + if ("send".equals(method.getName()) + && method.getParameterCount() >= 1 + && method.getParameterTypes()[0] == ProducerRecord.class) { + ProducerRecord record = (ProducerRecord) args[0]; + if (Arrays.equals(TEST_MESSAGE, record.value())) { + record.headers().add("wrapTest", "wrapped".getBytes(StandardCharsets.UTF_8)); + } + Callback callback = + method.getParameterCount() >= 2 + && method.getParameterTypes()[1] == Callback.class + ? (Callback) args[1] + : null; + return producer.send(record, callback); + } else { + try { + return method.invoke(producer, args); + } catch (InvocationTargetException exception) { + throw exception.getCause(); + } + } + } + } +} \ No newline at end of file From 17aca9608ac34d16559ed59799cedfed049efbbe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Sin=C3=B3ros-Szab=C3=B3?= Date: Mon, 9 Sep 2024 18:04:09 +0200 Subject: [PATCH 2/5] refactor --- CHANGELOG.md | 4 ++++ gradle.properties | 2 +- .../kafka/tkms/TkmsStorageToKafkaProxy.java | 4 ---- .../tkms/config/ITkmsKafkaProducerProvider.java | 6 ------ .../tkms/config/TkmsKafkaProducerProvider.java | 12 +----------- .../kafka/tkms/MessagePostProcessingTest.java | 15 ++++----------- .../tkms/test/TestKafkaProducerPostProcessor.java | 13 +------------ 7 files changed, 11 insertions(+), 45 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 43f1af6..883a53a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.31.0] - 2024-09-09 +### Added +- Added a postprocessor to the `TkmsKafkaProducerProvider` to allow features like tracing attached to the Kafka Producer. + ## [0.30.1] - 2024-08-08 ### Changed - MeterFilter's applied by the library are no longer explicitly applied and are instead diff --git a/gradle.properties b/gradle.properties index fe6a7f0..229d380 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=0.30.1 +version=0.31.0 diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsStorageToKafkaProxy.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsStorageToKafkaProxy.java index 380197c..0621942 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsStorageToKafkaProxy.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsStorageToKafkaProxy.java @@ -16,7 +16,6 @@ import com.transferwise.kafka.tkms.api.ITkmsMessageInterceptors; import com.transferwise.kafka.tkms.api.TkmsShardPartition; import com.transferwise.kafka.tkms.config.ITkmsDaoProvider; -import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerPostProcessor; import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider; import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider.UseCase; import com.transferwise.kafka.tkms.config.TkmsProperties; @@ -43,7 +42,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.mutable.MutableLong; import org.apache.commons.lang3.mutable.MutableObject; -import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; @@ -81,8 +79,6 @@ public class TkmsStorageToKafkaProxy implements GracefulShutdownStrategy, ITkmsS @Autowired private ITkmsMessageInterceptors messageIntereceptors; @Autowired - private ITkmsKafkaProducerPostProcessor tkmsKafkaProducerPostProcessor; - @Autowired private SharedReentrantLockBuilderFactory lockBuilderFactory; @Autowired private ITkmsInterrupterService tkmsInterrupterService; diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java index 3402902..dbddf13 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java @@ -9,12 +9,6 @@ public interface ITkmsKafkaProducerProvider { Producer getKafkaProducerForTopicValidation(TkmsShardPartition shardPartition); - default void addPostProcessor(ITkmsKafkaProducerPostProcessor postProcessor) { - } - - default void removePostProcessors() { - } - void closeKafkaProducer(TkmsShardPartition tkmsShardPartition, UseCase useCase); void closeKafkaProducerForTopicValidation(TkmsShardPartition tkmsShardPartition); diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java index 193e4a2..1137878 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java @@ -23,7 +23,6 @@ import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.util.Assert; @Slf4j public class TkmsKafkaProducerProvider implements ITkmsKafkaProducerProvider, GracefulShutdownStrategy { @@ -43,18 +42,9 @@ public class TkmsKafkaProducerProvider implements ITkmsKafkaProducerProvider, Gr private Map, ProducerEntry> producers = new ConcurrentHashMap<>(); + @Autowired private List postProcessors = new ArrayList<>(); - @Override - public void addPostProcessor(ITkmsKafkaProducerPostProcessor postProcessor) { - Assert.notNull(postProcessor, "'postProcessor' cannot be null"); - this.postProcessors.add(postProcessor); - } - - @Override - public void removePostProcessors() { - this.postProcessors.clear(); - } @Override public Producer getKafkaProducer(TkmsShardPartition shardPartition, UseCase useCase) { diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java index 756c7b2..d1fa8d5 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java @@ -10,35 +10,28 @@ import com.transferwise.kafka.tkms.api.TkmsMessage; import com.transferwise.kafka.tkms.test.BaseIntTest; import com.transferwise.kafka.tkms.test.ITkmsSentMessagesCollector; +import com.transferwise.kafka.tkms.test.TestMessagesInterceptor; import com.transferwise.kafka.tkms.test.TestProperties; import java.nio.charset.StandardCharsets; import java.util.stream.StreamSupport; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInstance; import org.springframework.beans.factory.annotation.Autowired; -@TestInstance(TestInstance.Lifecycle.PER_CLASS) class MessagePostProcessingTest extends BaseIntTest { + @Autowired + private TestMessagesInterceptor testMessagesInterceptor; @Autowired private TransactionalKafkaMessageSender transactionalKafkaMessageSender; - @Autowired private TestProperties testProperties; - @Autowired private ITransactionsHelper transactionsHelper; - @BeforeEach - void setupTest() { - tkmsSentMessagesCollector.clear(); - } - @AfterEach void cleanupTest() { - tkmsSentMessagesCollector.clear(); + testMessagesInterceptor.setBeforeSendingToKafkaFunction(null); } @Test diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestKafkaProducerPostProcessor.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestKafkaProducerPostProcessor.java index 6288500..b101a5b 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestKafkaProducerPostProcessor.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestKafkaProducerPostProcessor.java @@ -1,7 +1,6 @@ package com.transferwise.kafka.tkms.test; import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerPostProcessor; -import com.transferwise.kafka.tkms.config.TkmsKafkaProducerProvider; import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -11,20 +10,15 @@ import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.springframework.beans.factory.InitializingBean; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component -public class TestKafkaProducerPostProcessor implements ITkmsKafkaProducerPostProcessor, InitializingBean { +public class TestKafkaProducerPostProcessor implements ITkmsKafkaProducerPostProcessor { public static final byte[] TEST_MESSAGE = "Testing ProducerPostProcessing".getBytes(StandardCharsets.UTF_8); private ProxyInvocationHandler handler; - @Autowired - TkmsKafkaProducerProvider tkmsKafkaProducerProvider; - @SuppressWarnings("unchecked") @Override public Producer apply(Producer producer) { @@ -36,11 +30,6 @@ public Producer apply(Producer producer) { handler); } - @Override - public void afterPropertiesSet() throws Exception { - tkmsKafkaProducerProvider.addPostProcessor(this); - } - private static class ProxyInvocationHandler implements InvocationHandler { private final Producer producer; From 38c079fa1e5761524559c395bf3ec76fe7aadfff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Sin=C3=B3ros-Szab=C3=B3?= Date: Mon, 9 Sep 2024 18:12:48 +0200 Subject: [PATCH 3/5] add autoconfiguration in case we don't have a PP for the Producer --- .../kafka/tkms/config/TkmsAutoConfiguration.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsAutoConfiguration.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsAutoConfiguration.java index b9f8b61..4aa822c 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsAutoConfiguration.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsAutoConfiguration.java @@ -37,8 +37,6 @@ public IMeterCache twDefaultMeterCache(MeterRegistry meterRegistry) { return new MeterCache(meterRegistry); } - - @Bean @ConditionalOnMissingBean(ITransactionsHelper.class) public TransactionsHelper twTransactionsHelper() { @@ -51,4 +49,9 @@ public List messageDecorators() { return Collections.emptyList(); } + @Bean + @ConditionalOnMissingBean(ITkmsKafkaProducerPostProcessor.class) + public List producerPostProcessors() { + return Collections.emptyList(); + } } From 0c899300748ea3fa19d39ba8af47d4d8e9c5433e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Sin=C3=B3ros-Szab=C3=B3?= Date: Tue, 1 Oct 2024 15:11:48 +0200 Subject: [PATCH 4/5] add snapshot version --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index 229d380..79df6de 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=0.31.0 +version=0.31.0-SNAPSHOT-0 From b49bff73d81f411da78ea5ff33f4cc780a649df7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Sin=C3=B3ros-Szab=C3=B3?= Date: Fri, 11 Oct 2024 15:57:35 +0200 Subject: [PATCH 5/5] use @Autowired required false --- .../kafka/tkms/config/TkmsAutoConfiguration.java | 5 ----- .../kafka/tkms/config/TkmsKafkaProducerProvider.java | 2 +- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsAutoConfiguration.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsAutoConfiguration.java index 4aa822c..987a80c 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsAutoConfiguration.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsAutoConfiguration.java @@ -49,9 +49,4 @@ public List messageDecorators() { return Collections.emptyList(); } - @Bean - @ConditionalOnMissingBean(ITkmsKafkaProducerPostProcessor.class) - public List producerPostProcessors() { - return Collections.emptyList(); - } } diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java index 1137878..7072890 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java @@ -42,7 +42,7 @@ public class TkmsKafkaProducerProvider implements ITkmsKafkaProducerProvider, Gr private Map, ProducerEntry> producers = new ConcurrentHashMap<>(); - @Autowired + @Autowired(required = false) private List postProcessors = new ArrayList<>();