Skip to content

Commit

Permalink
add KafkaProducer post-processor to enable extensions from other libr…
Browse files Browse the repository at this point in the history
…aries (#90)

* add KafkaProducer post-processor

* refactor

* add autoconfiguration in case we don't have a PP for the Producer

* add snapshot version

* use @Autowired required false
  • Loading branch information
sszp authored Oct 14, 2024
1 parent 538d223 commit 0c6c711
Show file tree
Hide file tree
Showing 10 changed files with 177 additions and 18 deletions.
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.31.1] - 2024-10-09
## [0.32.0] - 2024-10-11
### Added
- Added a postprocessor to the `TkmsKafkaProducerProvider` to allow features like tracing attached to the Kafka Producer.

## [0.31.1] - 2024-10-09
### Fixed
- Override the lombok generated `TkmsMessage#setHeaders` to copy the passed headers into a mutable `ArrayList` in order to avoid `UnsupportedOperationException`

## [0.31.0] - 2024-10-07

### Changed

Added two new methods to the `TkmsMessage` that allow to conveniently use standard uuid and priority headers:
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.31.1
version=0.32.0
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.InterruptException;
Expand Down Expand Up @@ -166,7 +166,7 @@ private void poll(Control control, TkmsShardPartition shardPartition) {
}
}

