From 4a46f0d2c3f637d6791f9c7e863f7dd1c1d34363 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Wed, 17 Apr 2024 19:05:09 -0400 Subject: [PATCH] Remove the deprectated DSL_STORE_CONFIG property `StreamsConfig.DEFAULT_DSL_STORE_CONFIG` (`default.dsl.store`) is deprecated in Kafka Streams `3.7.0` in preference to `DslStoreSuppliers`. Remove using this property in the test that verifying in-memory state stores and use the `DslStoreSuppliers` mechanism instead. See KIP-954 for more details https://cwiki.apache.org/confluence/display/KAFKA/KIP-954%3A+expand+default+DSL+store+configuration+to+custom+types --- ...sBuilderFactoryBeanInMemoryStateStoreTests.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanInMemoryStateStoreTests.java b/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanInMemoryStateStoreTests.java index 4d9bd19200..07185ba917 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanInMemoryStateStoreTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanInMemoryStateStoreTests.java @@ -25,11 +25,14 @@ import java.util.Map; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers; +import org.apache.kafka.streams.state.KeyValueStore; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -45,6 +48,7 @@ /** * @author Cédric Schaller + * @author Soby Chacko */ @SpringJUnitConfig @DirtiesContext @@ -80,18 +84,16 @@ public KafkaStreamsConfiguration kStreamsConfigWithInMemoryStateStores() { props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); props.put(StreamsConfig.STATE_DIR_CONFIG, stateStoreDir.toString()); - - // Property introduced with KIP-591 (Kafka 3.2) and deprecated (but still supported) with Kafka 3.7 - props.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, "in_memory"); return new KafkaStreamsConfiguration(props); } @Bean public KTable table(StreamsBuilder builder) { KStream stream = builder.stream("source-topic"); - return stream.groupByKey() - .count(Materialized.as("store")); - + return stream + .groupByKey() + .count(Materialized.>as("store") + .withStoreType(BuiltInDslStoreSuppliers.IN_MEMORY)); } } }