Tutorial ini akan melakukan hands on mengenai cara stream Kafka ke HTML dengan menggunakan metode Server Send Event (SSE). Kelebihan SSE dibandingkan dengan menggunakan Web Socket (ws://) adalah SSE menggunakan protokol http(s) dan satu arah, hanya dari server ke client saja.
Prerequsite tutorial ini adalah:
- Java 8
- Maven 3.6.0
- IDE kesayangan anda (Eclipse, VSCode, Netbeans, InteliJ, dll)
- Git client
- Dasar pemrograman Java dan Spring boot
> wget https://www.apache.org/dyn/closer.cgi?path=/kafka/2.4.1/kafka_2.12-2.4.1.tgz
> tar -xzf kafka_2.12-2.4.1.tgz
> cd kafka_2.12-2.4.1
> bin/zookeeper-server-start.sh config/zookeeper.properties
> bin/kafka-server-start.sh config/server.properties
Langkah diatas dilakukan untuk menjalankan server zookeeper di port 2181 dan server Kafka di port 9092
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
Langkah diatas dilakukan untuk membuat Topic Kafka baru dengan nama mytopic dengan jumlah partisi 1 dengan faktor replikasi sebanyak 1
Download source code dari repository GitHub saya:
> git clone https://github.com/erfinfeluzy/training-spring-sse-kafka.git
> cd training-spring-sse-kafka.git
> mvn spring-boot:run
Struktur code sebagai berikut:\
Buka browser dengan url http://localhost:8080. akan terlihat halaman sbb:\
Note: Untuk browser modern (eg: Chrome, Safari,etc) sudah mensupport untuk Server Sent Event (SSE). Hal ini mungkin tidak berjalan di IE :)
Tambahkan library kafka client pada file pom.xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Snippet berikut pada file KafkaTopicGenerator.java untuk mempublish random data ke Topic dengan nama mytopic.
@Async
@Scheduled(fixedRate = 5000)
public void doNotify() throws IOException {
//randomly generate kafka message to topic:mytopic every 5 seconds
kafkaTemplate.send("mytopic", "Data tanggal : " + new Date () + "; id : " + UUID.randomUUID() );
}
Snippet dibawah untuk subscribe ke topic mytopic.
@KafkaListener(topics = "mytopic", groupId = "consumer-group-id-1")
public void listen(@Payload String message, @Header(KafkaHeaders.OFFSET) String offset) {
//process incoming message from kafka
doNotify("Kafka Offset=" + offset + "; message=" + message);
}
Kemudian data dari topic di teruskan ke SSE emitter.
private void doNotify(String message) {
List<SseEmitter> deadEmitters = new ArrayList<>();
emitters.forEach(emitter -> {
try {
//send message to frontend
emitter
.send(SseEmitter.event()
.data(message));
} catch (Exception e) {
deadEmitters.add(emitter);
}
});
emitters.removeAll(deadEmitters);
}
Snippet berikut untuk melakukan stream data via http dengan menggunakan SSE pada endpoint /stream.
@RestController
public class StreamController {
@Autowired
KafkaConsumer kafkaConsumer;
@GetMapping("/stream")
SseEmitter stream() throws IOException {
final SseEmitter emitter = new SseEmitter();
kafkaConsumer.addEmitter(emitter);
emitter.onCompletion(() -> kafkaConsumer.removeEmitter(emitter));
emitter.onTimeout(() -> kafkaConsumer.removeEmitter(emitter));
return emitter;
}
}
Hasil dapat dicek dengan menggunakan perintah curl
> curl http://localhost:8080/stream
Snippet javascript dibawah digunakan untuk menampilkan SSE event pada halaman html
function initialize() {
const eventSource = new EventSource('http://localhost:8080/stream');
eventSource.onmessage = e => {
const msg = e.data;
document.getElementById("mycontent").innerHTML += "<br/>" + msg;
};
eventSource.onopen = e => console.log('open');
eventSource.onerror = e => {
if (e.readyState == EventSource.CLOSED) {
console.log('close');
}
else {
console.log(e);
}
};
eventSource.addEventListener('second', function(e) {
console.log('second', e.data);
}, false);
}
window.onload = initialize;
Kamu dapat memcoba dengan menggunakan browser pada url berikut http://localhost:8080.
Deploy aplikasi kamu dengan mudah menggunakan fitur Source to Image (s2i) pada OpenShift.
Note: install CodeReady Container (crc) untuk lebih mudah.
> oc login
> oc new-project my-project
> oc new-app redhat-openjdk18-openshift~https://github.com/erfinfeluzy/training-spring-sse-kafka.git
> oc expose svc/training-spring-sse-kafka
> oc get route
hasil dari perintah oc get route barupa url yang dapat dibuka di browser anda.