Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip with proxies #89

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.transferwise.kafka.tkms.api.ITkmsMessageInterceptors;
import com.transferwise.kafka.tkms.api.TkmsShardPartition;
import com.transferwise.kafka.tkms.config.ITkmsDaoProvider;
import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerPostProcessor;
import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider;
import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider.UseCase;
import com.transferwise.kafka.tkms.config.TkmsProperties;
Expand Down Expand Up @@ -43,6 +44,7 @@
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.InterruptException;
Expand Down Expand Up @@ -79,6 +81,8 @@ public class TkmsStorageToKafkaProxy implements GracefulShutdownStrategy, ITkmsS
@Autowired
private ITkmsMessageInterceptors messageIntereceptors;
@Autowired
private ITkmsKafkaProducerPostProcessor tkmsKafkaProducerPostProcessor;
@Autowired
private SharedReentrantLockBuilderFactory lockBuilderFactory;
@Autowired
private ITkmsInterrupterService tkmsInterrupterService;
Expand Down Expand Up @@ -166,7 +170,7 @@ private void poll(Control control, TkmsShardPartition shardPartition) {
}
}

private void poll0(Control control, TkmsShardPartition shardPartition, KafkaProducer<String, byte[]> kafkaProducer) {
private void poll0(Control control, TkmsShardPartition shardPartition, Producer<String, byte[]> kafkaProducer) {

int pollerBatchSize = properties.getPollerBatchSize(shardPartition.getShard());
long startTimeMs = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.transferwise.kafka.tkms.config;

import java.util.function.Function;
import org.apache.kafka.clients.producer.Producer;

public interface ITkmsKafkaProducerPostProcessor extends Function<Producer<String, byte[]>, Producer<String, byte[]>> {
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
package com.transferwise.kafka.tkms.config;

import com.transferwise.kafka.tkms.api.TkmsShardPartition;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

public interface ITkmsKafkaProducerProvider {

KafkaProducer<String, byte[]> getKafkaProducer(TkmsShardPartition tkmsShardPartition, UseCase useCase);
Producer<String, byte[]> getKafkaProducer(TkmsShardPartition tkmsShardPartition, UseCase useCase);

KafkaProducer<String, byte[]> getKafkaProducerForTopicValidation(TkmsShardPartition shardPartition);
Producer<String, byte[]> getKafkaProducerForTopicValidation(TkmsShardPartition shardPartition);

default void addPostProcessor(ITkmsKafkaProducerPostProcessor postProcessor) {
}

default void removePostProcessors() {
}

void closeKafkaProducer(TkmsShardPartition tkmsShardPartition, UseCase useCase);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -16,10 +18,12 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.Assert;

@Slf4j
public class TkmsKafkaProducerProvider implements ITkmsKafkaProducerProvider, GracefulShutdownStrategy {
Expand All @@ -39,8 +43,21 @@ public class TkmsKafkaProducerProvider implements ITkmsKafkaProducerProvider, Gr

private Map<Pair<TkmsShardPartition, UseCase>, ProducerEntry> producers = new ConcurrentHashMap<>();

private List<ITkmsKafkaProducerPostProcessor> postProcessors = new ArrayList<>();

@Override
public void addPostProcessor(ITkmsKafkaProducerPostProcessor postProcessor) {
Assert.notNull(postProcessor, "'postProcessor' cannot be null");
this.postProcessors.add(postProcessor);
}

@Override
public void removePostProcessors() {
this.postProcessors.clear();
}

@Override
public KafkaProducer<String, byte[]> getKafkaProducer(TkmsShardPartition shardPartition, UseCase useCase) {
public Producer<String, byte[]> getKafkaProducer(TkmsShardPartition shardPartition, UseCase useCase) {
return producers.computeIfAbsent(Pair.of(shardPartition, useCase), key -> {
Map<String, Object> configs = new HashMap<>();

Expand Down Expand Up @@ -84,16 +101,24 @@ public KafkaProducer<String, byte[]> getKafkaProducer(TkmsShardPartition shardPa
}
}

final var producer = new KafkaProducer<String, byte[]>(configs);
final var producer = getKafkaProducer(configs);
final var kafkaClientMetrics = new KafkaClientMetrics(producer);
kafkaClientMetrics.bindTo(meterRegistry);

return new ProducerEntry().setProducer(producer).setKafkaClientMetric(kafkaClientMetrics);
}).getProducer();
}

private Producer<String, byte[]> getKafkaProducer(Map<String, Object> configs) {
Producer<String, byte[]> producer = new KafkaProducer<>(configs);
for (ITkmsKafkaProducerPostProcessor pp : this.postProcessors) {
producer = pp.apply(producer);
}
return producer;
}

@Override
public KafkaProducer<String, byte[]> getKafkaProducerForTopicValidation(TkmsShardPartition shardPartition) {
public Producer<String, byte[]> getKafkaProducerForTopicValidation(TkmsShardPartition shardPartition) {
return getKafkaProducer(TkmsShardPartition.of(shardPartition.getShard(), 0), UseCase.TOPIC_VALIDATION);
}

Expand Down Expand Up @@ -139,7 +164,7 @@ public boolean canShutdown() {
@Accessors(chain = true)
protected static class ProducerEntry {

private KafkaProducer<String, byte[]> producer;
private Producer<String, byte[]> producer;

private KafkaClientMetrics kafkaClientMetric;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.transferwise.kafka.tkms;

import com.transferwise.common.baseutils.transactionsmanagement.ITransactionsHelper;
import com.transferwise.kafka.tkms.api.ITransactionalKafkaMessageSender;
import com.transferwise.kafka.tkms.api.TkmsMessage;
import com.transferwise.kafka.tkms.test.BaseIntTest;
import com.transferwise.kafka.tkms.test.ITkmsSentMessagesCollector;
import com.transferwise.kafka.tkms.test.TestProperties;
import java.nio.charset.StandardCharsets;
import java.util.stream.StreamSupport;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.springframework.beans.factory.annotation.Autowired;

import static com.transferwise.kafka.tkms.test.TestKafkaProducerPostProcessor.TEST_MESSAGE;

Check warning on line 17 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.2.2)

com.puppycrawl.tools.checkstyle.checks.imports.CustomImportOrderCheck

Import statement for 'com.transferwise.kafka.tkms.test.TestKafkaProducerPostProcessor.TEST_MESSAGE' is in the wrong order. Should be in the 'STATIC' group, expecting not assigned imports on this line.

Check warning on line 17 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.3.1)

com.puppycrawl.tools.checkstyle.checks.imports.CustomImportOrderCheck

Import statement for 'com.transferwise.kafka.tkms.test.TestKafkaProducerPostProcessor.TEST_MESSAGE' is in the wrong order. Should be in the 'STATIC' group, expecting not assigned imports on this line.
import static org.awaitility.Awaitility.await;

Check warning on line 18 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.2.2)

com.puppycrawl.tools.checkstyle.checks.imports.CustomImportOrderCheck

Import statement for 'org.awaitility.Awaitility.await' is in the wrong order. Should be in the 'STATIC' group, expecting not assigned imports on this line.

Check warning on line 18 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.3.1)

com.puppycrawl.tools.checkstyle.checks.imports.CustomImportOrderCheck

Import statement for 'org.awaitility.Awaitility.await' is in the wrong order. Should be in the 'STATIC' group, expecting not assigned imports on this line.
import static org.junit.jupiter.api.Assertions.assertEquals;

Check warning on line 19 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.2.2)

com.puppycrawl.tools.checkstyle.checks.imports.CustomImportOrderCheck

Import statement for 'org.junit.jupiter.api.Assertions.assertEquals' is in the wrong order. Should be in the 'STATIC' group, expecting not assigned imports on this line.

Check warning on line 19 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.3.1)

com.puppycrawl.tools.checkstyle.checks.imports.CustomImportOrderCheck

Import statement for 'org.junit.jupiter.api.Assertions.assertEquals' is in the wrong order. Should be in the 'STATIC' group, expecting not assigned imports on this line.
import static org.junit.jupiter.api.Assertions.assertTrue;

Check warning on line 20 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.2.2)

com.puppycrawl.tools.checkstyle.checks.imports.CustomImportOrderCheck

Import statement for 'org.junit.jupiter.api.Assertions.assertTrue' is in the wrong order. Should be in the 'STATIC' group, expecting not assigned imports on this line.

Check warning on line 20 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.3.1)

