-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-2391: Add Kraft Version of EmbeddedKafkaBroker #2820
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great!
But there are some questions.
Thanks
Two implementations are provided: | ||
|
||
* `EmbeddedKafkaZKBroker` - legacy implementation which starts an embedded `Zookeeper` instance. | ||
* `EmbeddedKafkaKraftBroker` - uses Kraft instead of `Zookeeper` in combinder controller and broker modes (since 3.6) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is 3.6
?
You might mean 3.1
?
@@ -283,6 +296,8 @@ public class KafkaStreamsTests { | |||
|
|||
Starting with version 2.2.4, you can also use the `@EmbeddedKafka` annotation to specify the Kafka ports property. | |||
|
|||
Starting with version 3.6; set the `kraft` property to `true` to use an `EmbeddedKafkaKraftBroker` instead of an `EmbeddedKafkaZKBroker`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DITTO
@@ -12,3 +12,8 @@ For changes in earlier version, see xref:appendix/change-history.adoc[Change His | |||
|
|||
This version requires the 3.5.1 `kafka-clients`. | |||
|
|||
[[x30-ekb]] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
x31
?
} | ||
|
||
@Override | ||
public void setZkPort(int zkPort) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this method belongs to this class.
So, probably just bite a bullet and remove it from the interface as well?
* | ||
* @since 2.2 | ||
*/ | ||
public class EmbeddedKafkaZKBroker implements EmbeddedKafkaBroker, InitializingBean, DisposableBean { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like those InitializingBean
& DisposableBean
are on the EmbeddedKafkaBroker
interface...
public void afterPropertiesSet() { | ||
overrideExitMethods(); | ||
addDefaultBrokerPropsIfAbsent(this.brokerProperties, this.count); | ||
start(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you start it from the afterPropertiesSet()
why do we need a SmartLifecycle
contract then?
Why cannot we stop everything in the destroy()
as it is for ZK variant?
.build()); | ||
this.brokerProperties.forEach((k, v) -> clusterBuilder.setConfigProp((String) k, (String) v)); | ||
this.cluster = clusterBuilder.build(); | ||
// cluster.nonFatalFaultHandler().setIgnore(true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you have anything else in mind for this line of code?
* @return whether to use KRaft. | ||
* @since 3.6 | ||
*/ | ||
boolean kraft() default false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why cannot we use Kraft by default?
Isn't it faster?
|
||
@Test | ||
void testUpDown() { | ||
LogFactory.getLog(getClass()).info("foo"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure in this log message...
EmbeddedKafkaKraftBroker kafka = new EmbeddedKafkaKraftBroker(1, 1, "topic1"); | ||
kafka.afterPropertiesSet(); | ||
kafka.start(); | ||
kafka.stop(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't look like this test verifies anything.
Or you just haven't finished here yet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK. Just one time.
I guess even if you use that kraft = false
in the modified tests, many others are in default KRaft mode anyway.
spring-kafka-docs/src/main/antora/modules/ROOT/pages/testing.adoc
Outdated
Show resolved
Hide resolved
Co-authored-by: Artem Bilan <[email protected]>
Thanks |
Resolves #2391