Skip to content

Commit

Permalink
Fix.
Browse files Browse the repository at this point in the history
  • Loading branch information
onukristo committed Dec 15, 2023
1 parent 86f3691 commit 82d94a4
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import com.transferwise.kafka.tkms.api.TkmsShardPartition;
import com.transferwise.kafka.tkms.config.ITkmsDaoProvider;
import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider;
import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider.UseCase;
import com.transferwise.kafka.tkms.config.TkmsProperties;
import com.transferwise.kafka.tkms.config.TkmsProperties.DatabaseDialect;
import com.transferwise.kafka.tkms.config.TkmsProperties.NotificationLevel;
Expand Down Expand Up @@ -72,7 +71,7 @@ public void afterPropertiesSet() {
environmentValidator.validate();

for (String topic : properties.getTopics()) {
validateTopic(TkmsShardPartition.of(properties.getDefaultShard(), 0), topic);
validateTopic(topic);
}

validateDeleteBatchSizes();
Expand Down Expand Up @@ -169,7 +168,7 @@ public SendMessagesResult sendMessages(SendMessagesRequest request) {

var topic = tkmsMessage.getTopic();
if (!validatedTopics.contains(topic)) {
validateTopic(shardPartition, topic);
validateTopic(topic);
validatedTopics.add(topic);
}

Expand Down Expand Up @@ -265,7 +264,7 @@ public SendMessageResult sendMessage(SendMessageRequest request) {
validateMessageSize(message, 0);

var topic = message.getTopic();
validateTopic(shardPartition, topic);
validateTopic(topic);

if (deferMessageRegistrationUntilCommit) {
// Transaction is guaranteed to be active here.
Expand Down Expand Up @@ -375,8 +374,8 @@ public void afterCompletion(int status) {
/**
* Every call to normal `KafkaProducer.send()` uses metadata for a topic as well, so should be very fast.
*/
protected void validateTopic(TkmsShardPartition shardPartition, String topic) {
kafkaProducerProvider.getKafkaProducer(shardPartition, UseCase.TOPIC_VALIDATION).partitionsFor(topic);
protected void validateTopic(String topic) {
kafkaProducerProvider.getKafkaProducerForTopicValidation().partitionsFor(topic);
}

protected void validateMessages(SendMessagesRequest request) {
Expand All @@ -393,7 +392,7 @@ protected void validateMessage(TkmsMessage message, int messageIdx) {
Preconditions.checkArgument(message.getPartition() >= 0, "%s: Partition number can not be negative: %s", messageIdx, message.getPartition());
}
if (message.getKey() != null) {
Preconditions.checkArgument(message.getKey().length() > 0, "%s: Key can not be an empty string.", messageIdx);
Preconditions.checkArgument(!message.getKey().isEmpty(), "%s: Key can not be an empty string.", messageIdx);
}
if (message.getShard() != null) {
Preconditions.checkArgument(message.getShard() >= 0, "%s: Shard number can not be negative :%s", messageIdx, message.getShard());
Expand Down Expand Up @@ -463,7 +462,7 @@ private int utf8Length(CharSequence s) {
protected void fireMessageRegisteredEvent(TkmsShardPartition shardPartition, Long id, TkmsMessage message) {
var listeners = getTkmsEventsListeners();
if (log.isDebugEnabled()) {
log.debug("Message was registered for {} with storage id {}. Listeners count: ", shardPartition, id, listeners.size());
log.debug("Message was registered for {} with storage id {}. Listeners count: {}.", shardPartition, id, listeners.size());
}

if (tkmsEventsListeners.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@ public interface ITkmsKafkaProducerProvider {

KafkaProducer<String, byte[]> getKafkaProducer(TkmsShardPartition tkmsShardPartition, UseCase useCase);

KafkaProducer<String, byte[]> getKafkaProducerForTopicValidation();

void closeKafkaProducer(TkmsShardPartition tkmsShardPartition, UseCase useCase);

void closeKafkaProducerForTopicValidation();

enum UseCase {
PROXY,
TEST,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,12 @@ public KafkaProducer<String, byte[]> getKafkaProducer(TkmsShardPartition shardPa
configs.put(ProducerConfig.ACKS_CONFIG, "all");
configs.put(ProducerConfig.BATCH_SIZE_CONFIG, "163840");
configs.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, String.valueOf(tkmsProperties.getMaximumMessageBytes()));
configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);

// The following block is to guarantee the messages order.
configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
configs.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Please specify 'tw-tkms.kafka.bootstrap.servers'.");
configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "5000");
configs.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000");
Expand Down Expand Up @@ -69,6 +74,11 @@ public KafkaProducer<String, byte[]> getKafkaProducer(TkmsShardPartition shardPa
}).getProducer();
}

@Override
public KafkaProducer<String, byte[]> getKafkaProducerForTopicValidation() {
return getKafkaProducer(TkmsShardPartition.of(tkmsProperties.getDefaultShard(), 0), UseCase.TOPIC_VALIDATION);
}

@Override
public void closeKafkaProducer(TkmsShardPartition shardPartition, UseCase useCase) {
var producerEntry = producers.remove(Pair.of(shardPartition, useCase));
Expand All @@ -86,6 +96,11 @@ public void closeKafkaProducer(TkmsShardPartition shardPartition, UseCase useCas
}
}

@Override
public void closeKafkaProducerForTopicValidation() {
closeKafkaProducer(TkmsShardPartition.of(tkmsProperties.getDefaultShard(), 0), UseCase.TOPIC_VALIDATION);
}

@Override
public void applicationTerminating() {
producers.keySet().forEach(key -> closeKafkaProducer(key.getLeft(), key.getRight()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.transferwise.kafka.tkms.api.TkmsMessage.Header;
import com.transferwise.kafka.tkms.api.TkmsShardPartition;
import com.transferwise.kafka.tkms.config.ITkmsDaoProvider;
import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider;
import com.transferwise.kafka.tkms.dao.FaultInjectedTkmsDao;
import com.transferwise.kafka.tkms.metrics.TkmsMetricsTemplate;
import com.transferwise.kafka.tkms.test.BaseIntTest;
Expand Down Expand Up @@ -65,6 +66,8 @@ abstract class EndToEndIntTest extends BaseIntTest {
private ITkmsDaoProvider tkmsDaoProvider;
@Autowired
private TkmsStorageToKafkaProxy tkmsStorageToKafkaProxy;
@Autowired
private ITkmsKafkaProducerProvider tkmsKafkaProducerProvider;

private FaultInjectedTkmsDao faultInjectedTkmsDao;

Expand Down Expand Up @@ -168,7 +171,8 @@ void testExactlyOnceDelivery(int scenario) throws Exception {
setupConfig(deferUntilCommit);
tkmsProperties.setValidateSerialization(validateSerialization);

String message = "Hello World!";
// For producer to create more batches and spread messages around different partitions.
String message = StringUtils.repeat("Hello World!", 100);
int threadsCount = 20;
int batchesCount = 20;
int batchSize = 20;
Expand Down Expand Up @@ -462,11 +466,17 @@ void testThatMessagesOrderForAnEntityIsPreservedWithBatches(boolean deferUntilCo
@ValueSource(booleans = {false, true})
@SneakyThrows
void sendingToUnknownTopicWillBePreventedWhenTopicAutoCreationIsDisabled(boolean deferUntilCommit) {
setupConfig(deferUntilCommit);
try {
setupConfig(deferUntilCommit);

assertThatThrownBy(() -> transactionsHelper.withTransaction().run(() -> transactionalKafkaMessageSender
.sendMessage(new TkmsMessage().setTopic("NotExistingTopic").setValue("Stuff".getBytes(StandardCharsets.UTF_8)))))
.hasMessageContaining("Topic NotExistingTopic not present in metadata");
assertThatThrownBy(() -> transactionsHelper.withTransaction().run(() -> transactionalKafkaMessageSender
.sendMessage(new TkmsMessage().setTopic("NotExistingTopic").setValue("Stuff".getBytes(StandardCharsets.UTF_8)))))
.hasMessageContaining("Topic NotExistingTopic not present in metadata");
}

Check warning on line 475 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.1.6)

RightCurlySame

'}' at column 5 should be on the same line as the next part of a multi-block statement (one that directly contains multiple blocks: if/else-if/else, do/while or try/catch/finally).

Check warning on line 475 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(2.7.18)

RightCurlySame

'}' at column 5 should be on the same line as the next part of a multi-block statement (one that directly contains multiple blocks: if/else-if/else, do/while or try/catch/finally).

Check warning on line 475 in tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle Report-(3.0.13)

RightCurlySame

'}' at column 5 should be on the same line as the next part of a multi-block statement (one that directly contains multiple blocks: if/else-if/else, do/while or try/catch/finally).
finally {
// Stop logs spam about not existing topic in metadata.
tkmsKafkaProducerProvider.closeKafkaProducerForTopicValidation();
}
}

@ParameterizedTest
Expand Down

0 comments on commit 82d94a4

Please sign in to comment.