com.puppycrawl.tools.checkstyle.checks.imports.CustomImportOrderCheck

Import statement for 'org.junit.jupiter.api.Assertions.assertTrue' is in the wrong order. Should be in the 'STATIC' group, expecting not assigned imports on this line.

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class MessagePostProcessingTest extends BaseIntTest {


@Autowired

Check warning on line 26 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.2.2)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'member def modifier' has incorrect indentation level 4, expected level should be 2.

Check warning on line 26 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.3.1)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'member def modifier' has incorrect indentation level 4, expected level should be 2.
protected ITkmsSentMessagesCollector tkmsSentMessagesCollector;
@Autowired

Check warning on line 28 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.2.2)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'member def modifier' has incorrect indentation level 4, expected level should be 2.

Check warning on line 28 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.3.1)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'member def modifier' has incorrect indentation level 4, expected level should be 2.
private TransactionalKafkaMessageSender transactionalKafkaMessageSender;
@Autowired

Check warning on line 30 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.2.2)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'member def modifier' has incorrect indentation level 4, expected level should be 2.

Check warning on line 30 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.3.1)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'member def modifier' has incorrect indentation level 4, expected level should be 2.
private TestProperties testProperties;
@Autowired

Check warning on line 32 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.2.2)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'member def modifier' has incorrect indentation level 4, expected level should be 2.

