Separate kafka producers for each proxy #78
Conversation
  jakartaValidationApi : 'jakarta.validation:jakarta.validation-api:3.0.2',
  javaxValidationApi : "javax.validation:validation-api:2.0.1.Final",
- kafkaStreams : 'org.apache.kafka:kafka-streams:3.4.0',
+ kafkaStreams : 'org.apache.kafka:kafka-streams:3.2.3',
Syncing with WJP 2.7
### Removed

- Support for Spring Boot 2.6.
setup.md states that Spring Boot 2.5 is supported.
CHANGELOG.md (Outdated)
This would allow to recover from unforeseen kafka clients' bugs and also release resources when another pod takes over the proxying.

- The default linger time on kafka producer was increased from 5 ms. to 1000 ms.
This would allow potentially larger batches to get formed. We are not increasing the latency, because we override the
This sounds contradictory: we either let messages linger around a bit longer and then the latency is increased a bit or we don't. I think some further explanation could be useful here.
Maybe just add the word "substantially" after latency.
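To illustrate the point under discussion, here is a minimal sketch (plain Java; `LingerVsFlushSketch` and its helpers are hypothetical names, not the library's code) of why a high `linger.ms` need not add much latency when the proxy flushes explicitly after each batch: the `flush()` call sends whatever has already been queued instead of waiting out the linger timeout.

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

class LingerVsFlushSketch {

  // A long linger time only delays records left waiting for a batch to fill up;
  // an explicit flush() pushes out everything that is already queued.
  static void proxyBatch(KafkaProducer<String, byte[]> producer,
                         List<ProducerRecord<String, byte[]>> records) {
    records.forEach(producer::send);
    producer.flush();  // overrides lingering for the records sent above
  }

  static Properties producerProps(String bootstrapServers) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.LINGER_MS_CONFIG, "1000");  // raised from 5 ms in this PR
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.ByteArraySerializer");
    return props;
  }
}
```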
} catch (InterruptException e) {
  log.error("Kafka producer was interrupted for " + shardPartition + ".", e);
  // Rethrow and force the recreation of the producer.
  throw e;
Why do we handle this case differently from the one below?
Yep, we are logging a trackable message.
And we force the poll loop to exit and, thus, a new producer instance to be created.
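A rough sketch of the pattern described in this answer (not the library's actual code; `ProxyPollLoopSketch` and `pollAndProxy` are hypothetical): the rethrown `InterruptException` makes the poll loop exit, its producer is closed, and a fresh producer is created when proxying resumes.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.errors.InterruptException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ProxyPollLoopSketch {

  private static final Logger log = LoggerFactory.getLogger(ProxyPollLoopSketch.class);

  // Each proxy owns its producer; when InterruptException escapes the poll loop,
  // the producer is closed and a new instance is created on the next iteration.
  void run(Properties producerProps) {
    while (!Thread.currentThread().isInterrupted()) {
      KafkaProducer<String, byte[]> producer = new KafkaProducer<>(producerProps);
      try {
        pollAndProxy(producer);
      } catch (InterruptException e) {
        // InterruptException sets the thread's interrupt flag, so the while
        // condition above stops the loop instead of spinning.
        log.error("Poll loop was interrupted, the producer will be recreated.", e);
      } finally {
        producer.close();
      }
    }
  }

  void pollAndProxy(KafkaProducer<String, byte[]> producer) {
    // Hypothetical placeholder for reading queued messages, sending and flushing them.
  }
}
```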
@@ -71,7 +71,7 @@ public void afterPropertiesSet() {
    environmentValidator.validate();

    for (String topic : properties.getTopics()) {
-     validateTopic(properties.getDefaultShard(), topic);
+     validateTopic(topic);
I'm afraid we might have an edge case here when the topic exists, the service has DESCRIBE ACL on it, but it does not have WRITE. Topic validation will pass but produce requests will fail.
I think we should implement topic validation via AdminClient instead.
Currently, if we ask for a non-existing topic, metadata logs about it will get spammed until the producer is closed.
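A rough sketch of what AdminClient-based validation could look like (assuming kafka-clients 3.1+, where `DescribeTopicsResult.allTopicNames()` is available; `TopicValidationSketch` is a hypothetical name). Note it only checks existence/DESCRIBE; verifying WRITE access, as raised above, would need an additional authorization check.

```java
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;

class TopicValidationSketch {

  // Fails fast with a single clear error instead of letting the producer spam
  // metadata logs about an unknown topic.
  static void validateTopicExists(Properties adminProps, String topic) {
    try (AdminClient adminClient = AdminClient.create(adminProps)) {
      Map<String, TopicDescription> descriptions =
          adminClient.describeTopics(Set.of(topic)).allTopicNames().get();
      if (!descriptions.containsKey(topic)) {
        throw new IllegalStateException("Topic '" + topic + "' does not exist.");
      }
    } catch (Exception e) {
      throw new IllegalStateException("Validating topic '" + topic + "' failed.", e);
    }
  }
}
```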
I was wondering if we use compression for message sending too or only for DB writes?
The compression is enabled only for DB writes.
However, we should enable compression for sends as well, but via a separate PR.
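If that separate PR materializes, enabling compression on the proxies' producers is a one-line producer config change along these lines (a sketch; the codec choice and `ProducerCompressionSketch` name are arbitrary):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

class ProducerCompressionSketch {

  // Compression is applied per batch, so the larger batches enabled by the higher
  // linger time would typically compress better.
  static Properties withCompression(Properties producerProps) {
    producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");  // or "lz4", "snappy", "gzip"
    return producerProps;
  }
}
```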
private IExecutorServicesProvider executorServicesProvider;
private ScheduledTaskExecutor scheduledTaskExecutor;

public void afterPropertiesSet() {
Can ScheduledTaskExecutor already be passed in via the constructor, as we should have access to it in TkmsConfiguration?
Currently no; only the IExecutorServicesProvider bean is available.
Context
Wise had a Kafka producer flush hang when a service ran into direct buffer limits. This made it hard to debug what was going on. We would have preferred to get at least some kind of error message.
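A minimal sketch of the kind of safeguard this PR introduces (not the library's actual implementation; `FlushWatchdogSketch` is a hypothetical name): a housekeeping thread interrupts the flushing thread if flush() does not return within a timeout, so a hang surfaces as an InterruptException with a clear log instead of silent blocking.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;

class FlushWatchdogSketch {

  private final ScheduledExecutorService housekeeping =
      Executors.newSingleThreadScheduledExecutor();

  // Schedules an interrupt for the calling thread; KafkaProducer.flush() throws
  // InterruptException when the thread blocked in it gets interrupted.
  void flushWithTimeout(KafkaProducer<?, ?> producer, long timeoutMs) {
    Thread flushingThread = Thread.currentThread();
    ScheduledFuture<?> interrupter =
        housekeeping.schedule(flushingThread::interrupt, timeoutMs, TimeUnit.MILLISECONDS);
    try {
      producer.flush();
    } finally {
      // Cancel the pending interrupt; a production version would also have to handle
      // the race where the interrupt fires just after flush() returns.
      interrupter.cancel(false);
    }
  }
}
```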
Removed
- Support for Spring Boot 2.6.
Changed
- Every proxy has its own, independent Kafka producer.
  Before, one producer was shared by all partitions, and the default shard's producer was also used for topics validation.
- Kafka producer's flush will now be interrupted from another thread, by a separate housekeeping service.
  Wise had an incident where the flush() call hung forever, and it was not easy to derive that this was the case. Now we will at least get clear error logs when this happens.
- Proxies' Kafka producers will be closed after the poll loop exits.
  This allows recovering from unforeseen Kafka clients' bugs and also releasing resources when another pod takes over the proxying.
- The default linger time on the Kafka producer was increased from 5 ms to 1000 ms.
  This allows potentially larger batches to get formed. We are not increasing the latency, because we override the lingering mechanism via a flush() call anyway.

Checklist