Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add KafkaProducer post-processor to enable extensions from other libraries #90

Merged
merged 6 commits into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.31.0] - 2024-09-09
### Added
- Added a postprocessor to the `TkmsKafkaProducerProvider` to allow features like tracing attached to the Kafka Producer.

## [0.30.1] - 2024-08-08
### Changed
- MeterFilter's applied by the library are no longer explicitly applied and are instead
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.30.1
version=0.31.0-SNAPSHOT-0
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.InterruptException;
Expand Down Expand Up @@ -166,7 +166,7 @@ private void poll(Control control, TkmsShardPartition shardPartition) {
}
}

private void poll0(Control control, TkmsShardPartition shardPartition, KafkaProducer<String, byte[]> kafkaProducer) {
private void poll0(Control control, TkmsShardPartition shardPartition, Producer<String, byte[]> kafkaProducer) {

int pollerBatchSize = properties.getPollerBatchSize(shardPartition.getShard());
long startTimeMs = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.transferwise.kafka.tkms.config;

import java.util.function.Function;
import org.apache.kafka.clients.producer.Producer;

public interface ITkmsKafkaProducerPostProcessor extends Function<Producer<String, byte[]>, Producer<String, byte[]>> {
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.transferwise.kafka.tkms.config;

import com.transferwise.kafka.tkms.api.TkmsShardPartition;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

public interface ITkmsKafkaProducerProvider {

KafkaProducer<String, byte[]> getKafkaProducer(TkmsShardPartition tkmsShardPartition, UseCase useCase);
Producer<String, byte[]> getKafkaProducer(TkmsShardPartition tkmsShardPartition, UseCase useCase);

KafkaProducer<String, byte[]> getKafkaProducerForTopicValidation(TkmsShardPartition shardPartition);
Producer<String, byte[]> getKafkaProducerForTopicValidation(TkmsShardPartition shardPartition);

void closeKafkaProducer(TkmsShardPartition tkmsShardPartition, UseCase useCase);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ public IMeterCache twDefaultMeterCache(MeterRegistry meterRegistry) {
return new MeterCache(meterRegistry);
}



@Bean
@ConditionalOnMissingBean(ITransactionsHelper.class)
public TransactionsHelper twTransactionsHelper() {
Expand All @@ -51,4 +49,9 @@ public List<ITkmsMessageDecorator> messageDecorators() {
return Collections.emptyList();
}

@Bean
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably would be better to declare the autowire usage as
@Autowired(required = false)
Then you don't need to explicitly provide an empty list bean.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed that.

@ConditionalOnMissingBean(ITkmsKafkaProducerPostProcessor.class)
public List<ITkmsKafkaProducerPostProcessor> producerPostProcessors() {
return Collections.emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -16,6 +18,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
Expand All @@ -39,8 +42,12 @@ public class TkmsKafkaProducerProvider implements ITkmsKafkaProducerProvider, Gr

private Map<Pair<TkmsShardPartition, UseCase>, ProducerEntry> producers = new ConcurrentHashMap<>();

@Autowired
private List<ITkmsKafkaProducerPostProcessor> postProcessors = new ArrayList<>();
normanma-tw marked this conversation as resolved.
Show resolved Hide resolved


@Override
public KafkaProducer<String, byte[]> getKafkaProducer(TkmsShardPartition shardPartition, UseCase useCase) {
public Producer<String, byte[]> getKafkaProducer(TkmsShardPartition shardPartition, UseCase useCase) {
return producers.computeIfAbsent(Pair.of(shardPartition, useCase), key -> {
Map<String, Object> configs = new HashMap<>();

Expand Down Expand Up @@ -84,16 +91,24 @@ public KafkaProducer<String, byte[]> getKafkaProducer(TkmsShardPartition shardPa
}
}

final var producer = new KafkaProducer<String, byte[]>(configs);
final var producer = getKafkaProducer(configs);
final var kafkaClientMetrics = new KafkaClientMetrics(producer);
kafkaClientMetrics.bindTo(meterRegistry);

return new ProducerEntry().setProducer(producer).setKafkaClientMetric(kafkaClientMetrics);
}).getProducer();
}

private Producer<String, byte[]> getKafkaProducer(Map<String, Object> configs) {
Producer<String, byte[]> producer = new KafkaProducer<>(configs);
for (ITkmsKafkaProducerPostProcessor pp : this.postProcessors) {
producer = pp.apply(producer);
}
return producer;
}

@Override
public KafkaProducer<String, byte[]> getKafkaProducerForTopicValidation(TkmsShardPartition shardPartition) {
public Producer<String, byte[]> getKafkaProducerForTopicValidation(TkmsShardPartition shardPartition) {
return getKafkaProducer(TkmsShardPartition.of(shardPartition.getShard(), 0), UseCase.TOPIC_VALIDATION);
}

Expand Down Expand Up @@ -139,7 +154,7 @@ public boolean canShutdown() {
@Accessors(chain = true)
protected static class ProducerEntry {

private KafkaProducer<String, byte[]> producer;
private Producer<String, byte[]> producer;

private KafkaClientMetrics kafkaClientMetric;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.transferwise.kafka.tkms;

import static com.transferwise.kafka.tkms.test.TestKafkaProducerPostProcessor.TEST_MESSAGE;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.transferwise.common.baseutils.transactionsmanagement.ITransactionsHelper;
import com.transferwise.kafka.tkms.api.ITransactionalKafkaMessageSender;
import com.transferwise.kafka.tkms.api.TkmsMessage;
import com.transferwise.kafka.tkms.test.BaseIntTest;
import com.transferwise.kafka.tkms.test.ITkmsSentMessagesCollector;
import com.transferwise.kafka.tkms.test.TestMessagesInterceptor;
import com.transferwise.kafka.tkms.test.TestProperties;
import java.nio.charset.StandardCharsets;
import java.util.stream.StreamSupport;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;

class MessagePostProcessingTest extends BaseIntTest {

@Autowired
private TestMessagesInterceptor testMessagesInterceptor;
@Autowired
private TransactionalKafkaMessageSender transactionalKafkaMessageSender;
@Autowired
private TestProperties testProperties;
@Autowired
private ITransactionsHelper transactionsHelper;

@AfterEach
void cleanupTest() {
testMessagesInterceptor.setBeforeSendingToKafkaFunction(null);
}

@Test
void messagesAreInstrumentedWithProducerPostProcessor() {
byte[] someValue = TEST_MESSAGE;

String topic = testProperties.getTestTopic();

transactionsHelper.withTransaction().run(() ->
transactionalKafkaMessageSender.sendMessages(new ITransactionalKafkaMessageSender.SendMessagesRequest()
.addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("1").setValue(someValue))
.addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("2").setValue(someValue))
));

await().until(() -> tkmsSentMessagesCollector.getSentMessages(topic).size() == 2);
var messages = tkmsSentMessagesCollector.getSentMessages(topic);

assertEquals(2, messages.size());
checkForHeader(messages.get(0), "wrapTest", "wrapped");
checkForHeader(messages.get(1), "wrapTest", "wrapped");
}

private void checkForHeader(ITkmsSentMessagesCollector.SentMessage sentMessage, String key, String value) {
assertTrue(
StreamSupport.stream(sentMessage.getProducerRecord().headers().spliterator(), false)
.anyMatch(h -> h.key().equals(key) && value.equals(new String(h.value(), StandardCharsets.UTF_8)))
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider.UseCase;
import com.transferwise.kafka.tkms.test.BaseIntTest;
import java.lang.reflect.Field;
import org.apache.kafka.clients.producer.KafkaProducer;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -16,13 +18,20 @@ class TkmsKafkaProducerProviderTestServer extends BaseIntTest {
@Autowired
private ITkmsKafkaProducerProvider tkmsKafkaProducerProvider;


@Test
void shardKafkaPropertiesAreApplied() throws Exception {
KafkaProducer<String, byte[]> kafkaProducer = tkmsKafkaProducerProvider.getKafkaProducer(TkmsShardPartition.of(1, 0), UseCase.PROXY);
Producer<String, byte[]> kafkaProducer = tkmsKafkaProducerProvider.getKafkaProducer(TkmsShardPartition.of(1, 0), UseCase.PROXY);

InvocationHandler handler = Proxy.getInvocationHandler(kafkaProducer);

Field originalProducerFiled = handler.getClass().getDeclaredField("producer");
originalProducerFiled.setAccessible(true);
Object originalProducer = originalProducerFiled.get(handler);

Field producerConfigField = kafkaProducer.getClass().getDeclaredField("producerConfig");
Field producerConfigField = originalProducer.getClass().getDeclaredField("producerConfig");
producerConfigField.setAccessible(true);
ProducerConfig producerConfig = (ProducerConfig) producerConfigField.get(kafkaProducer);
ProducerConfig producerConfig = (ProducerConfig) producerConfigField.get(originalProducer);

assertThat(producerConfig.getLong("linger.ms")).isEqualTo(7L);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package com.transferwise.kafka.tkms.test;

import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerPostProcessor;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.stereotype.Component;

@Component
public class TestKafkaProducerPostProcessor implements ITkmsKafkaProducerPostProcessor {

public static final byte[] TEST_MESSAGE = "Testing ProducerPostProcessing".getBytes(StandardCharsets.UTF_8);

private ProxyInvocationHandler handler;

@SuppressWarnings("unchecked")
@Override
public Producer<String, byte[]> apply(Producer<String, byte[]> producer) {
handler = new ProxyInvocationHandler(producer);
return (Producer<String, byte[]>)
Proxy.newProxyInstance(
TestKafkaProducerPostProcessor.class.getClassLoader(),
new Class<?>[] {Producer.class},
handler);
}

private static class ProxyInvocationHandler implements InvocationHandler {

private final Producer<String, byte[]> producer;

public ProxyInvocationHandler(Producer<String, byte[]> producer) {
this.producer = producer;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if ("send".equals(method.getName())
&& method.getParameterCount() >= 1
&& method.getParameterTypes()[0] == ProducerRecord.class) {
ProducerRecord<String, byte[]> record = (ProducerRecord<String, byte[]>) args[0];
if (Arrays.equals(TEST_MESSAGE, record.value())) {
record.headers().add("wrapTest", "wrapped".getBytes(StandardCharsets.UTF_8));
}
Callback callback =
method.getParameterCount() >= 2
&& method.getParameterTypes()[1] == Callback.class
? (Callback) args[1]
: null;
return producer.send(record, callback);
} else {
try {
return method.invoke(producer, args);
} catch (InvocationTargetException exception) {
throw exception.getCause();
}
}
}
}
}
Loading