Check warning on line 32 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.3.1)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'member def modifier' has incorrect indentation level 4, expected level should be 2.
private ITransactionsHelper transactionsHelper;

@BeforeEach

Check warning on line 35 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.2.2)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def modifier' has incorrect indentation level 4, expected level should be 2.

Check warning on line 35 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.3.1)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def modifier' has incorrect indentation level 4, expected level should be 2.
void setupTest() {
tkmsSentMessagesCollector.clear();

Check warning on line 37 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.2.2)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def' child has incorrect indentation level 6, expected level should be 4.

Check warning on line 37 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.3.1)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def' child has incorrect indentation level 6, expected level should be 4.
}

Check warning on line 38 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.2.2)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def rcurly' has incorrect indentation level 4, expected level should be 2.

Check warning on line 38 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.3.1)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def rcurly' has incorrect indentation level 4, expected level should be 2.

@AfterEach

Check warning on line 40 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.2.2)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def modifier' has incorrect indentation level 4, expected level should be 2.

Check warning on line 40 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.3.1)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def modifier' has incorrect indentation level 4, expected level should be 2.
void cleanupTest() {
tkmsSentMessagesCollector.clear();

Check warning on line 42 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.2.2)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def' child has incorrect indentation level 6, expected level should be 4.

Check warning on line 42 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.3.1)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def' child has incorrect indentation level 6, expected level should be 4.
}

Check warning on line 43 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.2.2)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def rcurly' has incorrect indentation level 4, expected level should be 2.

Check warning on line 43 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.3.1)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def rcurly' has incorrect indentation level 4, expected level should be 2.

@Test

Check warning on line 45 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.2.2)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def modifier' has incorrect indentation level 4, expected level should be 2.

Check warning on line 45 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.3.1)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def modifier' has incorrect indentation level 4, expected level should be 2.
void messagesAreInstrumentedWithProducerPostProcessor() {
byte[] someValue = TEST_MESSAGE;

Check warning on line 47 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.2.2)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def' child has incorrect indentation level 8, expected level should be 4.

Check warning on line 47 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.3.1)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def' child has incorrect indentation level 8, expected level should be 4.

String topic = testProperties.getTestTopic();

Check warning on line 49 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.2.2)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def' child has incorrect indentation level 8, expected level should be 4.

Check warning on line 49 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.3.1)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def' child has incorrect indentation level 8, expected level should be 4.

transactionsHelper.withTransaction().run(() ->

Check warning on line 51 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.2.2)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def' child has incorrect indentation level 8, expected level should be 4.

Check warning on line 51 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.3.1)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def' child has incorrect indentation level 8, expected level should be 4.
transactionalKafkaMessageSender.sendMessages(new ITransactionalKafkaMessageSender.SendMessagesRequest()
.addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("1").setValue(someValue))
.addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("2").setValue(someValue))
));

await().until(() -> tkmsSentMessagesCollector.getSentMessages(topic).size() == 2);

Check warning on line 57 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.2.2)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def' child has incorrect indentation level 8, expected level should be 4.

Check warning on line 57 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.3.1)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def' child has incorrect indentation level 8, expected level should be 4.
var messages = tkmsSentMessagesCollector.getSentMessages(topic);

Check warning on line 58 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.2.2)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def' child has incorrect indentation level 8, expected level should be 4.

Check warning on line 58 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.3.1)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def' child has incorrect indentation level 8, expected level should be 4.

assertEquals(2, messages.size());

Check warning on line 60 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.2.2)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def' child has incorrect indentation level 8, expected level should be 4.

Check warning on line 60 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.3.1)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def' child has incorrect indentation level 8, expected level should be 4.
checkForHeader(messages.get(0), "wrapTest", "wrapped");

Check warning on line 61 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.2.2)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def' child has incorrect indentation level 8, expected level should be 4.

Check warning on line 61 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.3.1)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def' child has incorrect indentation level 8, expected level should be 4.
checkForHeader(messages.get(1), "wrapTest", "wrapped");

