Skip to content

Commit

Permalink
Use SmartInitializingSingleton to build topology before start.
Browse files Browse the repository at this point in the history
  • Loading branch information
chickenchickenlove committed Apr 3, 2024
1 parent 20ba75d commit 9fafa67
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;

import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.config.AbstractFactoryBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.log.LogAccessor;
Expand Down Expand Up @@ -61,7 +62,7 @@
* @since 1.1.4
*/
public class StreamsBuilderFactoryBean extends AbstractFactoryBean<StreamsBuilder>
implements SmartLifecycle, BeanNameAware {
implements SmartLifecycle, BeanNameAware, SmartInitializingSingleton {

/**
* The default {@link Duration} of {@code 10 seconds} for close timeout.
Expand Down Expand Up @@ -338,12 +339,6 @@ protected StreamsBuilder createInstance() {

@Override
public boolean isAutoStartup() {
try {
this.topology = getObject().build(this.properties);
}
catch (Exception e) {
throw new RuntimeException(e);
}
return this.autoStartup;
}

Expand All @@ -363,8 +358,6 @@ public void start() {
try {
Assert.state(this.properties != null,
"streams configuration properties must not be null");
this.infrastructureCustomizer.configureTopology(this.topology);
LOGGER.debug(() -> this.topology.describe().toString());
this.kafkaStreams = new KafkaStreams(this.topology, this.properties, this.clientSupplier);
this.kafkaStreams.setStateListener(this.stateListener);
this.kafkaStreams.setGlobalStateRestoreListener(this.stateRestoreListener);
Expand Down Expand Up @@ -437,6 +430,18 @@ public boolean isRunning() {
}
}

@Override
public void afterSingletonsInstantiated() {
try {
this.topology = getObject().build(this.properties);
this.infrastructureCustomizer.configureTopology(this.topology);
LOGGER.debug(() -> this.topology.describe().toString());
}
catch (Exception e) {
throw new RuntimeException(e);
}
}

/**
* Called whenever a {@link KafkaStreams} is added or removed.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,58 +93,45 @@ public void testCleanupStreams() throws IOException {
}

@Test
public void testBuildWithPropertiesAndAutoStartUp() throws Exception {
boolean autoStartUp = true;
public void testBuildWithProperties() throws Exception {
streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(kafkaStreamsConfiguration) {
@Override
protected StreamsBuilder createInstance() {
return spy(super.createInstance());
}
};
streamsBuilderFactoryBean.setAutoStartup(autoStartUp);
streamsBuilderFactoryBean.afterPropertiesSet();
StreamsBuilder builder = streamsBuilderFactoryBean.getObject();
builder.stream(Pattern.compile("foo"));


boolean isAutoStartUp = streamsBuilderFactoryBean.isAutoStartup();
if (isAutoStartUp) {
streamsBuilderFactoryBean.start();
}

streamsBuilderFactoryBean.afterSingletonsInstantiated();
streamsBuilderFactoryBean.start();
StreamsBuilder streamsBuilder = streamsBuilderFactoryBean.getObject();
verify(streamsBuilder).build(kafkaStreamsConfiguration.asProperties());
assertThat(streamsBuilderFactoryBean.getTopology()).isNotNull();
assertThat(isAutoStartUp).isTrue();
assertThat(streamsBuilderFactoryBean.isRunning()).isTrue();
}

@Test
public void testBuildWithPropertiesAndNoAutoStartUp() throws Exception {
boolean autoStartUp = false;
public void testGetTopologyBeforeKafkaStreamsStart() throws Exception {
// Given
streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(kafkaStreamsConfiguration) {
@Override
protected StreamsBuilder createInstance() {
return spy(super.createInstance());
}
};
streamsBuilderFactoryBean.setAutoStartup(autoStartUp);
streamsBuilderFactoryBean.afterPropertiesSet();
StreamsBuilder builder = streamsBuilderFactoryBean.getObject();
builder.stream(Pattern.compile("foo"));

boolean isAutoStartUp = streamsBuilderFactoryBean.isAutoStartup();
if (isAutoStartUp) {
streamsBuilderFactoryBean.start();
}
// When
streamsBuilderFactoryBean.afterSingletonsInstantiated();

StreamsBuilder streamsBuilder = streamsBuilderFactoryBean.getObject();
verify(streamsBuilder).build(kafkaStreamsConfiguration.asProperties());
// Then
assertThat(streamsBuilderFactoryBean.getTopology()).isNotNull();
assertThat(isAutoStartUp).isFalse();
assertThat(streamsBuilderFactoryBean.isRunning()).isFalse();
}


@Configuration
@EnableKafkaStreams
public static class KafkaStreamsConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -79,13 +80,10 @@ public void testStreamsBuilderFactoryWithConfigProvidedLater() throws Exception
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
streamsBuilderFactoryBean.setStreamsConfiguration(props);
streamsBuilderFactoryBean.setAutoStartup(isAutoStartUp);
streamsBuilderFactoryBean.getObject().stream(Pattern.compile("foo"));

assertThat(streamsBuilderFactoryBean.isRunning()).isFalse();
boolean shouldAutoStartUp = streamsBuilderFactoryBean.isAutoStartup();
streamsBuilderFactoryBean.start();
assertThat(streamsBuilderFactoryBean.isRunning()).isTrue();
assertThat(shouldAutoStartUp).isEqualTo(isAutoStartUp);
}

@Configuration
Expand All @@ -100,6 +98,23 @@ public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder() {
return streamsBuilderFactoryBean;
}

@Bean
public KafkaStreamsService kafkaStreamsService(StreamsBuilder streamsBuilder) {
return new KafkaStreamsService(streamsBuilder);
}

}

static class KafkaStreamsService {
private final StreamsBuilder streamsBuilder;

public KafkaStreamsService(StreamsBuilder streamsBuilder) {
this.streamsBuilder = streamsBuilder;
buildPipeline();
}

public void buildPipeline() {
this.streamsBuilder.stream("foo");
}
}
}

0 comments on commit 9fafa67

Please sign in to comment.