Skip to content

Commit

Permalink
Add an option to leave group when closing streams
Browse files Browse the repository at this point in the history
Fixes: #3168 

* Adding an option in `StreamsBuilderFactoryBean` to allow the consumer
   to leave the group upon closing the Kafka Streams.
  • Loading branch information
juwit authored Mar 28, 2024
1 parent ae775d8 commit 43df65f
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,14 @@ You can declare and use any additional `StreamsBuilderFactoryBean` beans as well
You can perform additional customization of that bean, by providing a bean that implements `StreamsBuilderFactoryBeanConfigurer`.
If there are multiple such beans, they will be applied according to their `Ordered.order` property.


=== Cleanup & Stop configuration

When the factory is stopped, the `KafkaStreams.close()` is called with 2 parameters :

* closeTimeout : how long to to wait for the threads to shutdown (defaults to `DEFAULT_CLOSE_TIMEOUT` set to 10 seconds). Can be configured using `StreamsBuilderFactoryBean.setCloseTimeout()`.
* leaveGroupOnClose : to trigger consumer leave call from the group (defaults to `false`). Can be configured using `StreamsBuilderFactoryBean.setLeaveGroupOnClose()`.

By default, when the factory bean is stopped, the `KafkaStreams.cleanUp()` method is called.
Starting with version 2.1.2, the factory bean has additional constructors, taking a `CleanupConfig` object that has properties to let you control whether the `cleanUp()` method is called during `start()` or `stop()` or neither.
Starting with version 2.7, the default is to never clean up local state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
* @author Nurettin Yilmaz
* @author Denis Washington
* @author Gary Russell
* @author Julien Wittouck
*
* @since 1.1.4
*/
Expand Down Expand Up @@ -100,6 +101,8 @@ public class StreamsBuilderFactoryBean extends AbstractFactoryBean<StreamsBuilde

private Duration closeTimeout = DEFAULT_CLOSE_TIMEOUT;

private boolean leaveGroupOnClose = false;

private KafkaStreams kafkaStreams;

private volatile boolean running;
Expand Down Expand Up @@ -225,6 +228,15 @@ public void setCloseTimeout(int closeTimeout) {
this.closeTimeout = Duration.ofSeconds(closeTimeout); // NOSONAR (sync)
}

/**
* Specify if the consumer should leave the group when stopping Kafka Streams. Defaults to false.
* @param leaveGroupOnClose true to leave the group when stopping the Streams
* @since 3.2.0
*/
public void setLeaveGroupOnClose(boolean leaveGroupOnClose) {
this.leaveGroupOnClose = leaveGroupOnClose;
}

/**
* Providing access to the associated {@link Topology} of this
* {@link StreamsBuilderFactoryBean}.
Expand Down Expand Up @@ -383,7 +395,10 @@ public void stop() {
if (this.running) {
try {
if (this.kafkaStreams != null) {
this.kafkaStreams.close(this.closeTimeout);
this.kafkaStreams.close(new KafkaStreams.CloseOptions()
.timeout(this.closeTimeout)
.leaveGroup(this.leaveGroupOnClose)
);
if (this.cleanupConfig.cleanupOnStop()) {
this.kafkaStreams.cleanUp();
}
Expand Down

0 comments on commit 43df65f

Please sign in to comment.