diff --git a/README.md b/README.md index e322b92f..71c61d8c 100644 --- a/README.md +++ b/README.md @@ -268,7 +268,16 @@ to the client web browser when you're looking for a small subset of messages. F ### 5. Define Views -Views are the last step putting all of the pieces together. Views let you configure a Topic to consume from, configure which Message Formats the Topic uses, and optionally apply any Filters. +Views let you configure a Topic to consume from, configure which Message Formats the Topic uses, and optionally apply any Filters. + +### 6. Define Producers + +Producers let you write messages onto Kafka. With the limitation of one Producer per topic, you add the fully qualified class name of the object that will be represented by the message, and individually add field names that are remembered by the producer for easy message generation. +![Producer Configuration Screenshot](images/webproducer.PNG) + +Upon sending a message, the Producer does no data validation and the consumer is responsible for affirming proper object shape. +![Producer Send Message Screenshot](images/webproducer-sendmessage.PNG) + ## Writing Custom Deserializers diff --git a/images/webproducer-sendmessage.PNG b/images/webproducer-sendmessage.PNG new file mode 100644 index 00000000..2896a104 Binary files /dev/null and b/images/webproducer-sendmessage.PNG differ diff --git a/images/webproducer.PNG b/images/webproducer.PNG new file mode 100644 index 00000000..c3aa776c Binary files /dev/null and b/images/webproducer.PNG differ diff --git a/kafka-webview-ui/pom.xml b/kafka-webview-ui/pom.xml index 27c35a7e..c8fc3219 100644 --- a/kafka-webview-ui/pom.xml +++ b/kafka-webview-ui/pom.xml @@ -9,7 +9,7 @@ 4.0.0 kafka-webview-ui - 2.5.0 + 2.7.1 Kafka WebView UI @@ -259,6 +259,13 @@ 0.7 test + + + + org.springframework.kafka + spring-kafka + + diff --git a/kafka-webview-ui/src/main/frontend/js/app.js b/kafka-webview-ui/src/main/frontend/js/app.js index 89ab70f7..e98bc344 100755 --- a/kafka-webview-ui/src/main/frontend/js/app.js +++ b/kafka-webview-ui/src/main/frontend/js/app.js @@ -364,6 +364,20 @@ var ApiClient = { } }); }, + submitKafkaMessage: function(producerId, messageRequest, callback){ + jQuery.ajax({ + type: 'POST', + url: ApiClient.buildUrl('api/producer/' + producerId + '/send-message'), + data: messageRequest, + dataType: 'json', + headers: ApiClient.getCsrfHeader(), + success: callback, + error: ApiClient.defaultErrorHandler, + beforeSend: function(xhr) { + xhr.setRequestHeader('Content-type', 'application/json; charset=utf-8'); + } + }); + }, defaultErrorHandler: function(jqXHR, textStatus, errorThrown) { // convert response to json var response = jQuery.parseJSON(jqXHR.responseText); diff --git a/kafka-webview-ui/src/main/frontend/package-lock.json b/kafka-webview-ui/src/main/frontend/package-lock.json index 50f7cc65..9eeca041 100644 --- a/kafka-webview-ui/src/main/frontend/package-lock.json +++ b/kafka-webview-ui/src/main/frontend/package-lock.json @@ -969,7 +969,6 @@ "anymatch": "2.0.0", "async-each": "1.0.1", "braces": "2.3.2", - "fsevents": "1.2.4", "glob-parent": "3.1.0", "inherits": "2.0.3", "is-binary-path": "1.0.1", @@ -2113,542 +2112,6 @@ "integrity": "sha1-FQStJSMVjKpA20onh8sBQRmU6k8=", "dev": true }, - "fsevents": { - "version": "1.2.4", - "resolved": "https://registry.npmjs.org/fsevents/-/fsevents-1.2.4.tgz", - "integrity": "sha512-z8H8/diyk76B7q5wg+Ud0+CqzcAF3mBBI/bA5ne5zrRUUIvNkJY//D3BqyH571KuAC4Nr7Rw7CjWX4r0y9DvNg==", - "dev": true, - "optional": true, - "requires": { - "nan": "2.11.1", - "node-pre-gyp": "0.10.0" - }, - "dependencies": { - "abbrev": { - "version": "1.1.1", - "bundled": true, - "dev": true, - "optional": true - }, - "ansi-regex": { - "version": "2.1.1", - "bundled": true, - "dev": true - }, - "aproba": { - "version": "1.2.0", - "bundled": true, - "dev": true, - "optional": true - }, - "are-we-there-yet": { - "version": "1.1.4", - "bundled": true, - "dev": true, - "optional": true, - "requires": { - "delegates": "1.0.0", - "readable-stream": "2.3.6" - } - }, - "balanced-match": { - "version": "1.0.0", - "bundled": true, - "dev": true - }, - "brace-expansion": { - "version": "1.1.11", - "bundled": true, - "dev": true, - "requires": { - "balanced-match": "1.0.0", - "concat-map": "0.0.1" - } - }, - "chownr": { - "version": "1.0.1", - "bundled": true, - "dev": true, - "optional": true - }, - "code-point-at": { - "version": "1.1.0", - "bundled": true, - "dev": true - }, - "concat-map": { - "version": "0.0.1", - "bundled": true, - "dev": true - }, - "console-control-strings": { - "version": "1.1.0", - "bundled": true, - "dev": true - }, - "core-util-is": { - "version": "1.0.2", - "bundled": true, - "dev": true, - "optional": true - }, - "debug": { - "version": "2.6.9", - "bundled": true, - "dev": true, - "optional": true, - "requires": { - "ms": "2.0.0" - } - }, - "deep-extend": { - "version": "0.5.1", - "bundled": true, - "dev": true, - "optional": true - }, - "delegates": { - "version": "1.0.0", - "bundled": true, - "dev": true, - "optional": true - }, - "detect-libc": { - "version": "1.0.3", - "bundled": true, - "dev": true, - "optional": true - }, - "fs-minipass": { - "version": "1.2.5", - "bundled": true, - "dev": true, - "optional": true, - "requires": { - "minipass": "2.2.4" - } - }, - "fs.realpath": { - "version": "1.0.0", - "bundled": true, - "dev": true, - "optional": true - }, - "gauge": { - "version": "2.7.4", - "bundled": true, - "dev": true, - "optional": true, - "requires": { - "aproba": "1.2.0", - "console-control-strings": "1.1.0", - "has-unicode": "2.0.1", - "object-assign": "4.1.1", - "signal-exit": "3.0.2", - "string-width": "1.0.2", - "strip-ansi": "3.0.1", - "wide-align": "1.1.2" - } - }, - "glob": { - "version": "7.1.2", - "bundled": true, - "dev": true, - "optional": true, - "requires": { - "fs.realpath": "1.0.0", - "inflight": "1.0.6", - "inherits": "2.0.3", - "minimatch": "3.0.4", - "once": "1.4.0", - "path-is-absolute": "1.0.1" - } - }, - "has-unicode": { - "version": "2.0.1", - "bundled": true, - "dev": true, - "optional": true - }, - "iconv-lite": { - "version": "0.4.21", - "bundled": true, - "dev": true, - "optional": true, - "requires": { - "safer-buffer": "2.1.2" - } - }, - "ignore-walk": { - "version": "3.0.1", - "bundled": true, - "dev": true, - "optional": true, - "requires": { - "minimatch": "3.0.4" - } - }, - "inflight": { - "version": "1.0.6", - "bundled": true, - "dev": true, - "optional": true, - "requires": { - "once": "1.4.0", - "wrappy": "1.0.2" - } - }, - "inherits": { - "version": "2.0.3", - "bundled": true, - "dev": true - }, - "ini": { - "version": "1.3.5", - "bundled": true, - "dev": true, - "optional": true - }, - "is-fullwidth-code-point": { - "version": "1.0.0", - "bundled": true, - "dev": true, - "requires": { - "number-is-nan": "1.0.1" - } - }, - "isarray": { - "version": "1.0.0", - "bundled": true, - "dev": true, - "optional": true - }, - "minimatch": { - "version": "3.0.4", - "bundled": true, - "dev": true, - "requires": { - "brace-expansion": "1.1.11" - } - }, - "minimist": { - "version": "0.0.8", - "bundled": true, - "dev": true - }, - "minipass": { - "version": "2.2.4", - "bundled": true, - "dev": true, - "requires": { - "safe-buffer": "5.1.1", - "yallist": "3.0.2" - } - }, - "minizlib": { - "version": "1.1.0", - "bundled": true, - "dev": true, - "optional": true, - "requires": { - "minipass": "2.2.4" - } - }, - "mkdirp": { - "version": "0.5.1", - "bundled": true, - "dev": true, - "requires": { - "minimist": "0.0.8" - } - }, - "ms": { - "version": "2.0.0", - "bundled": true, - "dev": true, - "optional": true - }, - "nan": { - "version": "2.11.1", - "resolved": "https://registry.npmjs.org/nan/-/nan-2.11.1.tgz", - "integrity": "sha512-iji6k87OSXa0CcrLl9z+ZiYSuR2o+c0bGuNmXdrhTQTakxytAFsC56SArGYoiHlJlFoHSnvmhpceZJaXkVuOtA==", - "dev": true, - "optional": true - }, - "needle": { - "version": "2.2.0", - "bundled": true, - "dev": true, - "optional": true, - "requires": { - "debug": "2.6.9", - "iconv-lite": "0.4.21", - "sax": "1.2.4" - } - }, - "node-pre-gyp": { - "version": "0.10.0", - "bundled": true, - "dev": true, - "optional": true, - "requires": { - "detect-libc": "1.0.3", - "mkdirp": "0.5.1", - "needle": "2.2.0", - "nopt": "4.0.1", - "npm-packlist": "1.1.10", - "npmlog": "4.1.2", - "rc": "1.2.7", - "rimraf": "2.6.2", - "semver": "5.5.0", - "tar": "4.4.1" - } - }, - "nopt": { - "version": "4.0.1", - "bundled": true, - "dev": true, - "optional": true, - "requires": { - "abbrev": "1.1.1", - "osenv": "0.1.5" - } - }, - "npm-bundled": { - "version": "1.0.3", - "bundled": true, - "dev": true, - "optional": true - }, - "npm-packlist": { - "version": "1.1.10", - "bundled": true, - "dev": true, - "optional": true, - "requires": { - "ignore-walk": "3.0.1", - "npm-bundled": "1.0.3" - } - }, - "npmlog": { - "version": "4.1.2", - "bundled": true, - "dev": true, - "optional": true, - "requires": { - "are-we-there-yet": "1.1.4", - "console-control-strings": "1.1.0", - "gauge": "2.7.4", - "set-blocking": "2.0.0" - } - }, - "number-is-nan": { - "version": "1.0.1", - "bundled": true, - "dev": true - }, - "object-assign": { - "version": "4.1.1", - "bundled": true, - "dev": true, - "optional": true - }, - "once": { - "version": "1.4.0", - "bundled": true, - "dev": true, - "requires": { - "wrappy": "1.0.2" - } - }, - "os-homedir": { - "version": "1.0.2", - "bundled": true, - "dev": true, - "optional": true - }, - "os-tmpdir": { - "version": "1.0.2", - "bundled": true, - "dev": true, - "optional": true - }, - "osenv": { - "version": "0.1.5", - "bundled": true, - "dev": true, - "optional": true, - "requires": { - "os-homedir": "1.0.2", - "os-tmpdir": "1.0.2" - } - }, - "path-is-absolute": { - "version": "1.0.1", - "bundled": true, - "dev": true, - "optional": true - }, - "process-nextick-args": { - "version": "2.0.0", - "bundled": true, - "dev": true, - "optional": true - }, - "rc": { - "version": "1.2.7", - "bundled": true, - "dev": true, - "optional": true, - "requires": { - "deep-extend": "0.5.1", - "ini": "1.3.5", - "minimist": "1.2.0", - "strip-json-comments": "2.0.1" - }, - "dependencies": { - "minimist": { - "version": "1.2.0", - "bundled": true, - "dev": true, - "optional": true - } - } - }, - "readable-stream": { - "version": "2.3.6", - "bundled": true, - "dev": true, - "optional": true, - "requires": { - "core-util-is": "1.0.2", - "inherits": "2.0.3", - "isarray": "1.0.0", - "process-nextick-args": "2.0.0", - "safe-buffer": "5.1.1", - "string_decoder": "1.1.1", - "util-deprecate": "1.0.2" - } - }, - "rimraf": { - "version": "2.6.2", - "bundled": true, - "dev": true, - "optional": true, - "requires": { - "glob": "7.1.2" - } - }, - "safe-buffer": { - "version": "5.1.1", - "bundled": true, - "dev": true - }, - "safer-buffer": { - "version": "2.1.2", - "bundled": true, - "dev": true, - "optional": true - }, - "sax": { - "version": "1.2.4", - "bundled": true, - "dev": true, - "optional": true - }, - "semver": { - "version": "5.5.0", - "bundled": true, - "dev": true, - "optional": true - }, - "set-blocking": { - "version": "2.0.0", - "bundled": true, - "dev": true, - "optional": true - }, - "signal-exit": { - "version": "3.0.2", - "bundled": true, - "dev": true, - "optional": true - }, - "string-width": { - "version": "1.0.2", - "bundled": true, - "dev": true, - "requires": { - "code-point-at": "1.1.0", - "is-fullwidth-code-point": "1.0.0", - "strip-ansi": "3.0.1" - } - }, - "string_decoder": { - "version": "1.1.1", - "bundled": true, - "dev": true, - "optional": true, - "requires": { - "safe-buffer": "5.1.1" - } - }, - "strip-ansi": { - "version": "3.0.1", - "bundled": true, - "dev": true, - "requires": { - "ansi-regex": "2.1.1" - } - }, - "strip-json-comments": { - "version": "2.0.1", - "bundled": true, - "dev": true, - "optional": true - }, - "tar": { - "version": "4.4.1", - "bundled": true, - "dev": true, - "optional": true, - "requires": { - "chownr": "1.0.1", - "fs-minipass": "1.2.5", - "minipass": "2.2.4", - "minizlib": "1.1.0", - "mkdirp": "0.5.1", - "safe-buffer": "5.1.1", - "yallist": "3.0.2" - } - }, - "util-deprecate": { - "version": "1.0.2", - "bundled": true, - "dev": true, - "optional": true - }, - "wide-align": { - "version": "1.1.2", - "bundled": true, - "dev": true, - "optional": true, - "requires": { - "string-width": "1.0.2" - } - }, - "wrappy": { - "version": "1.0.2", - "bundled": true, - "dev": true - }, - "yallist": { - "version": "3.0.2", - "bundled": true, - "dev": true - } - } - }, "fstream": { "version": "1.0.11", "resolved": "https://registry.npmjs.org/fstream/-/fstream-1.0.11.tgz", diff --git a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/controller/BaseController.java b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/controller/BaseController.java index d087f2f7..d02e3c58 100644 --- a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/controller/BaseController.java +++ b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/controller/BaseController.java @@ -27,8 +27,10 @@ import org.sourcelab.kafka.webview.ui.configuration.AppProperties; import org.sourcelab.kafka.webview.ui.manager.user.CustomUserDetails; import org.sourcelab.kafka.webview.ui.model.Cluster; +import org.sourcelab.kafka.webview.ui.model.Producer; import org.sourcelab.kafka.webview.ui.model.View; import org.sourcelab.kafka.webview.ui.repository.ClusterRepository; +import org.sourcelab.kafka.webview.ui.repository.ProducerRepository; import org.sourcelab.kafka.webview.ui.repository.ViewRepository; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.security.authentication.AnonymousAuthenticationToken; @@ -52,6 +54,9 @@ public abstract class BaseController { @Autowired private ViewRepository viewRepository; + @Autowired + private ProducerRepository producerRepository; + @Autowired private AppProperties appProperties; @@ -108,10 +113,12 @@ public void addAttributes(Model model) { // TODO put a limit on these final Iterable clusters = clusterRepository.findAllByOrderByNameAsc(); final Iterable views = viewRepository.findAllByOrderByNameAsc(); + final Iterable producers = producerRepository.findAllByOrderByNameAsc(); model.addAttribute("MenuClusters", clusters); model.addAttribute("MenuViews", views); model.addAttribute("UserId", getLoggedInUserId()); + model.addAttribute( "MenuProducers", producers ); if (!appProperties.isUserAuthEnabled() || appProperties.getLdapProperties().isEnabled()) { model.addAttribute("MenuShowUserConfig", false); diff --git a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/controller/api/ApiController.java b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/controller/api/ApiController.java index 1c2f4335..405e6c43 100644 --- a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/controller/api/ApiController.java +++ b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/controller/api/ApiController.java @@ -24,20 +24,22 @@ package org.sourcelab.kafka.webview.ui.controller.api; +import org.apache.kafka.clients.producer.ProducerRecord; import org.sourcelab.kafka.webview.ui.controller.BaseController; import org.sourcelab.kafka.webview.ui.controller.api.exceptions.ApiException; import org.sourcelab.kafka.webview.ui.controller.api.exceptions.NotFoundApiException; -import org.sourcelab.kafka.webview.ui.controller.api.requests.ConsumeRequest; -import org.sourcelab.kafka.webview.ui.controller.api.requests.ConsumerRemoveRequest; import org.sourcelab.kafka.webview.ui.controller.api.requests.CreateTopicRequest; -import org.sourcelab.kafka.webview.ui.controller.api.requests.DeleteTopicRequest; import org.sourcelab.kafka.webview.ui.controller.api.requests.ModifyTopicConfigRequest; +import org.sourcelab.kafka.webview.ui.controller.api.requests.SendMessageRequest; +import org.sourcelab.kafka.webview.ui.controller.api.requests.DeleteTopicRequest; +import org.sourcelab.kafka.webview.ui.controller.api.requests.ConsumerRemoveRequest; +import org.sourcelab.kafka.webview.ui.controller.api.requests.ConsumeRequest; import org.sourcelab.kafka.webview.ui.controller.api.responses.ResultResponse; -import org.sourcelab.kafka.webview.ui.manager.kafka.KafkaOperations; -import org.sourcelab.kafka.webview.ui.manager.kafka.KafkaOperationsFactory; -import org.sourcelab.kafka.webview.ui.manager.kafka.SessionIdentifier; import org.sourcelab.kafka.webview.ui.manager.kafka.ViewCustomizer; +import org.sourcelab.kafka.webview.ui.manager.kafka.SessionIdentifier; import org.sourcelab.kafka.webview.ui.manager.kafka.WebKafkaConsumer; +import org.sourcelab.kafka.webview.ui.manager.kafka.KafkaOperations; +import org.sourcelab.kafka.webview.ui.manager.kafka.KafkaOperationsFactory; import org.sourcelab.kafka.webview.ui.manager.kafka.WebKafkaConsumerFactory; import org.sourcelab.kafka.webview.ui.manager.kafka.config.FilterDefinition; import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ApiErrorResponse; @@ -55,40 +57,55 @@ import org.sourcelab.kafka.webview.ui.manager.kafka.dto.TopicDetails; import org.sourcelab.kafka.webview.ui.manager.kafka.dto.TopicList; import org.sourcelab.kafka.webview.ui.manager.kafka.dto.TopicListing; -import org.sourcelab.kafka.webview.ui.model.Cluster; -import org.sourcelab.kafka.webview.ui.model.Filter; -import org.sourcelab.kafka.webview.ui.model.View; -import org.sourcelab.kafka.webview.ui.repository.ClusterRepository; -import org.sourcelab.kafka.webview.ui.repository.FilterRepository; -import org.sourcelab.kafka.webview.ui.repository.ViewRepository; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Controller; import org.springframework.ui.Model; -import org.springframework.web.bind.annotation.ExceptionHandler; -import org.springframework.web.bind.annotation.ModelAttribute; -import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.util.concurrent.ListenableFuture; + + import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestParam; -import org.springframework.web.bind.annotation.ResponseBody; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.ExceptionHandler; import org.springframework.web.bind.annotation.ResponseStatus; +import org.springframework.web.bind.annotation.ModelAttribute; + +import org.sourcelab.kafka.webview.ui.repository.ViewRepository; +import org.sourcelab.kafka.webview.ui.repository.ClusterRepository; +import org.sourcelab.kafka.webview.ui.repository.FilterRepository; +import org.sourcelab.kafka.webview.ui.repository.ProducerRepository; + +import org.sourcelab.kafka.webview.ui.model.View; +import org.sourcelab.kafka.webview.ui.model.Producer; +import org.sourcelab.kafka.webview.ui.model.Cluster; +import org.sourcelab.kafka.webview.ui.model.Filter; + +import java.util.UUID; +import java.util.Comparator; import java.util.ArrayList; +import java.util.HashMap; +import java.util.Optional; import java.util.Collection; -import java.util.Comparator; +import java.util.Set; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Optional; -import java.util.Set; +import java.util.concurrent.ExecutionException; /** * Handles API requests. */ @Controller -@RequestMapping("/api") +@RequestMapping( "/api") public class ApiController extends BaseController { @Autowired private ViewRepository viewRepository; @@ -99,6 +116,9 @@ public class ApiController extends BaseController { @Autowired private FilterRepository filterRepository; + @Autowired + private ProducerRepository producerRepository; + @Autowired private WebKafkaConsumerFactory webKafkaConsumerFactory; @@ -603,6 +623,41 @@ public boolean removeConsumer( } } + /** + * POST put a message on kafka bus. + */ + @ResponseBody + @PostMapping(path = "/producer/{id}/send-message", produces = "application/json") + public void sendMessage(@PathVariable final Long id, @RequestBody final SendMessageRequest request ) { + + final Producer producer = producerRepository + .findById( id ) + .orElseThrow( () -> new NotFoundApiException( "Producer", "Unable to find producer" ) ); + + final Cluster cluster = clusterRepository + .findById( producer.getCluster().getId() ) + .orElseThrow( () -> new NotFoundApiException( "Producer", "Unable to find cluster" ) ); + + + Map producerFactoryConfigs = new HashMap<>(); + producerFactoryConfigs.put( "bootstrap.servers", cluster.getBrokerHosts() ); + producerFactoryConfigs.put( "key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + producerFactoryConfigs.put( "value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + + KafkaTemplate template = new KafkaTemplate<>( new DefaultKafkaProducerFactory<>( producerFactoryConfigs ) ); + + ProducerRecord record = + new ProducerRecord<>( producer.getTopic(), UUID.randomUUID().toString(), request.getMessageAsJson() ); + + ListenableFuture> future = template.send( record ); + + try { + future.get(); // ensure it was successful. will throw exception otherwise + } catch ( ExecutionException | InterruptedException e ) { + //pfffft????? + } + } + /** * Error handler for ApiExceptions. */ diff --git a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/controller/api/requests/SendMessageRequest.java b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/controller/api/requests/SendMessageRequest.java new file mode 100644 index 00000000..7414ede4 --- /dev/null +++ b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/controller/api/requests/SendMessageRequest.java @@ -0,0 +1,62 @@ +/** + * MIT License + * + * Copyright (c) 2017, 2018, 2019 SourceLab.org (https://github.com/SourceLabOrg/kafka-webview/) + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.sourcelab.kafka.webview.ui.controller.api.requests; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.util.Map; + +/** + * An API request body to put a message on the Kafka Bus. + */ +public class SendMessageRequest { + + private Map messageMap; + + public Map getMessageMap() { + return messageMap; + } + + public void setMessageMap( Map messageMap ) { + this.messageMap = messageMap; + } + + /** + * Get the message map as a JSON representation string. + * @return a JSON representation of the message map + */ + public String getMessageAsJson() { + String result = ""; + ObjectMapper mapper = new ObjectMapper(); + try { + result = mapper.writeValueAsString( messageMap ); + } catch ( JsonProcessingException e ) { + //lol oops + } + + return result; + } +} diff --git a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/controller/configuration/producer/ProducerConfigController.java b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/controller/configuration/producer/ProducerConfigController.java new file mode 100644 index 00000000..d93dbd77 --- /dev/null +++ b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/controller/configuration/producer/ProducerConfigController.java @@ -0,0 +1,299 @@ +package org.sourcelab.kafka.webview.ui.controller.configuration.producer; + +/** + * MIT License + * + * Copyright (c) 2017, 2018, 2019 SourceLab.org (https://github.com/SourceLabOrg/kafka-webview/) + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +import org.sourcelab.kafka.webview.ui.controller.BaseController; +import org.sourcelab.kafka.webview.ui.controller.configuration.producer.forms.ProducerForm; +import org.sourcelab.kafka.webview.ui.manager.kafka.KafkaOperations; +import org.sourcelab.kafka.webview.ui.manager.kafka.KafkaOperationsFactory; +import org.sourcelab.kafka.webview.ui.manager.kafka.dto.TopicDetails; +import org.sourcelab.kafka.webview.ui.manager.kafka.dto.TopicList; +import org.sourcelab.kafka.webview.ui.manager.ui.BreadCrumbManager; +import org.sourcelab.kafka.webview.ui.manager.ui.FlashMessage; +import org.sourcelab.kafka.webview.ui.model.Cluster; +import org.sourcelab.kafka.webview.ui.model.Producer; +import org.sourcelab.kafka.webview.ui.model.ProducerMessage; +import org.sourcelab.kafka.webview.ui.repository.ClusterRepository; +import org.sourcelab.kafka.webview.ui.repository.MessageFormatRepository; +import org.sourcelab.kafka.webview.ui.repository.ProducerMessageRepository; +import org.sourcelab.kafka.webview.ui.repository.ProducerRepository; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Controller; +import org.springframework.ui.Model; +import org.springframework.validation.BindingResult; +import org.springframework.validation.FieldError; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.servlet.mvc.support.RedirectAttributes; + +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.PathVariable; + + +import javax.validation.Valid; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Optional; + +/** + * Controller for CRUD over Producer entities. + */ +@Controller +@RequestMapping( "/configuration/producer") +public class ProducerConfigController extends BaseController { + @Autowired + private ClusterRepository clusterRepository; + + @Autowired + private MessageFormatRepository messageFormatRepository; + + @Autowired + private ProducerRepository producerRepository; + + @Autowired + private ProducerMessageRepository producerMessageRepository; + + @Autowired + private KafkaOperationsFactory kafkaOperationsFactory; + + /** + * Initialize the index page for Configuring Producers. + */ + @GetMapping + public String index(final Model model) { + // Setup breadcrumbs + setupBreadCrumbs(model, null, null); + + // Retrieve all message formats + final Iterable producerList = producerRepository.findAllByOrderByNameAsc(); + model.addAttribute("producers", producerList); + + return "configuration/producer/index"; + } + + /** + * Get the Create Producer page. + */ + @GetMapping( "/create") + public String createProducerForm( final ProducerForm producerForm, final Model model) { + // Setup breadcrubs + if (!model.containsAttribute( "BreadCrumbs" )) { + setupBreadCrumbs( model, "Create", null ); + } + + // Retrieve all clusters + model.addAttribute("clusters", clusterRepository.findAllByOrderByNameAsc()); + + // Retrieve all message formats + model.addAttribute("defaultMessageFormats", messageFormatRepository.findByIsDefaultOrderByNameAsc(true)); + model.addAttribute("customMessageFormats", messageFormatRepository.findByIsDefaultOrderByNameAsc(false)); + + model.addAttribute("topics", new ArrayList<>()); + + if ( producerForm.hasPropertyList() ) { + model.addAttribute( "producerMessagePropertyNames", producerForm.getPropertyNameListAsArray() ); + } + + if ( producerForm.getClusterId() != null ) { + clusterRepository.findById( producerForm.getClusterId() ).ifPresent( (cluster) -> { + try (final KafkaOperations operations = kafkaOperationsFactory.create(cluster, getLoggedInUserId())) { + final TopicList topics = operations.getAvailableTopics(); + model.addAttribute("topics", topics.getTopics()); + + // If we have a selected topic + if (producerForm.getTopic() != null && !"!".equals(producerForm.getTopic())) { + final TopicDetails topicDetails = operations.getTopicDetails(producerForm.getTopic()); + model.addAttribute("partitions", topicDetails.getPartitions()); + } + } + } ); + } + + return "configuration/producer/create"; + } + + /** + * Edit a Producer by Id. + */ + @GetMapping("/edit/{id}") + public String editProducer(@PathVariable final Long id, final ProducerForm producerForm, + final RedirectAttributes redirectAttributes, + final Model model) { + final Optional producerOptional = producerRepository.findById( id ); + if (!producerOptional.isPresent()) { + // Set flash message + redirectAttributes.addFlashAttribute("FlashMessage", FlashMessage.newWarning("Unable to find producer!")); + + // redirect to producer index + return "redirect:/configuration/producer"; + } + final Producer producer = producerOptional.get(); + + setupBreadCrumbs( model, "Edit: " + producer.getName(), null ); + + producerForm.setId( producer.getId() ); + producerForm.setClusterId( producer.getCluster().getId() ); + producerForm.setName( producer.getName() ); + producerForm.setProducerMessageClassName( producer.getProducerMessage().getQualifiedClassName() ); + producerForm.setProducerMessagePropertyNameList( producer.getProducerMessage().getPropertyNameList() ); + + return createProducerForm( producerForm, model ); + } + + /** + * Delete a Producer by Id. + */ + @PostMapping( path = "/delete/{id}" ) + public String deleteProducer(@PathVariable final Long id, final RedirectAttributes redirectAttributes) { + // Retrieve it + if (!producerRepository.existsById(id)) { + // Set flash message & redirect + redirectAttributes.addFlashAttribute("FlashMessage", FlashMessage.newWarning("Unable to find producer!")); + } else { + // Delete it + producerRepository.deleteById(id); + + redirectAttributes.addFlashAttribute("FlashMessage", FlashMessage.newSuccess("Deleted producer!")); + } + + // redirect to cluster index + return "redirect:/configuration/producer"; + } + + /** + * Update a producer. + */ + @PostMapping(path = "/update") + public String updateProducer( + @Valid final ProducerForm producerForm, + final BindingResult bindingResult, + final RedirectAttributes redirectAttributes, + final Model model) { + // Determine if we're updating or creating + final boolean updateExisting = producerForm.exists(); + + // Ensure that producer name is not already used. + final Producer existingProducer = producerRepository.findByName(producerForm.getName()); + if (existingProducer != null) { + // If we're updating, exclude our own id. + if (!updateExisting + || !producerForm.getId().equals(existingProducer.getId())) { + bindingResult.addError(new FieldError( + "producerForm", "name", producerForm.getName(), true, null, null, "Name is already used") + ); + } + } + + // If we have errors + if (bindingResult.hasErrors()) { + return createProducerForm(producerForm, model); + } + + // If we're updating + final Producer producer; + final ProducerMessage producerMessage; + final String successMessage; + if (updateExisting) { + // Retrieve producer + final Optional producerOptional = producerRepository.findById(producerForm.getId()); + if (!producerOptional.isPresent()) { + // Set flash message and redirect + redirectAttributes.addFlashAttribute("FlashMessage", FlashMessage.newWarning("Unable to find producer!")); + + // redirect to producer index + return "redirect:/configuration/producer"; + } + producer = producerOptional.get(); + + //Retrieve producer message + final Optional producerMessageOptional = producerMessageRepository.findByProducer( producer ); + if (!producerMessageOptional.isPresent()) { + // Set flash message and redirect + redirectAttributes.addFlashAttribute("FlashMessage", FlashMessage.newWarning("Unable to find producer's message!")); + + //redirect to producer index + return "redirect:/configuration/producer"; + } + producerMessage = producerMessageOptional.get(); + + successMessage = "Updated producer successfully!"; + } else { + producer = new Producer(); + producerMessage = new ProducerMessage(); + producer.setCreatedAt(new Timestamp(System.currentTimeMillis())); + producerMessage.setCreatedAt( new Timestamp( System.currentTimeMillis() ) ); + successMessage = "Created new producer!"; + } + + // Update properties +// TODO uncomment when we want to send more than a map of string/string as a kafka message +// final MessageFormat keyMessageFormat = messageFormatRepository.findById(producerForm.getKeyMessageFormatId()).get(); +// final MessageFormat valueMessageFormat = messageFormatRepository.findById(producerForm.getValueMessageFormatId()).get(); + final Cluster cluster = clusterRepository.findById(producerForm.getClusterId()).get(); + + producer.setName( producerForm.getName() ); + producer.setTopic( producerForm.getTopic() ); +// TODO uncomment when we want to send more than a map of string/string as a kafka message +// producer.setKeyMessageFormat( keyMessageFormat ); +// producer.setValueMessageFormat( valueMessageFormat ); + producer.setCluster( cluster ); + + producerMessage.setName( producer.getName() + "Message" ); + producerMessage.setQualifiedClassName( producerForm.getProducerMessageClassName() ); + producerMessage.setProducer( producer ); + producerMessage.setPropertyNameList( producerForm.getProducerMessagePropertyNameList() ); + + // Persist the producer + producer.setUpdatedAt(new Timestamp(System.currentTimeMillis())); + producerRepository.save(producer); + + // Persist the producer's message + producerMessage.setUpdatedAt( new Timestamp( System.currentTimeMillis() ) ); + producerMessageRepository.save( producerMessage ); + + + // Set flash message + redirectAttributes.addFlashAttribute("FlashMessage", FlashMessage.newSuccess(successMessage)); + + // redirect to cluster index + return "redirect:/configuration/producer"; + + } + + private void setupBreadCrumbs(final Model model, String name, String url) { + // Setup breadcrumbs + final BreadCrumbManager manager = new BreadCrumbManager(model) + .addCrumb("Configuration", "/configuration"); + + if (name != null) { + manager.addCrumb("Producers", "/configuration/producer"); + manager.addCrumb(name, url); + } else { + manager.addCrumb("Producers", null); + } + } + +} diff --git a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/controller/configuration/producer/forms/ProducerForm.java b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/controller/configuration/producer/forms/ProducerForm.java new file mode 100644 index 00000000..58676765 --- /dev/null +++ b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/controller/configuration/producer/forms/ProducerForm.java @@ -0,0 +1,153 @@ +package org.sourcelab.kafka.webview.ui.controller.configuration.producer.forms; + +/** + * MIT License + * + * Copyright (c) 2017, 2018, 2019 SourceLab.org (https://github.com/SourceLabOrg/kafka-webview/) + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +import javax.validation.constraints.NotNull; +import javax.validation.constraints.Size; + +/** + * The form for creating/updating a Producer. + */ +public class ProducerForm { + private Long id = null; + + @NotNull( message = "Enter a unique name") + @Size( min = 2, max = 255) + private String name; + + @NotNull(message = "Select a cluster") + private Long clusterId; +// TODO uncomment when we want to send more than a map of string/string as a kafka message +// @NotNull(message = "Select a message format") +// private Long keyMessageFormatId; +// +// @NotNull(message = "Select a message format") +// private Long valueMessageFormatId; + + @NotNull(message = "Select a topic") + @Size(min = 1, max = 255) + private String topic; + + @NotNull(message = "A producer must have property names to send a message") + private String producerMessagePropertyNameList; + + @NotNull(message = "A producer message must reference an existing class in the platform") + private String producerMessageClassName; + + public Long getId() { + return id; + } + + public void setId( Long id ) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName( String name ) { + this.name = name; + } + + public Long getClusterId() { + return clusterId; + } + + public void setClusterId( Long clusterId ) { + this.clusterId = clusterId; + } +// TODO uncomment when we want to send more than a map of string/string as a kafka message + +// public Long getKeyMessageFormatId() +// { +// return keyMessageFormatId; +// } +// +// public void setKeyMessageFormatId( Long keyMessageFormatId ) +// { +// this.keyMessageFormatId = keyMessageFormatId; +// } +// +// public Long getValueMessageFormatId() +// { +// return valueMessageFormatId; +// } +// +// public void setValueMessageFormatId( Long valueMessageFormatId ) +// { +// this.valueMessageFormatId = valueMessageFormatId; +// } + + public String getTopic() { + return topic; + } + + public void setTopic( String topic ) { + this.topic = topic; + } + + public String getProducerMessagePropertyNameList() { + return producerMessagePropertyNameList; + } + + public void setProducerMessagePropertyNameList( String producerMessagePropertyNameList ) { + this.producerMessagePropertyNameList = producerMessagePropertyNameList; + } + + public String getProducerMessageClassName() { + return producerMessageClassName; + } + + public void setProducerMessageClassName( String producerMessageClassName ) { + this.producerMessageClassName = producerMessageClassName; + } + + public boolean exists() { + return getId() != null; + } + + public boolean hasPropertyList() { + return producerMessagePropertyNameList != null; + } + + public String[] getPropertyNameListAsArray() { + return producerMessagePropertyNameList.split( "," ); + } + + @Override + public String toString() { + return "ProductForm{id=" + id + + ",name='" + name + '\'' + + ",clusterId=" + clusterId +// TODO uncomment when we want to send more than a map of string/string as a kafka message +// ",keyMessageFormatId=" + keyMessageFormatId + +// ",valueMessageFormatId=" + valueMessageFormatId + + + ",topic='" + topic + '\'' + + ",producerMessageKeys='" + producerMessagePropertyNameList + '\'' + + ",producerMessageClassName='" + producerMessageClassName + '\'' + + '}'; + } +} diff --git a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/controller/producer/ProducerController.java b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/controller/producer/ProducerController.java new file mode 100644 index 00000000..8a3c5944 --- /dev/null +++ b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/controller/producer/ProducerController.java @@ -0,0 +1,154 @@ +package org.sourcelab.kafka.webview.ui.controller.producer; + +/** + * MIT License + * + * Copyright (c) 2017, 2018, 2019 SourceLab.org (https://github.com/SourceLabOrg/kafka-webview/) + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +import org.sourcelab.kafka.webview.ui.manager.ui.BreadCrumbManager; +import org.sourcelab.kafka.webview.ui.manager.ui.FlashMessage; +import org.sourcelab.kafka.webview.ui.model.Cluster; +import org.sourcelab.kafka.webview.ui.model.Producer; +import org.sourcelab.kafka.webview.ui.model.ProducerMessage; +import org.sourcelab.kafka.webview.ui.repository.ClusterRepository; +import org.sourcelab.kafka.webview.ui.repository.ProducerMessageRepository; +import org.sourcelab.kafka.webview.ui.repository.ProducerRepository; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Controller; +import org.springframework.ui.Model; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.servlet.mvc.support.RedirectAttributes; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +/** + * Controller for Producer Operations. + */ +@Controller +@RequestMapping("/producer") +public class ProducerController { + @Autowired + private ProducerRepository producerRepository; + + @Autowired + private ClusterRepository clusterRepository; + + @Autowired + private ProducerMessageRepository producerMessageRepository; + + /** + * GET producers index. + */ + @GetMapping + public String index( + final Model model, + @RequestParam( name = "clusterId", required = false) final Long clusterId + ) { + // Setup breadcrumbs + final BreadCrumbManager breadCrumbManager = new BreadCrumbManager(model); + + // Retrieve all clusters and index by id + final Map clustersById = new HashMap<>(); + clusterRepository + .findAllByOrderByNameAsc() + .forEach((cluster) -> clustersById.put(cluster.getId(), cluster)); + + final Iterable producers; + if (clusterId == null) { + // Retrieve all views order by name asc. + producers = producerRepository.findAllByOrderByNameAsc(); + } else { + // Retrieve only views for the cluster + producers = producerRepository.findAllByClusterIdOrderByNameAsc(clusterId); + } + + // Set model Attributes + model.addAttribute("producerList", producers); + model.addAttribute("clustersById", clustersById); + + final String clusterName; + if (clusterId != null && clustersById.containsKey(clusterId)) { + // If filtered by a cluster + clusterName = clustersById.get(clusterId).getName(); + + // Add top level breadcrumb + breadCrumbManager + .addCrumb("Producer", "/producer") + .addCrumb("Cluster: " + clusterName); + } else { + // If showing all views + clusterName = null; + + // Add top level breadcrumb + breadCrumbManager.addCrumb("Producer", null); + } + model.addAttribute("clusterName", clusterName); + + return "producer/index"; + } + + + /** + * GET Displays producer for specified id. + */ + @GetMapping( path = "/{id}") + public String produce( + @PathVariable final Long id, + final RedirectAttributes redirectAttributes, + final Model model) { + + // Retrieve the producer + final Optional producerOptional = producerRepository.findById(id); + if (!producerOptional.isPresent()) { + // Set flash message + redirectAttributes.addFlashAttribute("FlashMessage", FlashMessage.newWarning("Unable to find producer!")); + + // redirect to home + return "redirect:/"; + } + final Producer producer = producerOptional.get(); + + final Optional producerMessageOptional = producerMessageRepository.findByProducer( producer ); + if (!producerMessageOptional.isPresent()) { + // yeah, I don't know. This shouldn't be possible + return "redirect:/"; + } + final ProducerMessage producerMessage = producerMessageOptional.get(); + + // Setup breadcrumbs + new BreadCrumbManager(model) + .addCrumb("Producer", "/producer") + .addCrumb(producer.getName()); + + // Set model Attributes + model.addAttribute("producer", producer); + model.addAttribute("cluster", producer.getCluster()); + model.addAttribute( "producerMessage", producerMessage ); + + return "producer/produce"; + } +} diff --git a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/model/Producer.java b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/model/Producer.java new file mode 100644 index 00000000..50e3764a --- /dev/null +++ b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/model/Producer.java @@ -0,0 +1,153 @@ +package org.sourcelab.kafka.webview.ui.model; + +/** + * MIT License + * + * Copyright (c) 2017, 2018, 2019 SourceLab.org (https://github.com/SourceLabOrg/kafka-webview/) + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +import org.hibernate.annotations.Cascade; + +import javax.persistence.Entity; +import javax.persistence.Id; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Column; +import javax.persistence.ManyToOne; +import javax.persistence.FetchType; +import javax.persistence.JoinColumn; +import javax.persistence.OneToOne; + +import java.sql.Timestamp; + +/** + * Producer Class. + */ +@Entity +public class Producer { + @Id + @GeneratedValue( strategy = GenerationType.IDENTITY) + private Long id; + + @Column( nullable = false, unique = true) + private String name; + + @ManyToOne(fetch = FetchType.LAZY) + private Cluster cluster; + + //TODO uncomment when we want to send more than a map of string/string as a kafka message +// @ManyToOne(fetch = FetchType.LAZY) +// private MessageFormat keyMessageFormat; +// +// @ManyToOne(fetch = FetchType.LAZY) +// private MessageFormat valueMessageFormat; + + @Column(nullable = false) + private String topic; + + @Column(nullable = false) + private Timestamp createdAt; + + @Column(nullable = false) + private Timestamp updatedAt; + + @JoinColumn(name = "id", referencedColumnName = "producer_id") + @OneToOne(fetch = FetchType.LAZY) + @Cascade( org.hibernate.annotations.CascadeType.DELETE ) + private ProducerMessage producerMessage; + + public Long getId() { + return id; + } + + public void setId( Long id ) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName( String name ) { + this.name = name; + } + + public Cluster getCluster() { + return cluster; + } + + public void setCluster( Cluster cluster ) { + this.cluster = cluster; + } +// TODO uncomment when we want to send more than a map of string/string as a kafka message + +// public MessageFormat getKeyMessageFormat() +// { +// return keyMessageFormat; +// } +// +// public void setKeyMessageFormat( MessageFormat keyMessageFormat ) +// { +// this.keyMessageFormat = keyMessageFormat; +// } +// +// public MessageFormat getValueMessageFormat() +// { +// return valueMessageFormat; +// } +// +// public void setValueMessageFormat( MessageFormat valueMessageFormat ) +// { +// this.valueMessageFormat = valueMessageFormat; +// } + + public String getTopic() { + return topic; + } + + public void setTopic( String topic ) { + this.topic = topic; + } + + public Timestamp getCreatedAt() { + return createdAt; + } + + public void setCreatedAt( Timestamp createdAt ) { + this.createdAt = createdAt; + } + + public Timestamp getUpdatedAt() { + return updatedAt; + } + + public void setUpdatedAt( Timestamp updatedAt ) { + this.updatedAt = updatedAt; + } + + public ProducerMessage getProducerMessage() { + return producerMessage; + } + + public void setProducerMessage( ProducerMessage producerMessage ) { + this.producerMessage = producerMessage; + } +} diff --git a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/model/ProducerMessage.java b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/model/ProducerMessage.java new file mode 100644 index 00000000..3ca0238d --- /dev/null +++ b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/model/ProducerMessage.java @@ -0,0 +1,120 @@ +package org.sourcelab.kafka.webview.ui.model; + +/** + * MIT License + * + * Copyright (c) 2017, 2018, 2019 SourceLab.org (https://github.com/SourceLabOrg/kafka-webview/) + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +import javax.persistence.Entity; +import javax.persistence.Id; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Column; +import javax.persistence.JoinColumn; +import javax.persistence.OneToOne; +import javax.persistence.FetchType; +import java.sql.Timestamp; + +/** + * Message associated with a Producer. + */ +@Entity +public class ProducerMessage { + @Id + @GeneratedValue( strategy = GenerationType.IDENTITY) + private Long id; + + @Column( nullable = false, unique = true) + private String name; + + @Column(nullable = false) + private String qualifiedClassName; + + @Column(nullable = false) + private Timestamp createdAt; + + @Column(nullable = false) + private Timestamp updatedAt; + + @JoinColumn(name = "producer_id", referencedColumnName = "id") + @OneToOne(fetch = FetchType.LAZY) + private Producer producer; + + @Column(nullable = false) + private String propertyNameList; + + public Long getId() { + return id; + } + + public void setId( Long id ) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName( String name ) { + this.name = name; + } + + public String getQualifiedClassName() { + return qualifiedClassName; + } + + public void setQualifiedClassName( String qualifiedClassName ) { + this.qualifiedClassName = qualifiedClassName; + } + + public Timestamp getCreatedAt() { + return createdAt; + } + + public void setCreatedAt( Timestamp createdAt ) { + this.createdAt = createdAt; + } + + public Timestamp getUpdatedAt() { + return updatedAt; + } + + public void setUpdatedAt( Timestamp updatedAt ) { + this.updatedAt = updatedAt; + } + + public Producer getProducer() { + return producer; + } + + public void setProducer( Producer producer ) { + this.producer = producer; + } + + public String getPropertyNameList() { + return propertyNameList; + } + + public void setPropertyNameList( String propertyNameList ) { + this.propertyNameList = propertyNameList; + } +} diff --git a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/repository/ProducerMessageRepository.java b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/repository/ProducerMessageRepository.java new file mode 100644 index 00000000..4ea67c01 --- /dev/null +++ b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/repository/ProducerMessageRepository.java @@ -0,0 +1,41 @@ +package org.sourcelab.kafka.webview.ui.repository; + +/** + * MIT License + * + * Copyright (c) 2017, 2018, 2019 SourceLab.org (https://github.com/SourceLabOrg/kafka-webview/) + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +import org.sourcelab.kafka.webview.ui.model.Producer; +import org.sourcelab.kafka.webview.ui.model.ProducerMessage; +import org.springframework.data.repository.CrudRepository; +import org.springframework.data.repository.query.Param; +import org.springframework.stereotype.Repository; + +import java.util.Optional; + +/** + * Producer Message Repository. + */ +@Repository +public interface ProducerMessageRepository extends CrudRepository { + Optional findByProducer(@Param("producer") Producer producer); +} diff --git a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/repository/ProducerRepository.java b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/repository/ProducerRepository.java new file mode 100644 index 00000000..588a1271 --- /dev/null +++ b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/repository/ProducerRepository.java @@ -0,0 +1,45 @@ +package org.sourcelab.kafka.webview.ui.repository; + +/** + * MIT License + * + * Copyright (c) 2017, 2018, 2019 SourceLab.org (https://github.com/SourceLabOrg/kafka-webview/) + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +import org.sourcelab.kafka.webview.ui.model.Producer; +import org.springframework.data.repository.CrudRepository; +import org.springframework.stereotype.Repository; + +/** + * Producer Repository. + */ +@Repository +public interface ProducerRepository extends CrudRepository { + Producer findByName(final String name); + + Iterable findAllByOrderByNameAsc(); + + Iterable findAllByClusterIdOrderByNameAsc(final long clusterId); + + Long countByClusterId(final long clusterId); + + Producer findByTopic(final String topic); +} diff --git a/kafka-webview-ui/src/main/resources/schema/migration/h2/V4__producerTable.sql b/kafka-webview-ui/src/main/resources/schema/migration/h2/V4__producerTable.sql new file mode 100644 index 00000000..fca0eb30 --- /dev/null +++ b/kafka-webview-ui/src/main/resources/schema/migration/h2/V4__producerTable.sql @@ -0,0 +1,34 @@ +/* +TODO update producer table with the following when we want to send more than a map of string/string as a kafka message + key_message_format_id INT(11) UNSIGNED NOT NULL, + value_message_format_id INT(11) UNSIGNED NOT NULL, + FOREIGN KEY (key_message_format_id) REFERENCES message_format(id), + FOREIGN KEY (value_message_format_id) REFERENCES message_format(id) +*/ +CREATE TABLE IF NOT EXISTS `producer` ( + id INT(11) UNSIGNED NOT NULL AUTO_INCREMENT, + name VARCHAR(255) UNIQUE NOT NULL, + cluster_id INT(11) UNSIGNED NOT NULL, + topic VARCHAR(150) UNIQUE NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + created_by INT(11) UNSIGNED DEFAULT NULL, + updated_at TIMESTAMP NOT NULL DEFAULT NOW(), + updated_by INT(11) UNSIGNED DEFAULT NULL, + PRIMARY KEY (id), + FOREIGN KEY (cluster_id) REFERENCES cluster(id) +) ENGINE=InnoDB DEFAULT CHARSET=utf8; + +CREATE TABLE IF NOT EXISTS `producer_message` ( + id INT(11) UNSIGNED NOT NULL AUTO_INCREMENT, + name VARCHAR(255) UNIQUE NOT NULL, + qualified_class_name TEXT NOT NULL, + producer_id INT(11) UNSIGNED NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + created_by INT(11) UNSIGNED DEFAULT NULL, + updated_at TIMESTAMP NOT NULL DEFAULT NOW(), + updated_by INT(11) UNSIGNED DEFAULT NULL, + property_name_list TEXT NOT NULL, + PRIMARY KEY (id), + FOREIGN KEY (producer_id) REFERENCES producer(id) +) ENGINE=InnoDB DEFAULT CHARSET=utf8; + diff --git a/kafka-webview-ui/src/main/resources/schema/migration/h2/V5__compositeProducerKey.sql b/kafka-webview-ui/src/main/resources/schema/migration/h2/V5__compositeProducerKey.sql new file mode 100644 index 00000000..67138f64 --- /dev/null +++ b/kafka-webview-ui/src/main/resources/schema/migration/h2/V5__compositeProducerKey.sql @@ -0,0 +1,3 @@ +ALTER TABLE `producer` DROP CONSTRAINT `constraint_f2`; + +ALTER TABLE `producer` ADD CONSTRAINT `unique_producer_topic` UNIQUE(`topic`,`cluster_id`); \ No newline at end of file diff --git a/kafka-webview-ui/src/main/resources/templates/configuration/index.html b/kafka-webview-ui/src/main/resources/templates/configuration/index.html index 29ff5efb..89667ad3 100644 --- a/kafka-webview-ui/src/main/resources/templates/configuration/index.html +++ b/kafka-webview-ui/src/main/resources/templates/configuration/index.html @@ -79,6 +79,15 @@ This section allows you to view any active stream consumers. + + + Producers + + + This section allows you to define how you would like write messages to a configured + Kafka cluster and topic. + + diff --git a/kafka-webview-ui/src/main/resources/templates/configuration/producer/create.html b/kafka-webview-ui/src/main/resources/templates/configuration/producer/create.html new file mode 100644 index 00000000..e5d3b93f --- /dev/null +++ b/kafka-webview-ui/src/main/resources/templates/configuration/producer/create.html @@ -0,0 +1,390 @@ + + + + + Producer Configuration + + + +
+
+ + +
+
+
+
+ Create + +
+
+ +
+ +
Topic Selection
+
+ + +
+ +
+ +
+
+
+ + +
+ +
+ +
+
+
+ + +
+ +
+ +
+
+
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+ +
+ +
+
+
+ + +
+ +
+ +
+
+
+ +
+
+
+ + + + + + + +
+
+
+
+ + + + + +
+ + + \ No newline at end of file diff --git a/kafka-webview-ui/src/main/resources/templates/configuration/producer/index.html b/kafka-webview-ui/src/main/resources/templates/configuration/producer/index.html new file mode 100644 index 00000000..aa710b2b --- /dev/null +++ b/kafka-webview-ui/src/main/resources/templates/configuration/producer/index.html @@ -0,0 +1,86 @@ + + + + + Producer Configuration + + + +
+
+
+
+
+
+ + Producers + +
+
+ + + + + + + + + + + + + + + + + + + + + + + + + + +
NameClusterTopicAction
+ No producer found! +
+ + + +
+
+
+
+ +
+
+
+ + + \ No newline at end of file diff --git a/kafka-webview-ui/src/main/resources/templates/layout.html b/kafka-webview-ui/src/main/resources/templates/layout.html index fe6aab51..07f51a64 100644 --- a/kafka-webview-ui/src/main/resources/templates/layout.html +++ b/kafka-webview-ui/src/main/resources/templates/layout.html @@ -147,6 +147,21 @@ + + +
  • +