Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
sszp committed Sep 9, 2024
1 parent e376f5d commit 17aca96
Show file tree
Hide file tree
Showing 7 changed files with 11 additions and 45 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.31.0] - 2024-09-09
### Added
- Added a postprocessor to the `TkmsKafkaProducerProvider` to allow features like tracing attached to the Kafka Producer.

## [0.30.1] - 2024-08-08
### Changed
- MeterFilter's applied by the library are no longer explicitly applied and are instead
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.30.1
version=0.31.0
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import com.transferwise.kafka.tkms.api.ITkmsMessageInterceptors;
import com.transferwise.kafka.tkms.api.TkmsShardPartition;
import com.transferwise.kafka.tkms.config.ITkmsDaoProvider;
import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerPostProcessor;
import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider;
import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider.UseCase;
import com.transferwise.kafka.tkms.config.TkmsProperties;
Expand All @@ -43,7 +42,6 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
Expand Down Expand Up @@ -81,8 +79,6 @@ public class TkmsStorageToKafkaProxy implements GracefulShutdownStrategy, ITkmsS
@Autowired
private ITkmsMessageInterceptors messageIntereceptors;
@Autowired
private ITkmsKafkaProducerPostProcessor tkmsKafkaProducerPostProcessor;
@Autowired
private SharedReentrantLockBuilderFactory lockBuilderFactory;
@Autowired
private ITkmsInterrupterService tkmsInterrupterService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,6 @@ public interface ITkmsKafkaProducerProvider {

Producer<String, byte[]> getKafkaProducerForTopicValidation(TkmsShardPartition shardPartition);

default void addPostProcessor(ITkmsKafkaProducerPostProcessor postProcessor) {
}

default void removePostProcessors() {
}

void closeKafkaProducer(TkmsShardPartition tkmsShardPartition, UseCase useCase);

void closeKafkaProducerForTopicValidation(TkmsShardPartition tkmsShardPartition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.Assert;

@Slf4j
public class TkmsKafkaProducerProvider implements ITkmsKafkaProducerProvider, GracefulShutdownStrategy {
Expand All @@ -43,18 +42,9 @@ public class TkmsKafkaProducerProvider implements ITkmsKafkaProducerProvider, Gr

private Map<Pair<TkmsShardPartition, UseCase>, ProducerEntry> producers = new ConcurrentHashMap<>();

@Autowired
private List<ITkmsKafkaProducerPostProcessor> postProcessors = new ArrayList<>();

@Override
public void addPostProcessor(ITkmsKafkaProducerPostProcessor postProcessor) {
Assert.notNull(postProcessor, "'postProcessor' cannot be null");
this.postProcessors.add(postProcessor);
}

@Override
public void removePostProcessors() {
this.postProcessors.clear();
}

@Override
public Producer<String, byte[]> getKafkaProducer(TkmsShardPartition shardPartition, UseCase useCase) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,35 +10,28 @@
import com.transferwise.kafka.tkms.api.TkmsMessage;
import com.transferwise.kafka.tkms.test.BaseIntTest;
import com.transferwise.kafka.tkms.test.ITkmsSentMessagesCollector;
import com.transferwise.kafka.tkms.test.TestMessagesInterceptor;
import com.transferwise.kafka.tkms.test.TestProperties;
import java.nio.charset.StandardCharsets;
import java.util.stream.StreamSupport;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.springframework.beans.factory.annotation.Autowired;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class MessagePostProcessingTest extends BaseIntTest {

@Autowired
private TestMessagesInterceptor testMessagesInterceptor;
@Autowired
private TransactionalKafkaMessageSender transactionalKafkaMessageSender;

@Autowired
private TestProperties testProperties;

@Autowired
private ITransactionsHelper transactionsHelper;

@BeforeEach
void setupTest() {
tkmsSentMessagesCollector.clear();
}

@AfterEach
void cleanupTest() {
tkmsSentMessagesCollector.clear();
testMessagesInterceptor.setBeforeSendingToKafkaFunction(null);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.transferwise.kafka.tkms.test;

import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerPostProcessor;
import com.transferwise.kafka.tkms.config.TkmsKafkaProducerProvider;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
Expand All @@ -11,20 +10,15 @@
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class TestKafkaProducerPostProcessor implements ITkmsKafkaProducerPostProcessor, InitializingBean {
public class TestKafkaProducerPostProcessor implements ITkmsKafkaProducerPostProcessor {

public static final byte[] TEST_MESSAGE = "Testing ProducerPostProcessing".getBytes(StandardCharsets.UTF_8);

private ProxyInvocationHandler handler;

@Autowired
TkmsKafkaProducerProvider tkmsKafkaProducerProvider;

@SuppressWarnings("unchecked")
@Override
public Producer<String, byte[]> apply(Producer<String, byte[]> producer) {
Expand All @@ -36,11 +30,6 @@ public Producer<String, byte[]> apply(Producer<String, byte[]> producer) {
handler);
}

@Override
public void afterPropertiesSet() throws Exception {
tkmsKafkaProducerProvider.addPostProcessor(this);
}

private static class ProxyInvocationHandler implements InvocationHandler {

private final Producer<String, byte[]> producer;
Expand Down

0 comments on commit 17aca96

Please sign in to comment.