From e4ececcf088be499d2074d78bfa8b7220fa93881 Mon Sep 17 00:00:00 2001
From: Kristo Kuusküll
Date: Thu, 10 Aug 2023 14:17:28 +0300
Subject: [PATCH] Avoiding and softening issues around corrupted messages.
 (#74)

* Avoiding and softening issues around corrupted messages.
---
 CHANGELOG.md                                  |  21 ++-
 build.common.gradle                           |   3 +
 build.gradle                                  |   1 +
 build.library.gradle                          |   1 +
 gradle.properties                             |   2 +-
 tw-tkms-starter/build.gradle                  | 120 +++++++++++++++++-
 .../kafka/tkms/TkmsStorageToKafkaProxy.java   | 106 +++++++++-------
 .../tkms/TransactionalKafkaMessageSender.java |  41 ++++--
 .../kafka/tkms/config/TkmsProperties.java     |  26 ++++
 .../transferwise/kafka/tkms/dao/TkmsDao.java  |  56 +++++---
 .../kafka/tkms/dao/TkmsMessageSerializer.java |  41 ++++--
 .../kafka/tkms/EndToEndIntTest.java           |   9 +-
 12 files changed, 336 insertions(+), 91 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index d25a82e..efb53d2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,21 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [0.25.0] - 2023-08-09
+
+### Added
+
+* Message id in error logs.
+* Message id in MDC.
+* `validateSerialization` option as a guardrail against corrupted gzip output, e.g. from zlib bugs.
+
+### Changed
+
+* `protobuf-java` is now shaded to avoid incompatibility issues in services.
+  It is generally recommended to use the same `protobuf-java` version that was used to generate the Java stubs (in our case `StoredMessage`).
+  Even though `protobuf-java` has historically had good backward compatibility, that is not guaranteed, and forward compatibility has been
+  pretty bad in our experience.
+
 ## [0.24.3] - 2023-08-01
 
 ### Added
@@ -37,9 +52,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
   In case there is a rare long-running transaction and its messages need to get sent out.
 
 * An option to defer insertion of messages into the database to the very end of a transaction, just before commit -
-  `deferMessageRegistrationUntilCommit`
-  This would generate fairly recent ids for the messages and earliest visible messages system has less chance to not see and thus skip those.
-  With that, the earliest visible message system can configure a fairly small look-back window and reduce CPU consumption even further.
+  `deferMessageRegistrationUntilCommit`
+  This would generate fairly recent ids for the messages and earliest visible messages system has less chance to not see and thus skip those.
+  With that, the earliest visible message system can configure a fairly small look-back window and reduce CPU consumption even further.
 
   It can also help to reduce total transaction latency, as all individual messages collected are sent out in batches. 
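
To illustrate the new option: `validateSerialization` can be toggled globally or per shard, and the `isValidateSerialization(int shard)` helper added below in `TkmsProperties.java` checks the shard-level override first before falling back to the global flag. A minimal sketch of that resolution, assuming the Lombok-generated accessors on `TkmsProperties` (including `getShards()`); the `configure` helper itself is hypothetical:

import com.transferwise.kafka.tkms.config.TkmsProperties;

public class ValidateSerializationSketch {

  // Enables the serialize/deserialize round-trip guardrail globally, but opts shard 1 out of it.
  static void configure(TkmsProperties props) {
    props.setValidateSerialization(true);

    var shardOne = new TkmsProperties.ShardProperties();
    shardOne.setValidateSerialization(false);
    props.getShards().put(1, shardOne);

    // Resolution order added in this patch: shard override first, then the global default.
    assert props.isValidateSerialization(0);  // true, from the global flag
    assert !props.isValidateSerialization(1); // false, from the shard override
  }
}
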
diff --git a/build.common.gradle b/build.common.gradle
index 723c44c..1b16f2f 100644
--- a/build.common.gradle
+++ b/build.common.gradle
@@ -45,6 +45,9 @@ configurations {
     testAnnotationProcessor {
         extendsFrom(local)
     }
+    shadow {
+        extendsFrom(local)
+    }
 
     all {
         resolutionStrategy {
diff --git a/build.gradle b/build.gradle
index bfa4800..3ffb09c 100644
--- a/build.gradle
+++ b/build.gradle
@@ -14,6 +14,7 @@ plugins {
     id "at.zierler.yamlvalidator" version "1.5.0"
     id 'org.ajoberstar.grgit' version '5.2.0'
     id 'io.github.gradle-nexus.publish-plugin' version "1.1.0"
+    id 'com.github.johnrengelman.shadow' version '8.1.1' apply false
 }
 
 idea.project {
diff --git a/build.library.gradle b/build.library.gradle
index d165f76..86b5269 100644
--- a/build.library.gradle
+++ b/build.library.gradle
@@ -76,4 +76,5 @@ publishing {
         }
     }
 }
+
 }
diff --git a/gradle.properties b/gradle.properties
index 8a29913..37a3805 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1 +1 @@
-version=0.24.3
+version=0.25.0
\ No newline at end of file
diff --git a/tw-tkms-starter/build.gradle b/tw-tkms-starter/build.gradle
index f659d92..3baaa61 100644
--- a/tw-tkms-starter/build.gradle
+++ b/tw-tkms-starter/build.gradle
@@ -3,14 +3,19 @@ plugins {
     id 'idea'
     id "com.google.protobuf"
     id "docker-compose"
+    id 'com.github.johnrengelman.shadow'
+    id 'maven-publish'
+    id 'signing'
 }
 
+ext.projectGitHubRepoName = "tw-tkms"
+ext.projectScmUrl = "https://github.com/transferwise/${projectGitHubRepoName}"
+ext.projectScmConnection = "scm:git:git://github.com/transferwise/${projectGitHubRepoName}.git"
 ext.projectName = "tw-tkms-starter"
 ext.projectDescription = "tw-tkms-starter"
 ext.projectArtifactName = "tw-tkms-starter"
 
 apply from: "$rootProject.rootDir/build.common.gradle"
-apply from: "$rootProject.rootDir/build.library.gradle"
 
 dependencies {
     annotationProcessor libraries.springBootConfigurationProcessor
@@ -99,3 +104,116 @@ test {
         dockerCompose.exposeAsEnvironment(test)
     }
 }
+
+/*
+  The `protobuf-java` version in a service may not be compatible with the one used to generate `StoredMessage`.
+  It is safer to shadow the version we used.
+ */
+shadowJar {
+    dependencies {
+        exclude(dependency {
+            it.moduleName != 'protobuf-java'
+        })
+    }
+    manifest {
+        attributes 'Implementation-Version': "$project.version"
+    }
+    relocate('com.google.protobuf', 'com.transferwise.kafka.tkms.shadow.com.google.protobuf')
+
+    // Minimize does not reduce the jar much (1.9->1.5 MB), so let's not risk/mess with that.
+    /*
+    minimize {}
+    */
+}
+
+jar.enabled = false
+jar.dependsOn shadowJar
+
+shadowJar {
+    archiveClassifier.set('')
+}
+
+publishing {
+    publications {
+        entrypoints(MavenPublication) { publication ->
+            artifactId projectArtifactName
+
+            artifacts = [shadowJar, javadocJar, sourcesJar]
+            /*
+             This ensures that libraries will have explicit dependency versions in their Maven POM and Gradle module files, so that there would be less
+             ambiguity and fewer chances of dependency conflicts. 
+ */ + versionMapping { + usage('java-api') { + fromResolutionOf('runtimeClasspath') + } + usage('java-runtime') { + fromResolutionOf('runtimeClasspath') + } + } + + pom { + name = projectName + description = projectDescription + url = projectScmUrl + packaging = "jar" + licenses { + license { + name = 'The Apache License, Version 2.0, Copyright 2021 TransferWise Ltd' + url = 'http://www.apache.org/licenses/LICENSE-2.0.txt' + } + } + developers { + developer { + id = 'onukristo' + name = 'Kristo Kuusküll' + email = "kristo.kuuskull@transferwise.com" + organization = "Transferwise Ltd" + organizationUrl = "https://github.com/transferwise" + } + } + scm { + connection = projectScmConnection + developerConnection = projectScmConnection + url = projectScmUrl + } + withXml { xml -> + def dependenciesNode = xml.asNode().get('dependencies') ?: xml.asNode().appendNode('dependencies') + + project.configurations.getByName("runtimeClasspath").resolvedConfiguration.firstLevelModuleDependencies.forEach { + if (it.configuration != "platform-runtime" && it.moduleName != 'protobuf-java') { + def dependencyNode = dependenciesNode.appendNode('dependency') + dependencyNode.appendNode('groupId', it.moduleGroup) + dependencyNode.appendNode('artifactId', it.moduleName) + dependencyNode.appendNode('version', it.moduleVersion) + dependencyNode.appendNode('scope', 'runtime') + } + } + + if (!asNode().dependencyManagement.isEmpty()) { + throw new IllegalStateException("There should not be any `dependencyManagement` block in POM.") + } + } + } + } + } + + if (System.getenv("OSS_SIGNING_KEY")) { + signing { + useInMemoryPgpKeys(System.getenv("OSS_SIGNING_KEY"), System.getenv("OSS_SIGNING_PASSWORD")) + sign publishing.publications.entrypoints + } + } + + repositories { + maven { + url System.getenv("MAVEN_URL") + credentials { + username = System.getenv("MAVEN_USER") + password = System.getenv("MAVEN_PASSWORD") + } + } + } +} \ No newline at end of file diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsStorageToKafkaProxy.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsStorageToKafkaProxy.java index 93c4857..677f979 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsStorageToKafkaProxy.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsStorageToKafkaProxy.java @@ -46,6 +46,7 @@ import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; +import org.slf4j.MDC; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; @@ -223,7 +224,7 @@ private void poll(Control control, TkmsShardPartition shardPartition) { // Essentially forces polling of all records earliestMessageIdToUse = -1L; - log.info("Polling all messages for {}, to make sure we are not missing some created by long running transactions.", + log.info("Polling all messages for '{}', to make sure we are not missing some created by long running transactions.", shardPartition); lastPollAllTimeMs.setValue(System.currentTimeMillis()); @@ -271,57 +272,65 @@ private void poll(Control control, TkmsShardPartition shardPartition) { for (int i = 0; i < records.size(); i++) { MessageRecord messageRecord = records.get(i); - ProducerRecord preCreatedProducerRecord = producerRecordMap.get(i); - ProducerRecord producerRecord = - preCreatedProducerRecord == null ? 
toProducerRecord(messageRecord) : preCreatedProducerRecord; - contexts[i] = new MessageProcessingContext().setProducerRecord(producerRecord).setMessageRecord(messageRecord) - .setShardPartition(shardPartition); - MessageProcessingContext context = contexts[i]; - MessageInterceptionDecision interceptionDecision = interceptionDecisions == null ? null : interceptionDecisions.get(i); - if (interceptionDecision != null) { - if (interceptionDecision == MessageInterceptionDecision.DISCARD) { - log.warn("Discarding message {}:{}.", shardPartition, messageRecord.getId()); - context.setAcked(true); - continue; - } else if (interceptionDecision == MessageInterceptionDecision.RETRY) { - // In this context retry means - allowing interceptors to try to execute their logic again. - continue; + MDC.put(properties.getMdc().getMessageIdKey(), String.valueOf(messageRecord.getId())); + try { + ProducerRecord preCreatedProducerRecord = producerRecordMap.get(i); + ProducerRecord producerRecord = + preCreatedProducerRecord == null ? toProducerRecord(messageRecord) : preCreatedProducerRecord; + contexts[i] = new MessageProcessingContext().setProducerRecord(producerRecord).setMessageRecord(messageRecord) + .setShardPartition(shardPartition); + MessageProcessingContext context = contexts[i]; + + MessageInterceptionDecision interceptionDecision = interceptionDecisions == null ? null : interceptionDecisions.get(i); + if (interceptionDecision != null) { + if (interceptionDecision == MessageInterceptionDecision.DISCARD) { + log.warn("Discarding message {}:{}.", shardPartition, messageRecord.getId()); + context.setAcked(true); + continue; + } else if (interceptionDecision == MessageInterceptionDecision.RETRY) { + // In this context retry means - allowing interceptors to try to execute their logic again. + continue; + } } - } - try { - // Theoretically, to be absolutely sure, about the ordering, we would need to wait for the future result immediately. - // But it would not be practical. I mean we could send one message from each partitions concurrently, but - // there is a high chance that all the messages in this thread would reside in the same transaction, so it would not work. - // TODO: Consider transactions. They would need heavy performance testing though. - Future future = kafkaProducer.send(producerRecord, (metadata, exception) -> { - try { - shardPartition.putIntoMdc(); - - if (exception == null) { - context.setAcked(true); - fireMessageAcknowledgedEvent(shardPartition, messageRecord.getId(), producerRecord); - Instant insertTime = messageRecord.getMessage().hasInsertTimestamp() - ? Instant.ofEpochMilli(messageRecord.getMessage().getInsertTimestamp().getValue()) : null; - metricsTemplate.recordProxyMessageSendSuccess(shardPartition, producerRecord.topic(), insertTime); - } else { - failedSendsCount.incrementAndGet(); - handleKafkaError(shardPartition, "Sending message " + messageRecord.getId() + " in " + shardPartition + " failed.", - exception, - context); - metricsTemplate.recordProxyMessageSendFailure(shardPartition, producerRecord.topic()); + try { + // Theoretically, to be absolutely sure, about the ordering, we would need to wait for the future result immediately. + // But it would not be practical. I mean we could send one message from each partitions concurrently, but + // there is a high chance that all the messages in this thread would reside in the same transaction, so it would not work. + // TODO: Consider transactions. They would need heavy performance testing though. 
+ Future future = kafkaProducer.send(producerRecord, (metadata, exception) -> { + MDC.put(properties.getMdc().getMessageIdKey(), String.valueOf(messageRecord.getId())); + try { + shardPartition.putIntoMdc(); + + if (exception == null) { + context.setAcked(true); + fireMessageAcknowledgedEvent(shardPartition, messageRecord.getId(), producerRecord); + Instant insertTime = messageRecord.getMessage().hasInsertTimestamp() + ? Instant.ofEpochMilli(messageRecord.getMessage().getInsertTimestamp().getValue()) : null; + metricsTemplate.recordProxyMessageSendSuccess(shardPartition, producerRecord.topic(), insertTime); + } else { + failedSendsCount.incrementAndGet(); + handleKafkaError(shardPartition, "Sending message " + messageRecord.getId() + " in " + shardPartition + " failed.", + exception, + context); + metricsTemplate.recordProxyMessageSendFailure(shardPartition, producerRecord.topic()); + } + } finally { + shardPartition.removeFromMdc(); + MDC.remove(properties.getMdc().getMessageIdKey()); } - } finally { - shardPartition.removeFromMdc(); - } - }); - atLeastOneSendDone = true; + }); + atLeastOneSendDone = true; - contexts[i].setKafkaSenderFuture(future); - } catch (Throwable t) { - failedSendsCount.incrementAndGet(); - handleKafkaError(shardPartition, "Sending message " + messageRecord.getId() + " in " + shardPartition + " failed.", t, context); + contexts[i].setKafkaSenderFuture(future); + } catch (Throwable t) { + failedSendsCount.incrementAndGet(); + handleKafkaError(shardPartition, "Sending message " + messageRecord.getId() + " in " + shardPartition + " failed.", t, context); + } + } finally { + MDC.remove(properties.getMdc().getMessageIdKey()); } } @@ -335,7 +344,8 @@ private void poll(Control control, TkmsShardPartition shardPartition) { try { context.getKafkaSenderFuture().get(); } catch (Throwable t) { - handleKafkaError(shardPartition, "Sending message in " + shardPartition + " failed.", t, context); + handleKafkaError(shardPartition, "Sending message " + context.getMessageRecord().getId() + " in " + shardPartition + " failed.", + t, context); } } } diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java index 006d671..1e16e30 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java @@ -26,10 +26,10 @@ import java.util.List; import java.util.concurrent.ThreadLocalRandom; import lombok.extern.slf4j.Slf4j; +import org.slf4j.MDC; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; -import org.springframework.context.annotation.Lazy; import org.springframework.transaction.support.TransactionSynchronization; import org.springframework.transaction.support.TransactionSynchronizationAdapter; import org.springframework.transaction.support.TransactionSynchronizationManager; @@ -215,14 +215,19 @@ public SendMessagesResult sendMessages(SendMessagesRequest request) { var tkmsMessageWithSequence = tkmsMessageWithSequences.get(i); var insertMessageResult = insertMessageResults.get(i); - fireMessageRegisteredEvent(shardPartition, insertMessageResult.getStorageId(), tkmsMessageWithSequence.getTkmsMessage()); + MDC.put(properties.getMdc().getMessageIdKey(), 
String.valueOf(insertMessageResult.getStorageId())); + try { + fireMessageRegisteredEvent(shardPartition, insertMessageResult.getStorageId(), tkmsMessageWithSequence.getTkmsMessage()); - metricsTemplate.recordMessageRegistering(tkmsMessageWithSequence.getTkmsMessage().getTopic(), shardPartition, false); + metricsTemplate.recordMessageRegistering(tkmsMessageWithSequence.getTkmsMessage().getTopic(), shardPartition, false); - responses[insertMessageResult.getSequence()] = - new SendMessageResult().setStorageId(insertMessageResult.getStorageId()).setShardPartition(shardPartition); + responses[insertMessageResult.getSequence()] = + new SendMessageResult().setStorageId(insertMessageResult.getStorageId()).setShardPartition(shardPartition); - transactionContext.countMessage(); + transactionContext.countMessage(); + } finally { + MDC.remove(properties.getMdc().getMessageIdKey()); + } } } } finally { @@ -284,10 +289,16 @@ public SendMessageResult sendMessage(SendMessageRequest request) { var tkmsDao = tkmsDaoProvider.getTkmsDao(shardPartition.getShard()); var insertMessageResult = tkmsDao.insertMessage(shardPartition, message); - fireMessageRegisteredEvent(shardPartition, insertMessageResult.getStorageId(), message); - metricsTemplate.recordMessageRegistering(topic, insertMessageResult.getShardPartition(), false); - transactionContext.countMessage(); - return new SendMessageResult().setStorageId(insertMessageResult.getStorageId()).setShardPartition(shardPartition); + + MDC.put(properties.getMdc().getMessageIdKey(), String.valueOf(insertMessageResult.getStorageId())); + try { + fireMessageRegisteredEvent(shardPartition, insertMessageResult.getStorageId(), message); + metricsTemplate.recordMessageRegistering(topic, insertMessageResult.getShardPartition(), false); + transactionContext.countMessage(); + return new SendMessageResult().setStorageId(insertMessageResult.getStorageId()).setShardPartition(shardPartition); + } finally { + MDC.remove(properties.getMdc().getMessageIdKey()); + } }); } } finally { @@ -337,10 +348,14 @@ public void beforeCommit(boolean readOnly) { for (int i = 0; i < messagesWithSequences.size(); i++) { var tkmsMessageWithSequence = messagesWithSequences.get(i); var insertMessageResult = insertMessageResults.get(i); + MDC.put(properties.getMdc().getMessageIdKey(), String.valueOf(insertMessageResult.getStorageId())); + try { + fireMessageRegisteredEvent(shardPartition, insertMessageResult.getStorageId(), tkmsMessageWithSequence.getTkmsMessage()); - fireMessageRegisteredEvent(shardPartition, insertMessageResult.getStorageId(), tkmsMessageWithSequence.getTkmsMessage()); - - metricsTemplate.recordMessageRegistering(tkmsMessageWithSequence.getTkmsMessage().getTopic(), shardPartition, true); + metricsTemplate.recordMessageRegistering(tkmsMessageWithSequence.getTkmsMessage().getTopic(), shardPartition, true); + } finally { + MDC.remove(properties.getMdc().getMessageIdKey()); + } } } } diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsProperties.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsProperties.java index a1940c2..b4f0f85 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsProperties.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsProperties.java @@ -183,6 +183,18 @@ public void afterPropertiesSet() { @jakarta.validation.constraints.NotNull private Duration proxyStopTimeout = Duration.ofSeconds(15); + /** + * Tries to deserialize a message before writing it 
into the database.
+   *
+   * <p>We have had a case where JDK's GZIP implementation generated corrupted output. This in turn stopped message proxying, because the next
+   * message could not be deserialized.
+   *
+   * <p>Enabling this option allows detecting that situation early on and isolating the fault to only a few messages.
+   *
+   * <p>
It has a small CPU and memory allocation hit per every message. + */ + private boolean validateSerialization = false; + /** * List topics used by the lib. * @@ -268,6 +280,11 @@ public static class Mdc { @ResolvedValue @LegacyResolvedValue private String partitionKey = "tkmsPartition"; + @NotBlank + @jakarta.validation.constraints.NotBlank + @ResolvedValue + @LegacyResolvedValue + private String messageIdKey = "tkmsMessageId"; } @Data @@ -286,6 +303,7 @@ public static class ShardProperties { private Duration proxyStopTimeout; private boolean compressionOverridden; private Boolean deferRegisteredMessagesUntilCommit; + private Boolean validateSerialization; @Valid @jakarta.validation.Valid private Compression compression = new Compression(); @@ -300,6 +318,14 @@ public static class ShardProperties { private Map kafka = new HashMap<>(); } + public boolean isValidateSerialization(int shard) { + var shardProperties = shards.get(shard); + if (shardProperties != null && shardProperties.getValidateSerialization() != null) { + return shardProperties.getValidateSerialization(); + } + return validateSerialization; + } + public Duration getProxyStopTimeout(int shard) { var shardProperties = shards.get(shard); if (shardProperties != null && shardProperties.getProxyStopTimeout() != null) { diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/dao/TkmsDao.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/dao/TkmsDao.java index d302422..121f31a 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/dao/TkmsDao.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/dao/TkmsDao.java @@ -28,6 +28,7 @@ import org.apache.commons.lang3.mutable.MutableInt; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.MDC; import org.springframework.beans.factory.InitializingBean; import org.springframework.dao.DataIntegrityViolationException; import org.springframework.jdbc.core.JdbcTemplate; @@ -106,12 +107,16 @@ public List insertMessages(TkmsShardPartition shardPartitio try { var sql = insertMessageSqls.computeIfAbsent(shardPartition, this::getInsertSql); var ps = con.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS); + var closeableStreams = new ArrayList(); try { var batchSize = Math.min(properties.getInsertBatchSize(shardPartition.getShard()), tkmsMessages.size() - idx.intValue()); for (int i = 0; i < batchSize; i++) { TkmsMessageWithSequence tkmsMessageWithSequence = tkmsMessages.get(idx.intValue() + i); - ps.setBinaryStream(1, serializeMessage(shardPartition, tkmsMessageWithSequence.getTkmsMessage())); + var serializedMessageStream = serializeMessage(shardPartition, tkmsMessageWithSequence.getTkmsMessage()); + + closeableStreams.add(serializedMessageStream); + ps.setBinaryStream(1, serializedMessageStream); ps.addBatch(); @@ -141,6 +146,13 @@ public List insertMessages(TkmsShardPartition shardPartitio idx.add(batchSize); } finally { ps.close(); + for (var is : closeableStreams) { + try { + is.close(); + } catch (Throwable t) { + log.error("Failed to close input stream.", t); + } + } } } finally { DataSourceUtils.releaseConnection(con, dataSource); @@ -157,16 +169,21 @@ public InsertMessageResult insertMessage(TkmsShardPartition shardPartition, Tkms final KeyHolder keyHolder = new GeneratedKeyHolder(); var sql = insertMessageSqls.computeIfAbsent(shardPartition, k -> getInsertSql(shardPartition)); - jdbcTemplate.update(con -> { - PreparedStatement ps = con.prepareStatement(sql, 
Statement.RETURN_GENERATED_KEYS); - try { - ps.setBinaryStream(1, serializeMessage(shardPartition, message)); - return ps; - } catch (Exception e) { - ps.close(); - throw e; + + ExceptionUtils.doUnchecked(() -> { + try (var is = serializeMessage(shardPartition, message)) { + jdbcTemplate.update(con -> ExceptionUtils.doUnchecked(() -> { + PreparedStatement ps = con.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS); + try { + ps.setBinaryStream(1, is); + return ps; + } catch (Exception e) { + ps.close(); + throw e; + } + }), keyHolder); } - }, keyHolder); + }); metricsTemplate.recordDaoMessageInsert(shardPartition, message.getTopic()); @@ -206,11 +223,20 @@ public List getMessages(TkmsShardPartition shardPartition, long e metricsTemplate.recordDaoPollFirstResult(shardPartition, startNanoTime); } - MessageRecord messageRecord = new MessageRecord(); - messageRecord.setId(rs.getLong(1)); - messageRecord.setMessage(messageSerializer.deserialize(shardPartition, rs.getBinaryStream(2))); - - records.add(messageRecord); + var messageId = rs.getLong(1); + MDC.put(properties.getMdc().getMessageIdKey(), String.valueOf(messageId)); + try { + MessageRecord messageRecord = new MessageRecord(); + messageRecord.setId(messageId); + messageRecord.setMessage(messageSerializer.deserialize(shardPartition, rs.getBinaryStream(2))); + + records.add(messageRecord); + } catch (Throwable t) { + throw new RuntimeException( + "Failed to deserialize message " + messageId + ", retrieved from table '" + getTableName(shardPartition) + "'.", t); + } finally { + MDC.remove(properties.getMdc().getMessageIdKey()); + } } } diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/dao/TkmsMessageSerializer.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/dao/TkmsMessageSerializer.java index 42c00d0..224f346 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/dao/TkmsMessageSerializer.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/dao/TkmsMessageSerializer.java @@ -21,11 +21,14 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.Arrays; +import java.util.Objects; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; import net.jpountz.lz4.LZ4BlockInputStream; import net.jpountz.lz4.LZ4BlockOutputStream; import net.jpountz.lz4.LZ4Factory; +import org.apache.commons.io.input.UnsynchronizedByteArrayInputStream; import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream; import org.springframework.beans.factory.annotation.Autowired; import org.xerial.snappy.SnappyFramedInputStream; @@ -53,7 +56,7 @@ public InputStream serialize(TkmsShardPartition shardPartition, TkmsMessage tkms Message storedMessage = toStoredMessage(tkmsMessage); int serializedSize = storedMessage.getSerializedSize(); - UnsynchronizedByteArrayOutputStream os = new UnsynchronizedByteArrayOutputStream(serializedSize / 4); + var os = new UnsynchronizedByteArrayOutputStream(serializedSize / 4); // 3 byte header for future use os.write(0); @@ -103,7 +106,29 @@ public InputStream serialize(TkmsShardPartition shardPartition, TkmsMessage tkms metricsTemplate.recordMessageSerialization(shardPartition, compressionAlgorithm, serializedSize, os.size()); - return os.toInputStream(); + os.flush(); + + var serializedBytes = os.toByteArray(); + + if (properties.isValidateSerialization(shardPartition.getShard())) { + var deSerializedMessage = deserialize(shardPartition, new 
UnsynchronizedByteArrayInputStream(serializedBytes)); + + if (!Arrays.equals(deSerializedMessage.getValue().toByteArray(), tkmsMessage.getValue()) + || !areSimilar(deSerializedMessage.getKey(), tkmsMessage.getKey()) + || !areSimilar(deSerializedMessage.getTopic(), tkmsMessage.getTopic()) + ) { + throw new IllegalStateException("Data corruption detected. Serialized and deserialized messages are not equal."); + } + } + + return new UnsynchronizedByteArrayInputStream(serializedBytes); + } + + private boolean areSimilar(String s0, String s1) { + if ((s0 == null || s0.length() == 0) && (s1 == null || s1.length() == 0)) { + return true; + } + return Objects.equals(s0, s1); } @Override @@ -114,7 +139,7 @@ public Message deserialize(TkmsShardPartition shardPartition, InputStream is) th is.read(); byte h2 = (byte) is.read(); - InputStream decompressedStream = decompress(h2, is); + var decompressedStream = decompress(h2, is); try { return StoredMessage.Message.parseFrom(decompressedStream); } finally { @@ -123,33 +148,33 @@ public Message deserialize(TkmsShardPartition shardPartition, InputStream is) th } protected void compressSnappy(StoredMessage.Message storedMessage, OutputStream out, Integer blockSize) throws IOException { - try (SnappyOutputStream compressOut = new SnappyOutputStream(out, blockSize == null ? 32 * 1024 : blockSize)) { + try (var compressOut = new SnappyOutputStream(out, blockSize == null ? 32 * 1024 : blockSize)) { storedMessage.writeTo(compressOut); } } protected void compressSnappyFramed(StoredMessage.Message storedMessage, OutputStream out, Integer blockSize) throws IOException { - try (SnappyFramedOutputStream compressOut = new SnappyFramedOutputStream(out, + try (var compressOut = new SnappyFramedOutputStream(out, blockSize == null ? SnappyFramedOutputStream.DEFAULT_BLOCK_SIZE : blockSize, DEFAULT_MIN_COMPRESSION_RATIO)) { storedMessage.writeTo(compressOut); } } protected void compressZstd(StoredMessage.Message storedMessage, OutputStream out, Integer level) throws IOException { - try (ZstdOutputStream compressOut = level == null ? new ZstdOutputStream(out) : new ZstdOutputStream(out, level)) { + try (var compressOut = level == null ? new ZstdOutputStream(out) : new ZstdOutputStream(out, level)) { storedMessage.writeTo(compressOut); } } protected void compressLz4(StoredMessage.Message storedMessage, OutputStream out, Integer blockSize) throws IOException { - try (LZ4BlockOutputStream compressOut = new LZ4BlockOutputStream(out, blockSize == null ? 1 << 16 : blockSize, + try (var compressOut = new LZ4BlockOutputStream(out, blockSize == null ? 
1 << 16 : blockSize,
+        LZ4Factory.fastestJavaInstance().fastCompressor())) {
       storedMessage.writeTo(compressOut);
     }
   }
 
   protected void compressGzip(StoredMessage.Message storedMessage, OutputStream out) throws IOException {
-    try (GZIPOutputStream compressOut = new GZIPOutputStream(out)) {
+    try (var compressOut = new GZIPOutputStream(out)) {
       storedMessage.writeTo(compressOut);
     }
   }
diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java
index ada27e0..6fb41f7 100644
--- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java
+++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java
@@ -84,6 +84,7 @@ public void cleanup() {
     tkmsStorageToKafkaProxy.setTkmsDaoProvider(tkmsDaoProvider);
     ((TransactionalKafkaMessageSender) transactionalKafkaMessageSender).setTkmsDaoProvider(tkmsDaoProvider);
     tkmsProperties.setDeferMessageRegistrationUntilCommit(false);
+    tkmsProperties.setValidateSerialization(false);
   }
 
   protected void setupConfig(boolean deferUntilCommit) {
@@ -159,9 +160,13 @@ void testThatJsonStringMessageCanBeSentAndRetrieved(boolean deferUntilCommit) {
   }
 
   @ParameterizedTest
-  @ValueSource(booleans = {false, true})
-  void testExactlyOnceDelivery(boolean deferUntilCommit) throws Exception {
+  @ValueSource(ints = {0, 1, 2, 3})
+  void testExactlyOnceDelivery(int scenario) throws Exception {
+    // The scenario encodes the two flags: bit 0 -> validateSerialization, bit 1 -> deferUntilCommit.
+    var deferUntilCommit = scenario == 2 || scenario == 3;
+    var validateSerialization = scenario == 1 || scenario == 3;
+
     setupConfig(deferUntilCommit);
+    tkmsProperties.setValidateSerialization(validateSerialization);
 
     String message = "Hello World!";
     int threadsCount = 20;
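
At its core, the `TkmsMessageSerializer` change above is a write-time round-trip check: serialize, immediately deserialize, and compare the two before the bytes ever reach the database. A self-contained sketch of the same technique for plain JDK gzip (class and method names here are illustrative, not part of tw-tkms):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipRoundTripCheck {

  // Compresses the payload, then immediately inflates it back and compares,
  // failing fast if the deflater produced corrupted output - the exact failure
  // mode the `validateSerialization` guardrail in this patch protects against.
  static byte[] compressValidated(byte[] payload) throws IOException {
    var buffer = new ByteArrayOutputStream();
    try (var gzip = new GZIPOutputStream(buffer)) {
      gzip.write(payload);
    }
    var compressed = buffer.toByteArray();

    try (var gunzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
      if (!Arrays.equals(gunzip.readAllBytes(), payload)) {
        throw new IllegalStateException("Data corruption detected. Serialized and deserialized messages are not equal.");
      }
    }
    return compressed;
  }

  public static void main(String[] args) throws IOException {
    System.out.println(compressValidated("Hello World!".getBytes()).length + " bytes, round-trip OK");
  }
}

The price is one extra inflation and comparison per message, which is the "small CPU and memory allocation hit" the `validateSerialization` javadoc refers to.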