diff --git a/samples/README.adoc b/samples/README.adoc index fff00cbed6..3e2e536e77 100644 --- a/samples/README.adoc +++ b/samples/README.adoc @@ -1,7 +1,8 @@ == Samples -* sample-01 - simple producer/consumer with dead-letter topic -* sample-02 - multi-method listener -* sample-03 - transactions -* sample-04 - topic based (non-blocking) retry -* sample-05 - global embedded Kafka testing +* sample-01 - Simple producer/consumer with dead-letter topic +* sample-02 - Multi-method listener +* sample-03 - Transactions +* sample-04 - Topic based (non-blocking) retry +* sample-05 - Global embedded Kafka testing +* sample-06 - Kafka Streams tests with TopologyTestDriver diff --git a/samples/sample-01/pom.xml b/samples/sample-01/pom.xml index 9fd028c1de..ab08e189cc 100644 --- a/samples/sample-01/pom.xml +++ b/samples/sample-01/pom.xml @@ -14,7 +14,7 @@ org.springframework.boot spring-boot-starter-parent - 3.2.1 + 3.2.5 diff --git a/samples/sample-02/pom.xml b/samples/sample-02/pom.xml index 7238312935..32dcc5c9ec 100644 --- a/samples/sample-02/pom.xml +++ b/samples/sample-02/pom.xml @@ -14,7 +14,7 @@ org.springframework.boot spring-boot-starter-parent - 3.2.1 + 3.2.5 diff --git a/samples/sample-03/pom.xml b/samples/sample-03/pom.xml index 93c8eddac6..d522310aa8 100644 --- a/samples/sample-03/pom.xml +++ b/samples/sample-03/pom.xml @@ -14,7 +14,7 @@ org.springframework.boot spring-boot-starter-parent - 3.2.1 + 3.2.5 diff --git a/samples/sample-04/pom.xml b/samples/sample-04/pom.xml index 1d7640c50f..9c2d91151c 100644 --- a/samples/sample-04/pom.xml +++ b/samples/sample-04/pom.xml @@ -14,7 +14,7 @@ org.springframework.boot spring-boot-starter-parent - 3.2.1 + 3.2.5 diff --git a/samples/sample-05/pom.xml b/samples/sample-05/pom.xml index 00441781ff..66ce23ee95 100644 --- a/samples/sample-05/pom.xml +++ b/samples/sample-05/pom.xml @@ -5,7 +5,7 @@ org.springframework.boot spring-boot-starter-parent - 3.2.1 + 3.2.5 diff --git a/samples/sample-06/README.adoc b/samples/sample-06/README.adoc new file mode 100644 index 0000000000..0bfaa0fda2 --- /dev/null +++ b/samples/sample-06/README.adoc @@ -0,0 +1,36 @@ +== Sample 6 + +This sample demonstrates a simple Kafka Streams topology tested with TopologyTestDriver. + +The application contains a simple Kafka Streams topology that counts the keys seen so far in a stateful manner. +The corresponding `TopologyTestDriver` based JUnit test verifies the behavior of the business logic in the Kafka Streams topology. + + +Console output describe the topology as shown below: + + . ____ _ __ _ _ + /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \ +( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \ + \\/ ___)| |_)| | | | | || (_| | ) ) ) ) + ' |____| .__|_| |_|_| |_\__, | / / / / + =========|_|==============|___/=/_/_/_/ + :: Spring Boot :: (v2.5.2) + +2021-06-30 17:38:33.637 INFO 92063 --- [ main] com.example.ApplicationTests : Starting ApplicationTests using Java 11.0.10 on C02FL1KSMD6T with PID 92063 (started by igomez in /Users/igomez/Projects/spring-kafka/samples/sample-05) +2021-06-30 17:38:33.638 INFO 92063 --- [ main] com.example.ApplicationTests : The following profiles are active: test +2021-06-30 17:38:35.027 INFO 92063 --- [ main] com.example.ApplicationTests : Started ApplicationTests in 1.73 seconds (JVM running for 2.833) +2021-06-30 17:38:35.695 INFO 92063 --- [ main] com.example.ApplicationTests : Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input]) + --> KSTREAM-AGGREGATE-0000000002 + Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001]) + --> KTABLE-SUPPRESS-0000000003 + <-- KSTREAM-SOURCE-0000000000 + Processor: KTABLE-SUPPRESS-0000000003 (stores: [KTABLE-SUPPRESS-STATE-STORE-0000000004]) + --> KTABLE-TOSTREAM-0000000005 + <-- KSTREAM-AGGREGATE-0000000002 + Processor: KTABLE-TOSTREAM-0000000005 (stores: []) + --> KSTREAM-SINK-0000000006 + <-- KTABLE-SUPPRESS-0000000003 + Sink: KSTREAM-SINK-0000000006 (topic: output) + <-- KTABLE-TOSTREAM-0000000005 diff --git a/samples/sample-06/pom.xml b/samples/sample-06/pom.xml new file mode 100644 index 0000000000..117beeb954 --- /dev/null +++ b/samples/sample-06/pom.xml @@ -0,0 +1,128 @@ + + + 4.0.0 + + com.example + kafka-sample-06 + 3.2.0-SNAPSHOT + jar + + kafka-sample-06 + Kafka Sample 6 + + org.springframework.boot + spring-boot-starter-parent + 3.2.5 + + + + + UTF-8 + UTF-8 + 17 + + + + + + org.springframework.kafka + spring-kafka + + + + org.apache.kafka + kafka-streams + + + + org.springframework.boot + spring-boot-starter-test + test + + + + org.springframework.boot + spring-boot-starter-web + + + + org.apache.kafka + kafka-streams-test-utils + test + + + + org.apache.kafka + kafka-clients + test + test + + + + org.awaitility + awaitility + test + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + + + spring-snapshots + Spring Snapshots + https://repo.spring.io/libs-snapshot-local + + + spring-milestones + Spring milestones + https://repo.spring.io/libs-milestone-local + + + rsocket-snapshots + RSocket Snapshots + https://oss.jfrog.org/oss-snapshot-local + + true + + + + spring-releases + Spring Releases + https://repo.spring.io/release + + + + + spring-snapshots + Spring Snapshots + https://repo.spring.io/snapshot + + true + + + + spring-milestones + Spring Milestones + https://repo.spring.io/milestone + + false + + + + spring-releases + Spring Releases + https://repo.spring.io/release + + + + + diff --git a/samples/sample-06/src/main/java/com/example/Application.java b/samples/sample-06/src/main/java/com/example/Application.java new file mode 100644 index 0000000000..3e21fa45da --- /dev/null +++ b/samples/sample-06/src/main/java/com/example/Application.java @@ -0,0 +1,41 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.kafka.annotation.EnableKafkaStreams; + +/** + * + * @author Nacho Munoz + * @since 3.2.0 + */ +@EnableKafkaStreams +@SpringBootApplication +public class Application { + + private final Logger logger = LoggerFactory.getLogger(Application.class); + + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } + +} diff --git a/samples/sample-06/src/main/java/com/example/Topology.java b/samples/sample-06/src/main/java/com/example/Topology.java new file mode 100644 index 0000000000..a58741e824 --- /dev/null +++ b/samples/sample-06/src/main/java/com/example/Topology.java @@ -0,0 +1,64 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example; + +import java.time.Duration; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Suppressed; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; +import org.springframework.stereotype.Component; + + +/** + * A basic topology that counts records by key and materialises the output into a new topic + * + * @author Nacho Munoz + * @author Soby Chacko + * @since 3.2.0 + */ +@Configuration +@Component +public class Topology { + private final String inputTopic; + + private final String outputTopic; + + @Autowired + public Topology(@Value("${input-topic.name}") final String inputTopic, + @Value("${output-topic.name}") final String outputTopic) { + this.inputTopic = inputTopic; + this.outputTopic = outputTopic; + } + + @Autowired + public void defaultTopology(final StreamsBuilder builder) { + builder.stream(inputTopic, Consumed.with(Serdes.Integer(), Serdes.String())) + .groupByKey() + .count() + .suppress(Suppressed.untilTimeLimit(Duration.ofMillis(5), Suppressed.BufferConfig.unbounded())) + .toStream() + .to(outputTopic); + + } + +} diff --git a/samples/sample-06/src/main/resources/application-test.properties b/samples/sample-06/src/main/resources/application-test.properties new file mode 100644 index 0000000000..af04773f38 --- /dev/null +++ b/samples/sample-06/src/main/resources/application-test.properties @@ -0,0 +1,6 @@ +logging.level.root=off +logging.level.com.example=info + +bootstrap.servers= +spring.kafka.properties.bootstrap.servers=${bootstrap.servers} +spring.kafka.streams.application-id=Sample-06-Service-Test diff --git a/samples/sample-06/src/main/resources/application.properties b/samples/sample-06/src/main/resources/application.properties new file mode 100644 index 0000000000..c6fae65697 --- /dev/null +++ b/samples/sample-06/src/main/resources/application.properties @@ -0,0 +1,4 @@ +logging.level.root=off +logging.level.com.example=info + +spring.kafka.streams.application-id=Sample-06-Service diff --git a/samples/sample-06/src/main/resources/application.yml b/samples/sample-06/src/main/resources/application.yml new file mode 100644 index 0000000000..1f06f69c7f --- /dev/null +++ b/samples/sample-06/src/main/resources/application.yml @@ -0,0 +1,6 @@ +logging: + level.root: info +input-topic: + name: input +output-topic: + name: output diff --git a/samples/sample-06/src/test/java/com/example/ApplicationTests.java b/samples/sample-06/src/test/java/com/example/ApplicationTests.java new file mode 100644 index 0000000000..76cd9b521e --- /dev/null +++ b/samples/sample-06/src/test/java/com/example/ApplicationTests.java @@ -0,0 +1,89 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example; + +import java.time.Duration; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TestOutputTopic; +import org.apache.kafka.streams.TopologyTestDriver; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.kafka.config.StreamsBuilderFactoryBean; + +/** + * @author Nacho Munoz + * @author Soby Chacko + * @since 3.2.0 + */ +@SpringBootTest +public class ApplicationTests { + private final Logger logger = LoggerFactory.getLogger(ApplicationTests.class); + + private TopologyTestDriver testDriver; + + @Value("${input-topic.name}") + private String inputTopicName; + + @Value("${output-topic.name}") + private String outputTopicName; + + private TestInputTopic inputTopic; + + private TestOutputTopic outputTopic; + + @Autowired + private StreamsBuilderFactoryBean streamsBuilder; + + @BeforeEach + public void setup() { + this.testDriver = new TopologyTestDriver(streamsBuilder.getTopology(), streamsBuilder.getStreamsConfiguration()); + logger.info(streamsBuilder.getTopology().describe().toString()); + this.inputTopic = testDriver.createInputTopic(inputTopicName, Serdes.Integer().serializer(), Serdes.String().serializer()); + this.outputTopic = testDriver.createOutputTopic(outputTopicName, Serdes.Integer().deserializer(), Serdes.Long().deserializer()); + } + + @AfterEach + public void after() { + if (testDriver != null) { + this.testDriver.close(); + } + } + + @Test + public void testTopologyLogic() { + inputTopic.pipeInput(1, "test", 1L); + inputTopic.pipeInput(1, "test", 10L); + inputTopic.pipeInput(2, "test", 2L); + + Awaitility.waitAtMost(Duration.ofSeconds(5)).until(() -> outputTopic.getQueueSize() == 2L); + assertThat(outputTopic.readValuesToList()).isEqualTo(List.of(2L, 1L)); + } + +}