private void poll0(Control control, TkmsShardPartition shardPartition, KafkaProducer<String, byte[]> kafkaProducer) {
private void poll0(Control control, TkmsShardPartition shardPartition, Producer<String, byte[]> kafkaProducer) {

int pollerBatchSize = properties.getPollerBatchSize(shardPartition.getShard());
long startTimeMs = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.transferwise.kafka.tkms.config;

import java.util.function.Function;
import org.apache.kafka.clients.producer.Producer;

public interface ITkmsKafkaProducerPostProcessor extends Function<Producer<String, byte[]>, Producer<String, byte[]>> {
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.transferwise.kafka.tkms.config;

import com.transferwise.kafka.tkms.api.TkmsShardPartition;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

public interface ITkmsKafkaProducerProvider {

KafkaProducer<String, byte[]> getKafkaProducer(TkmsShardPartition tkmsShardPartition, UseCase useCase);
Producer<String, byte[]> getKafkaProducer(TkmsShardPartition tkmsShardPartition, UseCase useCase);

KafkaProducer<String, byte[]> getKafkaProducerForTopicValidation(TkmsShardPartition shardPartition);
Producer<String, byte[]> getKafkaProducerForTopicValidation(TkmsShardPartition shardPartition);

void closeKafkaProducer(TkmsShardPartition tkmsShardPartition, UseCase useCase);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ public IMeterCache twDefaultMeterCache(MeterRegistry meterRegistry) {
return new MeterCache(meterRegistry);
}



@Bean
@ConditionalOnMissingBean(ITransactionsHelper.class)
public TransactionsHelper twTransactionsHelper() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -16,6 +18,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
Expand All @@ -39,8 +42,12 @@ public class TkmsKafkaProducerProvider implements ITkmsKafkaProducerProvider, Gr

private Map<Pair<TkmsShardPartition, UseCase>, ProducerEntry> producers = new ConcurrentHashMap<>();

@Autowired(required = false)
private List<ITkmsKafkaProducerPostProcessor> postProcessors = new ArrayList<>();


@Override
public KafkaProducer<String, byte[]> getKafkaProducer(TkmsShardPartition shardPartition, UseCase useCase) {
public Producer<String, byte[]> getKafkaProducer(TkmsShardPartition shardPartition, UseCase useCase) {
return producers.computeIfAbsent(Pair.of(shardPartition, useCase), key -> {
Map<String, Object> configs = new HashMap<>();

Expand Down Expand Up @@ -84,16 +91,24 @@ public KafkaProducer<String, byte[]> getKafkaProducer(TkmsShardPartition shardPa
}
}

final var producer = new KafkaProducer<String, byte[]>(configs);
final var producer = getKafkaProducer(configs);
final var kafkaClientMetrics = new KafkaClientMetrics(producer);
kafkaClientMetrics.bindTo(meterRegistry);

return new ProducerEntry().setProducer(producer).setKafkaClientMetric(kafkaClientMetrics);
}).getProducer();
}

private Producer<String, byte[]> getKafkaProducer(Map<String, Object> configs) {
Producer<String, byte[]> producer = new KafkaProducer<>(configs);
for (ITkmsKafkaProducerPostProcessor pp : this.postProcessors) {
producer = pp.apply(producer);
}
return producer;
}

@Override
public KafkaProducer<String, byte[]> getKafkaProducerForTopicValidation(TkmsShardPartition shardPartition) {
public Producer<String, byte[]> getKafkaProducerForTopicValidation(TkmsShardPartition shardPartition) {
return getKafkaProducer(TkmsShardPartition.of(shardPartition.getShard(), 0), UseCase.TOPIC_VALIDATION);
}

Expand Down Expand Up @@ -139,7 +154,7 @@ public boolean canShutdown() {
@Accessors(chain = true)
protected static class ProducerEntry {

private KafkaProducer<String, byte[]> producer;
private Producer<String, byte[]> producer;

private KafkaClientMetrics kafkaClientMetric;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.transferwise.kafka.tkms;

import static com.transferwise.kafka.tkms.test.TestKafkaProducerPostProcessor.TEST_MESSAGE;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.transferwise.common.baseutils.transactionsmanagement.ITransactionsHelper;
import com.transferwise.kafka.tkms.api.ITransactionalKafkaMessageSender;
import com.transferwise.kafka.tkms.api.TkmsMessage;
import com.transferwise.kafka.tkms.test.BaseIntTest;
import com.transferwise.kafka.tkms.test.ITkmsSentMessagesCollector;
import com.transferwise.kafka.tkms.test.TestMessagesInterceptor;
import com.transferwise.kafka.tkms.test.TestProperties;
import java.nio.charset.StandardCharsets;
import java.util.stream.StreamSupport;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;

class MessagePostProcessingTest extends BaseIntTest {

@Autowired
private TestMessagesInterceptor testMessagesInterceptor;
@Autowired
private TransactionalKafkaMessageSender transactionalKafkaMessageSender;
@Autowired
private TestProperties testProperties;
@Autowired
private ITransactionsHelper transactionsHelper;

@AfterEach
void cleanupTest() {
testMessagesInterceptor.setBeforeSendingToKafkaFunction(null);
}

@Test
void messagesAreInstrumentedWithProducerPostProcessor() {
byte[] someValue = TEST_MESSAGE;

String topic = testProperties.getTestTopic();

transactionsHelper.withTransaction().run(() ->
transactionalKafkaMessageSender.sendMessages(new ITransactionalKafkaMessageSender.SendMessagesRequest()
.addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("1").setValue(someValue))
.addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("2").setValue(someValue))
));

await().until(() -> tkmsSentMessagesCollector.getSentMessages(topic).size() == 2);
var messages = tkmsSentMessagesCollector.getSentMessages(topic);

assertEquals(2, messages.size());
checkForHeader(messages.get(0), "wrapTest", "wrapped");
checkForHeader(messages.get(1), "wrapTest", "wrapped");
}

private void checkForHeader(ITkmsSentMessagesCollector.SentMessage sentMessage, String key, String value) {
assertTrue(
StreamSupport.stream(sentMessage.getProducerRecord().headers().spliterator(), false)
.anyMatch(h -> h.key().equals(key) && value.equals(new String(h.value(), StandardCharsets.UTF_8)))
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider.UseCase;
import com.transferwise.kafka.tkms.test.BaseIntTest;
import java.lang.reflect.Field;
import org.apache.kafka.clients.producer.KafkaProducer;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -16,13 +18,20 @@ class TkmsKafkaProducerProviderTestServer extends BaseIntTest {
@Autowired
private ITkmsKafkaProducerProvider tkmsKafkaProducerProvider;


@Test
void shardKafkaPropertiesAreApplied() throws Exception {
KafkaProducer<String, byte[]> kafkaProducer = tkmsKafkaProducerProvider.getKafkaProducer(TkmsShardPartition.of(1, 0), UseCase.PROXY);
Producer<String, byte[]> kafkaProducer = tkmsKafkaProducerProvider.getKafkaProducer(TkmsShardPartition.of(1, 0), UseCase.PROXY);

InvocationHandler handler = Proxy.getInvocationHandler(kafkaProducer);

Field originalProducerFiled = handler.getClass().getDeclaredField("producer");
originalProducerFiled.setAccessible(true);
Object originalProducer = originalProducerFiled.get(handler);

Field producerConfigField = kafkaProducer.getClass().getDeclaredField("producerConfig");
Field producerConfigField = originalProducer.getClass().getDeclaredField("producerConfig");
producerConfigField.setAccessible(true);
ProducerConfig producerConfig = (ProducerConfig) producerConfigField.get(kafkaProducer);
ProducerConfig producerConfig = (ProducerConfig) producerConfigField.get(originalProducer);

assertThat(producerConfig.getLong("linger.ms")).isEqualTo(7L);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package com.transferwise.kafka.tkms.test;

import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerPostProcessor;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.stereotype.Component;

@Component
public class TestKafkaProducerPostProcessor implements ITkmsKafkaProducerPostProcessor {

public static final byte[] TEST_MESSAGE = "Testing ProducerPostProcessing".getBytes(StandardCharsets.UTF_8);

private ProxyInvocationHandler handler;

@SuppressWarnings("unchecked")
@Override
public Producer<String, byte[]> apply(Producer<String, byte[]> producer) {
handler = new ProxyInvocationHandler(producer);
return (Producer<String, byte[]>)
Proxy.newProxyInstance(
TestKafkaProducerPostProcessor.class.getClassLoader(),
new Class<?>[] {Producer.class},
handler);
}

private static class ProxyInvocationHandler implements InvocationHandler {

private final Producer<String, byte[]> producer;

public ProxyInvocationHandler(Producer<String, byte[]> producer) {
this.producer = producer;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if ("send".equals(method.getName())
&& method.getParameterCount() >= 1
&& method.getParameterTypes()[0] == ProducerRecord.class) {
ProducerRecord<String, byte[]> record = (ProducerRecord<String, byte[]>) args[0];
if (Arrays.equals(TEST_MESSAGE, record.value())) {
record.headers().add("wrapTest", "wrapped".getBytes(StandardCharsets.UTF_8));
}
Callback callback =
method.getParameterCount() >= 2
&& method.getParameterTypes()[1] == Callback.class
? (Callback) args[1]
: null;
return producer.send(record, callback);
} else {
try {
return method.invoke(producer, args);
} catch (InvocationTargetException exception) {
throw exception.getCause();
}
}
}
}
}

0 comments on commit 0c6c711

Please sign in to comment.