Check warning on line 62 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.2.2)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def' child has incorrect indentation level 8, expected level should be 4.

Check warning on line 62 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.3.1)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def' child has incorrect indentation level 8, expected level should be 4.
}

Check warning on line 63 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.2.2)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def rcurly' has incorrect indentation level 4, expected level should be 2.

Check warning on line 63 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.3.1)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def rcurly' has incorrect indentation level 4, expected level should be 2.

private void checkForHeader(ITkmsSentMessagesCollector.SentMessage sentMessage, String key, String value) {

Check warning on line 65 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.2.2)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def modifier' has incorrect indentation level 4, expected level should be 2.

Check warning on line 65 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.3.1)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def modifier' has incorrect indentation level 4, expected level should be 2.
assertTrue(

Check warning on line 66 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.2.2)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def' child has incorrect indentation level 8, expected level should be 4.

Check warning on line 66 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.3.1)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def' child has incorrect indentation level 8, expected level should be 4.
StreamSupport.stream(sentMessage.getProducerRecord().headers().spliterator(), false)
.anyMatch(h -> h.key().equals(key) && value.equals(new String(h.value(), StandardCharsets.UTF_8)))
);

Check warning on line 69 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.2.2)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method call rparen' has incorrect indentation level 8, expected level should be 4.

Check warning on line 69 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.3.1)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method call rparen' has incorrect indentation level 8, expected level should be 4.
}

Check warning on line 70 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.2.2)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def rcurly' has incorrect indentation level 4, expected level should be 2.

Check warning on line 70 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagePostProcessingTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.3.1)

com.puppycrawl.tools.checkstyle.checks.indentation.IndentationCheck

'method def rcurly' has incorrect indentation level 4, expected level should be 2.
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,37 @@
import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider.UseCase;
import com.transferwise.kafka.tkms.test.BaseIntTest;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import com.transferwise.kafka.tkms.test.TestKafkaProducerPostProcessor;

Check warning on line 11 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProviderTestServer.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.2.2)

com.puppycrawl.tools.checkstyle.checks.imports.CustomImportOrderCheck

Wrong lexicographical order for 'com.transferwise.kafka.tkms.test.TestKafkaProducerPostProcessor' import. Should be before 'java.lang.reflect.Proxy'.

Check warning on line 11 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProviderTestServer.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.3.1)

com.puppycrawl.tools.checkstyle.checks.imports.CustomImportOrderCheck

Wrong lexicographical order for 'com.transferwise.kafka.tkms.test.TestKafkaProducerPostProcessor' import. Should be before 'java.lang.reflect.Proxy'.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;


class TkmsKafkaProducerProviderTestServer extends BaseIntTest {

@Autowired
private ITkmsKafkaProducerProvider tkmsKafkaProducerProvider;

@Test

// @Test

Check warning on line 25 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProviderTestServer.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.2.2)

com.puppycrawl.tools.checkstyle.checks.indentation.CommentsIndentationCheck

Comment has incorrect indentation level 0, expected is 2, indentation should be the same level as line 26.

Check warning on line 25 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProviderTestServer.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.3.1)

com.puppycrawl.tools.checkstyle.checks.indentation.CommentsIndentationCheck

Comment has incorrect indentation level 0, expected is 2, indentation should be the same level as line 26.
void shardKafkaPropertiesAreApplied() throws Exception {
KafkaProducer<String, byte[]> kafkaProducer = tkmsKafkaProducerProvider.getKafkaProducer(TkmsShardPartition.of(1, 0), UseCase.PROXY);
Producer<String, byte[]> kafkaProducer = tkmsKafkaProducerProvider.getKafkaProducer(TkmsShardPartition.of(1, 0), UseCase.PROXY);

Field producerConfigField = kafkaProducer.getClass().getDeclaredField("producerConfig");
InvocationHandler handler = Proxy.getInvocationHandler(kafkaProducer);

Field originalProducerFiled = handler.getClass().getDeclaredField("producer");
originalProducerFiled.setAccessible(true);
Object originalProducer = originalProducerFiled.get(handler);
Field producerConfigField = originalProducer.getClass().getDeclaredField("producerConfig");
// Field producerConfigField = kafkaProducer.getClass().getDeclaredField("producerConfig");
producerConfigField.setAccessible(true);
ProducerConfig producerConfig = (ProducerConfig) producerConfigField.get(kafkaProducer);
;


assertThat(producerConfig.getLong("linger.ms")).isEqualTo(7L);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package com.transferwise.kafka.tkms.test;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerPostProcessor;

Check warning on line 9 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestKafkaProducerPostProcessor.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.2.2)

