-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Separate kafka producers for each proxy #78
Changes from all commits
1705649
5c34bd5
51c7bcf
86f3691
82d94a4
c208ab3
dc041a7
f6a7cfc
81eb6c6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,30 +1,30 @@ | ||
ext { | ||
protobufVersion = "3.22.4" | ||
springBootVersion = "${System.getenv("SPRING_BOOT_VERSION") ?: '2.6.15'}" | ||
protobufVersion = "3.24.0" | ||
springBootVersion = "${System.getenv("SPRING_BOOT_VERSION") ?: '2.7.18'}" | ||
libraries = [ | ||
// version defined | ||
awaitility : 'org.awaitility:awaitility:4.2.0', | ||
commonsIo : 'commons-io:commons-io:2.11.0', | ||
curatorFramework : 'org.apache.curator:curator-framework:5.5.0', | ||
curatorRecipes : 'org.apache.curator:curator-recipes:5.5.0', | ||
guava : 'com.google.guava:guava:31.1-jre', | ||
guava : 'com.google.guava:guava:32.1.3-jre', | ||
jakartaValidationApi : 'jakarta.validation:jakarta.validation-api:3.0.2', | ||
javaxValidationApi : "javax.validation:validation-api:2.0.1.Final", | ||
kafkaStreams : 'org.apache.kafka:kafka-streams:3.4.0', | ||
kafkaStreams : 'org.apache.kafka:kafka-streams:3.2.3', | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Syncing with WJP 2.7 |
||
lz4Java : 'org.lz4:lz4-java:1.8.0', | ||
protobufJava : "com.google.protobuf:protobuf-java:${protobufVersion}", | ||
semver4j : "com.vdurmont:semver4j:3.1.0", | ||
snappyJava : 'org.xerial.snappy:snappy-java:1.1.10.1', | ||
snappyJava : 'org.xerial.snappy:snappy-java:1.1.10.4', | ||
spotbugsAnnotations : "com.github.spotbugs:spotbugs-annotations:${spotbugs.toolVersion.get()}", | ||
springBootDependencies : "org.springframework.boot:spring-boot-dependencies:${springBootVersion}", | ||
twBaseUtils : 'com.transferwise.common:tw-base-utils:1.10.1', | ||
twContext : 'com.transferwise.common:tw-context:0.12.0', | ||
twContextStarter : 'com.transferwise.common:tw-context-starter:0.12.0', | ||
twGracefulShutdown : 'com.transferwise.common:tw-graceful-shutdown:2.11.0', | ||
twGracefulShutdownInterfaces : 'com.transferwise.common:tw-graceful-shutdown-interfaces:2.11.0', | ||
twLeaderSelector : 'com.transferwise.common:tw-leader-selector:1.10.0', | ||
twLeaderSelectorStarter : 'com.transferwise.common:tw-leader-selector-starter:1.10.0', | ||
zstdJni : 'com.github.luben:zstd-jni:1.5.0-4', | ||
twBaseUtils : 'com.transferwise.common:tw-base-utils:1.12.1', | ||
twContext : 'com.transferwise.common:tw-context:1.0.0', | ||
twContextStarter : 'com.transferwise.common:tw-context-starter:1.0.0', | ||
twGracefulShutdown : 'com.transferwise.common:tw-graceful-shutdown:2.14.2', | ||
twGracefulShutdownInterfaces : 'com.transferwise.common:tw-graceful-shutdown-interfaces:2.14.2', | ||
twLeaderSelector : 'com.transferwise.common:tw-leader-selector:1.10.1', | ||
twLeaderSelectorStarter : 'com.transferwise.common:tw-leader-selector-starter:1.10.1', | ||
zstdJni : 'com.github.luben:zstd-jni:1.5.2-1', | ||
|
||
// versions managed by spring-boot-dependencies platform | ||
commonsLang3 : 'org.apache.commons:commons-lang3', | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1 @@ | ||
version=0.25.1 | ||
version=0.26.0 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,6 @@ | ||
distributionBase=GRADLE_USER_HOME | ||
distributionPath=wrapper/dists | ||
distributionUrl=https\://services.gradle.org/distributions/gradle-8.1.1-bin.zip | ||
distributionUrl=https\://services.gradle.org/distributions/gradle-8.5-bin.zip | ||
networkTimeout=10000 | ||
zipStoreBase=GRADLE_USER_HOME | ||
zipStorePath=wrapper/dists |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
package com.transferwise.kafka.tkms; | ||
|
||
import com.transferwise.common.baseutils.concurrency.ScheduledTaskExecutor.TaskHandle; | ||
import java.time.Duration; | ||
|
||
public interface ITkmsInterrupterService { | ||
|
||
TaskHandle interruptAfter(Thread t, Duration duration); | ||
|
||
/** | ||
* Cancels the previously set interruption task. | ||
* | ||
* <p>The handle has to be the one returned from the `interruptAfter` call. | ||
*/ | ||
void cancelInterruption(TaskHandle handler); | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
package com.transferwise.kafka.tkms; | ||
|
||
import com.transferwise.common.baseutils.concurrency.IExecutorServicesProvider; | ||
import com.transferwise.common.baseutils.concurrency.ScheduledTaskExecutor; | ||
import com.transferwise.common.baseutils.concurrency.ScheduledTaskExecutor.TaskHandle; | ||
import java.time.Duration; | ||
import lombok.extern.slf4j.Slf4j; | ||
import org.springframework.beans.factory.InitializingBean; | ||
import org.springframework.beans.factory.annotation.Autowired; | ||
|
||
@Slf4j | ||
public class TkmsInterrupterService implements ITkmsInterrupterService, InitializingBean { | ||
|
||
@Autowired | ||
private IExecutorServicesProvider executorServicesProvider; | ||
private ScheduledTaskExecutor scheduledTaskExecutor; | ||
|
||
public void afterPropertiesSet() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Currently no, currently only the IExecutorServicesProvider bean is available. |
||
this.scheduledTaskExecutor = executorServicesProvider.getGlobalScheduledTaskExecutor(); | ||
} | ||
|
||
@Override | ||
public TaskHandle interruptAfter(Thread t, Duration duration) { | ||
return scheduledTaskExecutor.scheduleOnce(() -> { | ||
var threadName = Thread.currentThread().getName(); | ||
try { | ||
Thread.currentThread().setName("tkms-interrupt"); | ||
log.warn("Had to interrupt thread '{}'.", t.getName()); | ||
t.interrupt(); | ||
} finally { | ||
Thread.currentThread().setName(threadName); | ||
} | ||
}, duration); | ||
} | ||
|
||
@Override | ||
public void cancelInterruption(TaskHandle handler) { | ||
handler.stop(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ | |
import com.transferwise.kafka.tkms.api.TkmsShardPartition; | ||
import com.transferwise.kafka.tkms.config.ITkmsDaoProvider; | ||
import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider; | ||
import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider.UseCase; | ||
import com.transferwise.kafka.tkms.config.TkmsProperties; | ||
import com.transferwise.kafka.tkms.dao.ITkmsDao.MessageRecord; | ||
import com.transferwise.kafka.tkms.metrics.ITkmsMetricsTemplate; | ||
|
@@ -41,8 +42,10 @@ | |
import org.apache.commons.lang3.StringUtils; | ||
import org.apache.commons.lang3.mutable.MutableLong; | ||
import org.apache.commons.lang3.mutable.MutableObject; | ||
import org.apache.kafka.clients.producer.KafkaProducer; | ||
import org.apache.kafka.clients.producer.ProducerRecord; | ||
import org.apache.kafka.clients.producer.RecordMetadata; | ||
import org.apache.kafka.common.errors.InterruptException; | ||
import org.apache.kafka.common.errors.RetriableException; | ||
import org.apache.kafka.common.header.Header; | ||
import org.apache.kafka.common.header.internals.RecordHeader; | ||
|
@@ -77,6 +80,8 @@ public class TkmsStorageToKafkaProxy implements GracefulShutdownStrategy, ITkmsS | |
private ITkmsMessageInterceptors messageIntereceptors; | ||
@Autowired | ||
private SharedReentrantLockBuilderFactory lockBuilderFactory; | ||
@Autowired | ||
private ITkmsInterrupterService tkmsInterrupterService; | ||
|
||
@TestOnly | ||
private volatile boolean paused = false; | ||
|
@@ -87,7 +92,6 @@ public class TkmsStorageToKafkaProxy implements GracefulShutdownStrategy, ITkmsS | |
private final List<LeaderSelectorV2> leaderSelectors = new ArrayList<>(); | ||
private RateLimiter exceptionRateLimiter = RateLimiter.create(2); | ||
|
||
|
||
@Override | ||
public void afterPropertiesSet() { | ||
for (int s = 0; s < properties.getShardsCount(); s++) { | ||
|
@@ -153,6 +157,16 @@ public void afterPropertiesSet() { | |
} | ||
|
||
private void poll(Control control, TkmsShardPartition shardPartition) { | ||
var kafkaProducer = tkmsKafkaProducerProvider.getKafkaProducer(shardPartition, UseCase.PROXY); | ||
|
||
try { | ||
poll0(control, shardPartition, kafkaProducer); | ||
} finally { | ||
tkmsKafkaProducerProvider.closeKafkaProducer(shardPartition, UseCase.PROXY); | ||
} | ||
} | ||
|
||
private void poll0(Control control, TkmsShardPartition shardPartition, KafkaProducer<String, byte[]> kafkaProducer) { | ||
|
||
int pollerBatchSize = properties.getPollerBatchSize(shardPartition.getShard()); | ||
long startTimeMs = System.currentTimeMillis(); | ||
|
@@ -257,7 +271,7 @@ private void poll(Control control, TkmsShardPartition shardPartition) { | |
var contexts = new MessageProcessingContext[records.size()]; | ||
|
||
final var kafkaSendStartNanoTime = System.nanoTime(); | ||
var kafkaProducer = tkmsKafkaProducerProvider.getKafkaProducer(shardPartition.getShard()); | ||
|
||
boolean atLeastOneSendDone = false; | ||
|
||
producerRecordMap.clear(); | ||
|
@@ -335,7 +349,13 @@ private void poll(Control control, TkmsShardPartition shardPartition) { | |
} | ||
|
||
if (atLeastOneSendDone) { | ||
kafkaProducer.flush(); | ||
var interruptionHandle = | ||
tkmsInterrupterService.interruptAfter(Thread.currentThread(), properties.getInternals().getFlushInterruptionDuration()); | ||
try { | ||
kafkaProducer.flush(); | ||
} finally { | ||
tkmsInterrupterService.cancelInterruption(interruptionHandle); | ||
} | ||
} | ||
|
||
for (int i = 0; i < records.size(); i++) { | ||
|
@@ -372,6 +392,10 @@ private void poll(Control control, TkmsShardPartition shardPartition) { | |
if (failedSendsCount.get() > 0) { | ||
proxyCyclePauseRequest.setValue(tkmsPaceMaker.getPollingPauseOnError(shardPartition)); | ||
} | ||
} catch (InterruptException e) { | ||
log.error("Kafka producer was interrupted for " + shardPartition + ".", e); | ||
// Rethrow and force the recreation of the producer. | ||
throw e; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why do we handle this case differently from the one below? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes: we log a trackable message, and we force the poll loop to exit so that a new producer instance is created. |
||
} catch (Throwable t) { | ||
log.error(t.getMessage(), t); | ||
proxyCyclePauseRequest.setValue(tkmsPaceMaker.getPollingPauseOnError(shardPartition)); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`setup.md` states that Spring Boot 2.5 is supported.