Skip to content

Commit

Permalink
Kafka Streams sample demo for TopologyTestDriver
Browse files Browse the repository at this point in the history
Fixes: #1853

* Demonstrating Kafka Streams `TopologyTestDriver`
* README updates in the sample
* Update Spring Boot parent version of the samples to 3.2.5
  • Loading branch information
nachomdo authored and sobychacko committed Apr 29, 2024
1 parent 4439bf3 commit e547d0a
Show file tree
Hide file tree
Showing 14 changed files with 385 additions and 10 deletions.
11 changes: 6 additions & 5 deletions samples/README.adoc
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
== Samples

* sample-01 - simple producer/consumer with dead-letter topic
* sample-02 - multi-method listener
* sample-03 - transactions
* sample-04 - topic based (non-blocking) retry
* sample-05 - global embedded Kafka testing
* sample-01 - Simple producer/consumer with dead-letter topic
* sample-02 - Multi-method listener
* sample-03 - Transactions
* sample-04 - Topic based (non-blocking) retry
* sample-05 - Global embedded Kafka testing
* sample-06 - Kafka Streams tests with TopologyTestDriver
2 changes: 1 addition & 1 deletion samples/sample-01/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.1</version>
<version>3.2.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>

Expand Down
2 changes: 1 addition & 1 deletion samples/sample-02/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.1</version>
<version>3.2.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>

Expand Down
2 changes: 1 addition & 1 deletion samples/sample-03/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.1</version>
<version>3.2.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>

Expand Down
2 changes: 1 addition & 1 deletion samples/sample-04/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.1</version>
<version>3.2.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>

Expand Down
2 changes: 1 addition & 1 deletion samples/sample-05/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.1</version>
<version>3.2.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>

Expand Down
36 changes: 36 additions & 0 deletions samples/sample-06/README.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
== Sample 6

This sample demonstrates a simple Kafka Streams topology tested with TopologyTestDriver.

The application contains a simple Kafka Streams topology that counts the keys seen so far in a stateful manner.
The corresponding `TopologyTestDriver` based JUnit test verifies the behavior of the business logic in the Kafka Streams topology.


Console output describe the topology as shown below:

. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.5.2)

2021-06-30 17:38:33.637 INFO 92063 --- [ main] com.example.ApplicationTests : Starting ApplicationTests using Java 11.0.10 on C02FL1KSMD6T with PID 92063 (started by igomez in /Users/igomez/Projects/spring-kafka/samples/sample-05)
2021-06-30 17:38:33.638 INFO 92063 --- [ main] com.example.ApplicationTests : The following profiles are active: test
2021-06-30 17:38:35.027 INFO 92063 --- [ main] com.example.ApplicationTests : Started ApplicationTests in 1.73 seconds (JVM running for 2.833)
2021-06-30 17:38:35.695 INFO 92063 --- [ main] com.example.ApplicationTests : Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input])
--> KSTREAM-AGGREGATE-0000000002
Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])
--> KTABLE-SUPPRESS-0000000003
<-- KSTREAM-SOURCE-0000000000
Processor: KTABLE-SUPPRESS-0000000003 (stores: [KTABLE-SUPPRESS-STATE-STORE-0000000004])
--> KTABLE-TOSTREAM-0000000005
<-- KSTREAM-AGGREGATE-0000000002
Processor: KTABLE-TOSTREAM-0000000005 (stores: [])
--> KSTREAM-SINK-0000000006
<-- KTABLE-SUPPRESS-0000000003
Sink: KSTREAM-SINK-0000000006 (topic: output)
<-- KTABLE-TOSTREAM-0000000005
128 changes: 128 additions & 0 deletions samples/sample-06/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.example</groupId>
<artifactId>kafka-sample-06</artifactId>
<version>3.2.0-SNAPSHOT</version>
<packaging>jar</packaging>

<name>kafka-sample-06</name>
<description>Kafka Sample 6</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>17</java.version>
</properties>

<dependencies>

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-test-utils</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<scope>test</scope>
<classifier>test</classifier>
</dependency>

<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/libs-snapshot-local</url>
</repository>
<repository>
<id>spring-milestones</id>
<name>Spring milestones</name>
<url>https://repo.spring.io/libs-milestone-local</url>
</repository>
<repository>
<id>rsocket-snapshots</id>
<name>RSocket Snapshots</name>
<url>https://oss.jfrog.org/oss-snapshot-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<repository>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>https://repo.spring.io/release</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</pluginRepository>
<pluginRepository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
<pluginRepository>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>https://repo.spring.io/release</url>
</pluginRepository>
</pluginRepositories>


</project>
41 changes: 41 additions & 0 deletions samples/sample-06/src/main/java/com/example/Application.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.example;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafkaStreams;

/**
*
* @author Nacho Munoz
* @since 3.2.0
*/
@EnableKafkaStreams
@SpringBootApplication
public class Application {

private final Logger logger = LoggerFactory.getLogger(Application.class);

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

}
64 changes: 64 additions & 0 deletions samples/sample-06/src/main/java/com/example/Topology.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.example;

import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Suppressed;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;


/**
* A basic topology that counts records by key and materialises the output into a new topic
*
* @author Nacho Munoz
* @author Soby Chacko
* @since 3.2.0
*/
@Configuration
@Component
public class Topology {
private final String inputTopic;

private final String outputTopic;

@Autowired
public Topology(@Value("${input-topic.name}") final String inputTopic,
@Value("${output-topic.name}") final String outputTopic) {
this.inputTopic = inputTopic;
this.outputTopic = outputTopic;
}

@Autowired
public void defaultTopology(final StreamsBuilder builder) {
builder.stream(inputTopic, Consumed.with(Serdes.Integer(), Serdes.String()))
.groupByKey()
.count()
.suppress(Suppressed.untilTimeLimit(Duration.ofMillis(5), Suppressed.BufferConfig.unbounded()))
.toStream()
.to(outputTopic);

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
logging.level.root=off
logging.level.com.example=info

bootstrap.servers=
spring.kafka.properties.bootstrap.servers=${bootstrap.servers}
spring.kafka.streams.application-id=Sample-06-Service-Test
4 changes: 4 additions & 0 deletions samples/sample-06/src/main/resources/application.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
logging.level.root=off
logging.level.com.example=info

spring.kafka.streams.application-id=Sample-06-Service
6 changes: 6 additions & 0 deletions samples/sample-06/src/main/resources/application.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
logging:
level.root: info
input-topic:
name: input
output-topic:
name: output
Loading

0 comments on commit e547d0a

Please sign in to comment.