Skip to content

Commit

Permalink
Remove the deprectated DSL_STORE_CONFIG property
Browse files Browse the repository at this point in the history
`StreamsConfig.DEFAULT_DSL_STORE_CONFIG` (`default.dsl.store`) is deprecated
in Kafka Streams `3.7.0` in preference to `DslStoreSuppliers`.

Remove using this property in the test that verifying in-memory state stores
and use the `DslStoreSuppliers` mechanism instead.

See KIP-954 for more details

https://cwiki.apache.org/confluence/display/KAFKA/KIP-954%3A+expand+default+DSL+store+configuration+to+custom+types
  • Loading branch information
sobychacko committed Apr 17, 2024
1 parent 5446be0 commit 4a46f0d
Showing 1 changed file with 8 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@
import java.util.Map;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
import org.apache.kafka.streams.state.KeyValueStore;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

Expand All @@ -45,6 +48,7 @@

/**
* @author Cédric Schaller
* @author Soby Chacko
*/
@SpringJUnitConfig
@DirtiesContext
Expand Down Expand Up @@ -80,18 +84,16 @@ public KafkaStreamsConfiguration kStreamsConfigWithInMemoryStateStores() {
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
props.put(StreamsConfig.STATE_DIR_CONFIG, stateStoreDir.toString());

// Property introduced with KIP-591 (Kafka 3.2) and deprecated (but still supported) with Kafka 3.7
props.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, "in_memory");
return new KafkaStreamsConfiguration(props);
}

@Bean
public KTable<?, ?> table(StreamsBuilder builder) {
KStream<Object, Object> stream = builder.stream("source-topic");
return stream.groupByKey()
.count(Materialized.as("store"));

return stream
.groupByKey()
.count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as("store")
.withStoreType(BuiltInDslStoreSuppliers.IN_MEMORY));
}
}
}

0 comments on commit 4a46f0d

Please sign in to comment.