com.puppycrawl.tools.checkstyle.checks.imports.CustomImportOrderCheck

Wrong lexicographical order for 'com.transferwise.kafka.tkms.config.ITkmsKafkaProducerPostProcessor' import. Should be before 'java.util.Arrays'.

Check warning on line 9 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestKafkaProducerPostProcessor.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.3.1)

com.puppycrawl.tools.checkstyle.checks.imports.CustomImportOrderCheck

Wrong lexicographical order for 'com.transferwise.kafka.tkms.config.ITkmsKafkaProducerPostProcessor' import. Should be before 'java.util.Arrays'.
import com.transferwise.kafka.tkms.config.TkmsKafkaProducerProvider;

Check warning on line 10 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestKafkaProducerPostProcessor.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.2.2)

com.puppycrawl.tools.checkstyle.checks.imports.CustomImportOrderCheck

Wrong lexicographical order for 'com.transferwise.kafka.tkms.config.TkmsKafkaProducerProvider' import. Should be before 'java.util.Arrays'.

Check warning on line 10 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestKafkaProducerPostProcessor.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.3.1)

com.puppycrawl.tools.checkstyle.checks.imports.CustomImportOrderCheck

Wrong lexicographical order for 'com.transferwise.kafka.tkms.config.TkmsKafkaProducerProvider' import. Should be before 'java.util.Arrays'.
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.stereotype.Component;

@Component
//@TestConfiguration
public class TestKafkaProducerPostProcessor implements ITkmsKafkaProducerPostProcessor, InitializingBean {

public static final byte[] TEST_MESSAGE = "Testing ProducerPostProcessing".getBytes(StandardCharsets.UTF_8);;

Check warning on line 23 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestKafkaProducerPostProcessor.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.2.2)

com.puppycrawl.tools.checkstyle.checks.whitespace.EmptyLineSeparatorCheck

';' should be separated from previous line.

Check warning on line 23 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestKafkaProducerPostProcessor.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.2.2)

com.puppycrawl.tools.checkstyle.checks.coding.OneStatementPerLineCheck

Only one statement per line allowed.

Check warning on line 23 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestKafkaProducerPostProcessor.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.3.1)

com.puppycrawl.tools.checkstyle.checks.whitespace.EmptyLineSeparatorCheck

';' should be separated from previous line.

Check warning on line 23 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestKafkaProducerPostProcessor.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.3.1)

com.puppycrawl.tools.checkstyle.checks.coding.OneStatementPerLineCheck

Only one statement per line allowed.

private MyInvocationHandler handler;

@Autowired
TkmsKafkaProducerProvider tkmsKafkaProducerProvider;

@SuppressWarnings("unchecked")
@Override
public Producer<String, byte[]> apply(Producer<String, byte[]> producer) {
handler = new MyInvocationHandler(producer);
return (Producer<String, byte[]>)
Proxy.newProxyInstance(
TestKafkaProducerPostProcessor.class.getClassLoader(),
new Class<?>[] {Producer.class},
handler);
}

@Override
public void afterPropertiesSet() throws Exception {
tkmsKafkaProducerProvider.addPostProcessor(this);
}

private static class MyInvocationHandler implements InvocationHandler {

private final Producer<String, byte[]> producer;

public MyInvocationHandler(Producer<String, byte[]> producer) {
this.producer = producer;
}

public Producer<String, byte[]> getProducer() {
return producer;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if ("send".equals(method.getName())
&& method.getParameterCount() >= 1
&& method.getParameterTypes()[0] == ProducerRecord.class) {
ProducerRecord<String, byte[]> record = (ProducerRecord<String, byte[]>) args[0];
if (Arrays.equals(TEST_MESSAGE, record.value())) {
record.headers().add("wrapTest", "wrapped".getBytes());
}
Callback callback =
method.getParameterCount() >= 2
&& method.getParameterTypes()[1] == Callback.class
? (Callback) args[1]
: null;
return producer.send(record, callback);
} else {
try {
return method.invoke(producer, args);
} catch (InvocationTargetException exception) {
throw exception.getCause();
}
}
}
}
}
5 changes: 3 additions & 2 deletions tw-tkms-test-starter/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ dependencies {
annotationProcessor libraries.spotbugsAnnotations

compileOnly libraries.springBootConfigurationProcessor

implementation project(":tw-tkms-starter")

implementation libraries.commonsLang3
implementation libraries.jacksonDatabind

implementation libraries.kafkaClients
implementation libraries.springBootStarter
implementation libraries.springJdbc
implementation libraries.twBaseUtils

}
Loading