Skip to content

Commit

Permalink
GH-3176: Take property default.dsl.store into account
Browse files Browse the repository at this point in the history
  • Loading branch information
cedric-schaller committed Apr 5, 2024
1 parent f1e48f4 commit f0a0aaa
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
Expand Down Expand Up @@ -58,6 +60,7 @@
* @author Gary Russell
* @author Julien Wittouck
* @author Sanghyeok An
* @author Cédric Schaller
*
* @since 1.1.4
*/
Expand Down Expand Up @@ -328,7 +331,7 @@ protected StreamsBuilder createInstance() {
Assert.state(this.properties != null,
"streams configuration properties must not be null");
}
StreamsBuilder builder = new StreamsBuilder();
StreamsBuilder builder = createStreamBuilder();
this.infrastructureCustomizer.configureBuilder(builder);
return builder;
}
Expand Down Expand Up @@ -442,6 +445,17 @@ public void afterSingletonsInstantiated() {
}
}

private StreamsBuilder createStreamBuilder() {
if (this.properties == null) {
return new StreamsBuilder();
}
else {
StreamsConfig streamsConfig = new StreamsConfig(this.properties);
TopologyConfig topologyConfig = new TopologyConfig(streamsConfig);
return new StreamsBuilder(topologyConfig);
}
}

/**
* Called whenever a {@link KafkaStreams} is added or removed.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.config;

import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

/**
* @author Cédric Schaller
*/
@SpringJUnitConfig
@DirtiesContext
@EmbeddedKafka
public class StreamsBuilderFactoryBeanInMemoryStateStoreTests {

private static Path stateStoreDir;

@BeforeAll
static void beforeAll() throws IOException {
stateStoreDir = Files.createTempDirectory(StreamsBuilderFactoryBeanInMemoryStateStoreTests.class.getSimpleName());
}

@Test
void testStateStoreIsInMemory() {
// Testing that an in-memory state store is used requires accessing the internal state of StreamsBuilder via reflection
// Therefore, we check the non-existence of RocksDB files instead
assertThat(stateStoreDir).isEmptyDirectory();
}

@Configuration
@EnableKafkaStreams
static class KafkaStreamsConfig {

@Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
private String brokerAddresses;

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigWithInMemoryStateStores() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "should-be-stored-in-memory");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
props.put(StreamsConfig.STATE_DIR_CONFIG, stateStoreDir.toString());

// Property introduced with KIP-591 (Kafka 3.2) and deprecated (but still supported) with Kafka 3.7
props.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, "in_memory");
return new KafkaStreamsConfiguration(props);
}

@Bean
public KTable<?, ?> table(StreamsBuilder builder) {
KStream<Object, Object> stream = builder.stream("source-topic");
return stream.groupByKey()
.count(Materialized.as("store"));

}
}
}

0 comments on commit f0a0aaa

Please sign in to comment.