From a1a1eb41b8b4652995c9b27911b088b77f5c67a1 Mon Sep 17 00:00:00 2001
From: tchiotludo
Date: Sat, 16 Feb 2019 09:00:11 +0100
Subject: [PATCH] Move app to micronaut

---
 .gitignore                                    |   4 +-
 build.gradle                                  |  92 ++++-
 micronaut-cli.yml                             |   5 +
 src/main/java/org/kafkahq/App.java            |  75 +---
 .../kafkahq/configs/AbstractProperties.java   |  20 +
 .../java/org/kafkahq/configs/Connection.java  |  19 +
 .../java/org/kafkahq/configs/Default.java     |  14 +
 .../controllers/AbstractController.java       |  68 ++--
 .../controllers/DataSseController.java        | 107 ------
 .../kafkahq/controllers/GroupController.java  | 101 +++---
 .../kafkahq/controllers/NodeController.java   |  80 ++--
 .../kafkahq/controllers/SchemaController.java | 152 ++++----
 .../kafkahq/controllers/TopicController.java  | 341 ++++++++++++------
 .../middlewares/KafkaWrapperFilter.java       |  47 +++
 .../java/org/kafkahq/modules/KafkaModule.java |  79 ++--
 .../org/kafkahq/modules/KafkaWrapper.java     |  12 +-
 .../org/kafkahq/modules/RequestHelper.java    |  49 +--
 .../repositories/ClusterRepository.java       |  14 +-
 .../repositories/ConfigRepository.java        |  26 +-
 .../repositories/ConsumerGroupRepository.java |  22 +-
 .../repositories/LogDirRepository.java        |  14 +-
 .../repositories/RecordRepository.java        |  31 +-
 .../SchemaRegistryRepository.java             |  16 +-
 .../kafkahq/repositories/TopicRepository.java |  19 +-
 src/main/java/org/kafkahq/utils/Debug.java    |   8 +-
 src/main/resources/application.yml            |  26 ++
 {conf => src/main/resources}/logback.xml      |  31 +-
 .../main/resources/views}/blocks/configs.ftl  |   0
 .../resources/views}/blocks/group/members.ftl |   0
 .../resources/views}/blocks/group/topic.ftl   |   0
 .../resources/views}/blocks/navbar-search.ftl |   0
 .../resources/views}/blocks/topic/data.ftl    |   5 +-
 .../views}/blocks/topic/dataBody.ftl          |   0
 .../views}/blocks/topic/pagination.ftl        |   2 +
 .../views}/blocks/topic/partitions.ftl        |   4 +-
 .../main/resources/views}/group.ftl           |   6 +-
 .../main/resources/views}/groupList.ftl       |   6 +-
 .../main/resources/views}/groupUpdate.ftl     |   4 +-
 .../resources/views}/includes/functions.ftl   |   0
 .../main/resources/views}/includes/group.ftl  |   0
 .../main/resources/views}/includes/log.ftl    |   2 +-
 .../main/resources/views}/includes/node.ftl   |   0
 .../main/resources/views}/includes/schema.ftl |   0
 .../resources/views}/includes/template.ftl    |   6 +-
 {public => src/main/resources/views}/node.ftl |  10 +-
 .../main/resources/views}/nodeList.ftl        |   4 +-
 .../main/resources/views}/schema.ftl          |   4 +-
 .../main/resources/views}/schemaCreate.ftl    |   4 +-
 .../main/resources/views}/schemaList.ftl      |   4 +-
 .../main/resources/views}/topic.ftl           |  12 +-
 .../main/resources/views}/topicCreate.ftl     |   4 +-
 .../main/resources/views}/topicList.ftl       |   6 +-
 .../main/resources/views}/topicProduce.ftl    |   8 +-
 .../main/resources/views}/topicSearch.ftl     |   2 +-
 src/test/java/org/kafkahq/BaseTest.java       |  10 +-
 .../java/org/kafkahq/KafkaTestCluster.java    |  27 +-
 src/test/java/org/kafkahq/StreamTest.java     |  11 +-
 .../repositories/RecordRepositoryTest.java    |   6 +-
 .../test/resources/logback.xml                |  16 +-
 webpack.config.js                             |   3 +-
 60 files changed, 845 insertions(+), 793 deletions(-)
 create mode 100644 micronaut-cli.yml
 create mode 100644 src/main/java/org/kafkahq/configs/AbstractProperties.java
 create mode 100644 src/main/java/org/kafkahq/configs/Connection.java
 create mode 100644 src/main/java/org/kafkahq/configs/Default.java
 delete mode 100644 src/main/java/org/kafkahq/controllers/DataSseController.java
 create mode 100644 src/main/java/org/kafkahq/middlewares/KafkaWrapperFilter.java
 create mode 100644 src/main/resources/application.yml
 rename {conf => src/main/resources}/logback.xml (52%)
 rename {public => src/main/resources/views}/blocks/configs.ftl (100%)
 rename {public => src/main/resources/views}/blocks/group/members.ftl (100%)
 rename {public => src/main/resources/views}/blocks/group/topic.ftl (100%)
 rename {public => src/main/resources/views}/blocks/navbar-search.ftl (100%)
 rename {public => src/main/resources/views}/blocks/topic/data.ftl (98%)
 rename {public => src/main/resources/views}/blocks/topic/dataBody.ftl (100%)
 rename {public => src/main/resources/views}/blocks/topic/pagination.ftl (97%)
 rename {public => src/main/resources/views}/blocks/topic/partitions.ftl (93%)
 rename {public => src/main/resources/views}/group.ftl (89%)
 rename {public => src/main/resources/views}/groupList.ftl (59%)
 rename {public => src/main/resources/views}/groupUpdate.ftl (96%)
 rename {public => src/main/resources/views}/includes/functions.ftl (100%)
 rename {public => src/main/resources/views}/includes/group.ftl (100%)
 rename {public => src/main/resources/views}/includes/log.ftl (96%)
 rename {public => src/main/resources/views}/includes/node.ftl (100%)
 rename {public => src/main/resources/views}/includes/schema.ftl (100%)
 rename {public => src/main/resources/views}/includes/template.ftl (98%)
 rename {public => src/main/resources/views}/node.ftl (86%)
 rename {public => src/main/resources/views}/nodeList.ftl (94%)
 rename {public => src/main/resources/views}/schema.ftl (93%)
 rename {public => src/main/resources/views}/schemaCreate.ftl (75%)
 rename {public => src/main/resources/views}/schemaList.ftl (82%)
 rename {public => src/main/resources/views}/topic.ftl (89%)
 rename {public => src/main/resources/views}/topicCreate.ftl (96%)
 rename {public => src/main/resources/views}/topicList.ftl (96%)
 rename {public => src/main/resources/views}/topicProduce.ftl (91%)
 rename {public => src/main/resources/views}/topicSearch.ftl (89%)
 rename conf/logback-test.xml => src/test/resources/logback.xml (75%)

diff --git a/.gitignore b/.gitignore
index 5f4e881e7..e29388f11 100644
--- a/.gitignore
+++ b/.gitignore
@@ -18,7 +18,7 @@ build/*
 .gradletasknamecache
 
 ### Assets
-public/static
+src/main/resources/static
 node_modules
 node/
@@ -36,7 +36,7 @@ out/*
 logs/*
 
 ### Kafka HQ ###
-conf/*.dev.conf
+src/**/*-dev.yml
 
 ## Docker
 .env
diff --git a/build.gradle b/build.gradle
index a598c401b..2d04b8309 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1,5 +1,6 @@
 buildscript {
     ext {
+        micronautVersion = "1.1.0.M1"
         joobyVersion = "1.5.+"
         confluentVersion = "5.1.0"
         kafkaVersion = "2.1.+"
@@ -12,20 +13,33 @@ buildscript {
     }
 
     dependencies {
+        // jooby
         classpath "com.google.gradle:osdetector-gradle-plugin:1.4.0"
         classpath "io.spring.gradle:dependency-management-plugin:1.0.3.RELEASE"
        classpath "org.jooby:jooby-gradle-plugin:$joobyVersion"
+
+        // kafkahq
         classpath "com.commercehub.gradle.plugin:gradle-avro-plugin:0.16.0"
     }
 }
 
 plugins {
+    // micronaut
+    id "io.spring.dependency-management" version "1.0.6.RELEASE"
+    id "com.github.johnrengelman.shadow" version "4.0.2"
+    id "application"
+    id "java"
+    id "net.ltgt.apt-eclipse" version "0.20"
+    id "net.ltgt.apt-idea" version "0.20"
+
+    // kafkahq
     id "com.moowork.node" version "1.2.0"
     id 'io.franzbecker.gradle-lombok' version '1.14'
     id 'com.adarshr.test-logger' version '1.6.0'
     id 'com.github.psxpaul.execfork' version '0.1.9'
 }
 
+// jooby
 apply plugin: "io.spring.dependency-management"
 apply plugin: "com.google.osdetector"
 apply plugin: "application"
@@ -43,6 +57,7 @@ repositories {
     mavenCentral()
     maven { url 'http://packages.confluent.io/maven/' }
     maven { url "https://oss.sonatype.org/content/repositories/snapshots" }
+    maven { url "https://jcenter.bintray.com" }
 }
 
 lombok {
@@ -71,23 +86,15 @@ configurations {
 dependencyManagement {
     imports {
         mavenBom "org.jooby:jooby-bom:$joobyVersion"
+        mavenBom "io.micronaut:micronaut-bom:$micronautVersion"
     }
 }
 
-joobyRun {
-    includes = ["**/*.class", "**/*.conf", "**/*.properties", "**/*.ftl", "**/*.xml"]
-}
-
-sourceSets.main.resources {
-    srcDirs = ["conf", "public"]
+// micronaut
+configurations {
+    developmentOnly
 }
 
-gradle.taskGraph.whenReady { graph ->
-    if (graph.hasTask(joobyRun)) {
-        webpack.enabled = false
-        npmInstall.enabled = false
-    }
-}
 
 /**********************************************************************************************************************\
  * Dependencies
@@ -127,6 +134,41 @@ dependencies {
     testCompile group: 'org.apache.kafka', name: 'kafka-streams', version: kafkaVersion
     testCompile group: "io.confluent", name: "kafka-streams-avro-serde", version: confluentVersion
     testCompile group: 'org.slf4j', name: 'jul-to-slf4j', version: '1.7.25'
+
+    // micronaut
+    annotationProcessor "io.micronaut:micronaut-inject-java"
+    annotationProcessor "io.micronaut:micronaut-validation"
+    compile "io.micronaut:micronaut-http-client"
+    compile "io.micronaut:micronaut-http-server-netty"
+    compile "io.micronaut:micronaut-inject"
+    compile "io.micronaut:micronaut-validation"
+    compile 'io.micronaut:micronaut-views'
+    compile "io.micronaut:micronaut-runtime"
+    runtime "ch.qos.logback:logback-classic:1.2.3"
+    runtime 'org.freemarker:freemarker:2.3.28'
+
+    // micronaut test
+    testAnnotationProcessor "io.micronaut:micronaut-inject-java"
+    testCompile "org.junit.jupiter:junit-jupiter-api"
+    testCompile "io.micronaut.test:micronaut-test-junit5"
+    testRuntime "org.junit.jupiter:junit-jupiter-engine"
+}
+
+/**********************************************************************************************************************\
+ * Compile & run (micronaut)
+ **********************************************************************************************************************/
+run.classpath += configurations.developmentOnly
+test.classpath += configurations.developmentOnly
+run.jvmArgs('-noverify', '-XX:TieredStopAtLevel=1')
+
+// use JUnit 5 platform
+test {
+    useJUnitPlatform()
+}
+
+tasks.withType(JavaCompile){
+    options.encoding = "UTF-8"
+    options.compilerArgs.add('-parameters')
 }
 
 /**********************************************************************************************************************\
@@ -189,24 +231,34 @@ task webpack(type: NodeTask, dependsOn: "npmInstall") {
     group = 'build'
     description = 'Build with webpack assets'
     script = project.file("node_modules/.bin/webpack")
-    outputs.dir("public/static")
+    outputs.dir("src/main/resources/static")
     args = ["-p", "--mode production"]
 }
 
 jar.dependsOn "webpack"
 processResources.dependsOn 'webpack'
-clean.delete << file("public/static")
+clean.delete << file("src/main/resources/static")
 
 /**********************************************************************************************************************\
  * Jar
 **********************************************************************************************************************/
-jar {
-    manifest {
-        attributes "Main-Class": mainClassName
-    }
+shadowJar {
+    mergeServiceFiles()
+}
 
-    from {
-        configurations.compile.collect { it.isDirectory() ? it : zipTree(it) }
+/**********************************************************************************************************************\
+ * Tmp
+ **********************************************************************************************************************/
+task micronaut(type: JavaExec) {
+    classpath = sourceSets.main.runtimeClasspath
+    main = 'org.kafkahq.App'
+    jvmArgs = ['-Dmicronaut.environments=dev']
+}
+
+gradle.taskGraph.whenReady { graph ->
+    if (graph.hasTask(micronaut)) {
+        webpack.enabled = false
+        npmInstall.enabled = false
     }
 }
\ No newline at end of file
diff --git a/micronaut-cli.yml b/micronaut-cli.yml
new file mode 100644
index 000000000..7470e3b9f
--- /dev/null
+++ b/micronaut-cli.yml
@@ -0,0 +1,5 @@
+profile: service
+defaultPackage: org.kafkahq
+---
+testFramework: junit
+sourceLanguage: java
\ No newline at end of file
diff --git a/src/main/java/org/kafkahq/App.java b/src/main/java/org/kafkahq/App.java
index 306c258d6..6b0746e05 100644
--- a/src/main/java/org/kafkahq/App.java
+++ b/src/main/java/org/kafkahq/App.java
@@ -1,91 +1,30 @@
 package org.kafkahq;
 
-import com.typesafe.config.Config;
-import org.jooby.FlashScope;
-import org.jooby.Jooby;
-import org.jooby.RequestLogger;
-import org.jooby.assets.Assets;
-import org.jooby.ftl.Ftl;
-import org.jooby.json.Jackson;
-import org.jooby.livereload.LiveReload;
-import org.jooby.whoops.Whoops;
-import org.kafkahq.controllers.*;
-import org.kafkahq.modules.KafkaModule;
-import org.kafkahq.modules.KafkaWrapper;
-import org.kafkahq.repositories.AbstractRepository;
+import io.micronaut.runtime.Micronaut;
 
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Optional;
-
-public class App extends Jooby {
-    // module
+public class App {
+    // route
     {
-        use(new FlashScope());
+        /*
+        use(new Whoops());
         use("*", new RequestLogger()
            .latency()
            .extended()
        );
-        use(new Jackson());
-
-        on("dev", () -> {
-            Path basedir = Paths.get(System.getProperty("user.dir"));
-            use(new Whoops());
-            use(new LiveReload()
-                .register(basedir.resolve("public"),
-                    "**/*.ftl"
-                )
-                .register(basedir.resolve("target"),
-                    "**/*.class",
-                    "**/*.conf",
-                    "**/*.properties")
-                .register(basedir.resolve("build"),
-                    "**/*.class",
-                    "**/*.conf",
-                    "**/*.properties")
-            );
-        });
-        on("prod", () -> {
-            use(new Assets());
-        });
 
         assets("/favicon.ico");
-        use(new Ftl("/", ".ftl"));
-        use(KafkaModule.class);
-
-        // @RequestScoped hack
-        use("*", "/{cluster}/**", (req, rsp, chain) -> {
-            Optional<String> cluster = req.param("cluster").toOptional();
-            cluster.ifPresent(clusterId ->
-                AbstractRepository.setWrapper(new KafkaWrapper(this.require(KafkaModule.class), clusterId))
-            );
-
-            chain.next(req, rsp);
-        });
-    }
-
-    // route
-    {
         use("*", "/", (req, rsp, chain) -> {
             rsp.redirect("/" + this.require(KafkaModule.class).getClustersList().get(0) + "/topic");
         });
 
         use("*", "/{cluster}", (req, rsp, chain) -> {
             rsp.redirect("/" + req.param("cluster").value() + "/topic");
         });
-        use(NodeController.class);
-        use(SchemaController.class);
-        use(TopicController.class);
-        use(GroupController.class);
 
-        sse("/{cluster}/topic/{topic}/search/{search}", new DataSseController());
-    }
-
-    public static String getBasePath(Config config) {
-        return config.getString("application.path").replaceAll("/$","");
+        */
     }
 
     public static void main(String[] args) {
-        run(App::new, args);
+        Micronaut.run(App.class);
     }
 }
\ No newline at end of file
diff --git a/src/main/java/org/kafkahq/configs/AbstractProperties.java b/src/main/java/org/kafkahq/configs/AbstractProperties.java
new file mode 100644
index 000000000..d3a097819
--- /dev/null
+++ b/src/main/java/org/kafkahq/configs/AbstractProperties.java
@@ -0,0 +1,20 @@
+package org.kafkahq.configs;
+
+import io.micronaut.context.annotation.Parameter;
+import io.micronaut.core.convert.format.MapFormat;
+import lombok.Getter;
+
+import java.util.Map;
+
+@Getter
+abstract public class AbstractProperties {
+    private final String name;
+
+    @MapFormat(transformation = MapFormat.MapTransformation.FLAT)
+    Map<String, Object> properties;
+
+    public AbstractProperties(@Parameter String name) {
+        this.name = name;
+    }
+}
+
diff --git a/src/main/java/org/kafkahq/configs/Connection.java b/src/main/java/org/kafkahq/configs/Connection.java
new file mode 100644
index 000000000..faad27e42
--- /dev/null
+++ b/src/main/java/org/kafkahq/configs/Connection.java
@@ -0,0 +1,19 @@
+package org.kafkahq.configs;
+
+import io.micronaut.context.annotation.EachProperty;
+import io.micronaut.context.annotation.Parameter;
+import lombok.Getter;
+
+import java.net.URL;
+import java.util.Optional;
+
+@EachProperty("kafka.connections")
+@Getter
+public class Connection extends AbstractProperties {
+    Optional<URL> registry = Optional.empty();
+
+    public Connection(@Parameter String name) {
+        super(name);
+    }
+}
+
diff --git a/src/main/java/org/kafkahq/configs/Default.java b/src/main/java/org/kafkahq/configs/Default.java
new file mode 100644
index 000000000..49906d2fc
--- /dev/null
+++ b/src/main/java/org/kafkahq/configs/Default.java
@@ -0,0 +1,14 @@
+package org.kafkahq.configs;
+
+import io.micronaut.context.annotation.EachProperty;
+import io.micronaut.context.annotation.Parameter;
+import lombok.Getter;
+
+@EachProperty("kafka.defaults")
+@Getter
+public class Default extends AbstractProperties {
+    public Default(@Parameter String name) {
+        super(name);
+    }
+}
+
diff --git a/src/main/java/org/kafkahq/controllers/AbstractController.java b/src/main/java/org/kafkahq/controllers/AbstractController.java
index 37e0c0e6e..144d84a96 100644
--- a/src/main/java/org/kafkahq/controllers/AbstractController.java
+++ b/src/main/java/org/kafkahq/controllers/AbstractController.java
@@ -2,51 +2,67 @@
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
-import com.google.inject.Inject;
-import com.typesafe.config.Config;
+import io.micronaut.context.annotation.Value;
+import io.micronaut.core.util.CollectionUtils;
+import io.micronaut.http.HttpRequest;
+import io.micronaut.http.HttpResponse;
+import io.micronaut.http.MutableHttpResponse;
+import io.micronaut.http.cookie.Cookie;
 import lombok.Builder;
 import lombok.Getter;
-import org.codehaus.httpcache4j.uri.QueryParams;
-import org.codehaus.httpcache4j.uri.URIBuilder;
-import org.jooby.Request;
-import org.jooby.View;
-import org.kafkahq.App;
+import lombok.experimental.Wither;
 import org.kafkahq.modules.KafkaModule;
 
+import javax.inject.Inject;
+import java.util.Map;
+
 abstract public class AbstractController {
     private static final String SESSION_TOAST = "TOAST";
 
     private static Gson gson = new GsonBuilder()
         .enableComplexMapKeySerialization()
         .create();
 
+    @Value("${micronaut.context.path}")
+    protected String basePath;
+
     @Inject
     private KafkaModule kafkaModule;
 
-    @Inject
-    private Config config;
+    @SuppressWarnings("unchecked")
+    protected HttpResponse template(HttpRequest request, String cluster, Object... values) {
 
-    protected View template(Request request, String cluster, View view) {
-        view
-            .put("clusterId", cluster)
-            .put("clusters", this.kafkaModule.getClustersList())
-            .put("registryEnabled", this.kafkaModule.getRegistryRestClient(cluster) != null)
-            .put("basePath", App.getBasePath(config));
+        Map<String, Object> datas = CollectionUtils.mapOf(values);
 
-        request
-            .ifFlash(SESSION_TOAST)
-            .ifPresent(s -> view.put("toast", s));
+        datas.put("clusterId", cluster);
+        datas.put("clusters", this.kafkaModule.getClustersList());
+        datas.put("registryEnabled", this.kafkaModule.getRegistryRestClient(cluster) != null);
+        datas.put("basePath", getBasePath());
 
-        return view;
+        MutableHttpResponse response = HttpResponse.ok();
+
+        request.getCookies()
+            .findCookie(SESSION_TOAST)
+            .ifPresent(s -> {
+                datas.put("toast", s.getValue());
+                response.cookie(Cookie.of(SESSION_TOAST, "").maxAge(0).path("/"));
+            });
+
+        return response.body(datas);
     }
 
-    protected URIBuilder uri(Request request) {
-        return URIBuilder.empty()
-            .withPath(request.path())
-            .withParameters(QueryParams.parse(request.queryString().orElse("")));
+    protected String getBasePath() {
+        return basePath.replaceAll("/$","");
     }
 
-    protected Toast toast(Request request, Toast toast) {
-        request.flash(SESSION_TOAST, gson.toJson(toast));
+    protected Toast toast(MutableHttpResponse response, Toast toast) {
+        Cookie cookie = Cookie
+            .of(SESSION_TOAST, gson.toJson(toast
+                .withTitle(toast.getTitle() != null ? toast.getTitle().replaceAll(";", ",") : null)
+                .withMessage(toast.getMessage() != null ? toast.getMessage().replaceAll(";", ",") : null)
+            ))
+            .path("/");
+
+        response.cookie(cookie);
 
         return toast;
     }
@@ -62,8 +78,10 @@ public enum Type {
             question
         }
 
+        @Wither
         private String title;
 
+        @Wither
         private String message;
 
         @Builder.Default
diff --git a/src/main/java/org/kafkahq/controllers/DataSseController.java b/src/main/java/org/kafkahq/controllers/DataSseController.java
deleted file mode 100644
index 51d57e333..000000000
--- a/src/main/java/org/kafkahq/controllers/DataSseController.java
+++ /dev/null
@@ -1,107 +0,0 @@
-package org.kafkahq.controllers;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.typesafe.config.Config;
-import freemarker.template.Configuration;
-import freemarker.template.Template;
-import freemarker.template.TemplateException;
-import lombok.AllArgsConstructor;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.ToString;
-import org.jooby.MediaType;
-import org.jooby.Request;
-import org.jooby.Sse;
-import org.kafkahq.App;
-import org.kafkahq.models.Topic;
-import org.kafkahq.modules.RequestHelper;
-import org.kafkahq.repositories.ConfigRepository;
-import org.kafkahq.repositories.RecordRepository;
-import org.kafkahq.repositories.TopicRepository;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.io.Writer;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class DataSseController implements Sse.Handler {
-    private static Logger logger = LoggerFactory.getLogger(DataSseController.class);
-
-    @Override
-    public void handle(Request request, Sse sse) throws Exception {
-        TopicRepository topicRepository = sse.require(TopicRepository.class);
-        RecordRepository recordRepository = sse.require(RecordRepository.class);
-        ConfigRepository configRepository = sse.require(ConfigRepository.class);
-        Config config = sse.require(Config.class);
-
-        Topic topic = topicRepository.findByName(request.param("topic").value());
-        RecordRepository.Options options = RequestHelper.buildRecordRepositoryOptions(
-            request,
-            request.param("cluster").value(),
-            request.param("topic").value()
-        );
-
-        Map<String, Object> datas = new HashMap<>();
-        datas.put("topic", topic);
-        datas.put("canDeleteRecords", topic.canDeleteRecords(configRepository));
-        datas.put("clusterId", request.param("cluster").value());
-        datas.put("basePath", App.getBasePath(config));
-
-        AtomicInteger next = new AtomicInteger(0);
-        RecordRepository.SearchConsumer searchConsumer = new RecordRepository.SearchConsumer() {
-            @Override
-            public void accept(RecordRepository.SearchEvent searchEvent) {
-                datas.put("datas", searchEvent.getRecords());
-                try {
-                    sse
-                        .event(new SearchBody(searchEvent.getOffsets(), searchEvent.getProgress(), render(sse, datas)))
-                        .id(next.incrementAndGet())
-                        .name("searchBody")
-                        .type(MediaType.json)
-                        .send();
-                } catch (IOException | TemplateException e) {
-                    logger.error("Error on sse send", e);
-                }
-            }
-        };
-        sse.onClose(searchConsumer::close);
-
-        RecordRepository.SearchEnd end = recordRepository.search(options, searchConsumer);
-        sse
-            .event(end)
-            .id(next.incrementAndGet())
-            .name("searchEnd")
-            .type(MediaType.json)
-            .send();
-        sse.close();
-    }
-
-    private String render(Sse sse, Map<String, Object> datas) throws IOException, TemplateException {
-        Configuration configuration = sse.require(Configuration.class);
-        Template template = configuration.getTemplate("topicSearch.ftl");
-
-        Writer stringWriterr = new StringWriter();
-        template.process(datas, stringWriterr);
-
-        return stringWriterr.toString();
-    }
-
-    @ToString
-    @EqualsAndHashCode
-    @Getter
-    @AllArgsConstructor
-    public static class SearchBody {
-        @JsonProperty("offsets")
-        private Map offsets = new HashMap<>();
-
-        @JsonProperty("progress")
-        private Map progress = new HashMap<>();
-
-        @JsonProperty("body")
-        private String body;
-    }
-}
diff --git a/src/main/java/org/kafkahq/controllers/GroupController.java b/src/main/java/org/kafkahq/controllers/GroupController.java
index 76f814ae5..0773d2c2b 100644
--- a/src/main/java/org/kafkahq/controllers/GroupController.java
+++ b/src/main/java/org/kafkahq/controllers/GroupController.java
@@ -1,18 +1,20 @@
 package org.kafkahq.controllers;
 
-import com.google.inject.Inject;
-import org.jooby.*;
-import org.jooby.mvc.GET;
-import org.jooby.mvc.POST;
-import org.jooby.mvc.Path;
+import io.micronaut.http.HttpRequest;
+import io.micronaut.http.HttpResponse;
+import io.micronaut.http.MediaType;
+import io.micronaut.http.MutableHttpResponse;
+import io.micronaut.http.annotation.Controller;
+import io.micronaut.http.annotation.Get;
+import io.micronaut.http.annotation.Post;
+import io.micronaut.views.View;
 import org.kafkahq.models.ConsumerGroup;
 import org.kafkahq.models.TopicPartition;
 import org.kafkahq.modules.RequestHelper;
 import org.kafkahq.repositories.ConsumerGroupRepository;
 import org.kafkahq.repositories.RecordRepository;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+import javax.inject.Inject;
 import java.time.Instant;
 import java.util.AbstractMap;
 import java.util.List;
@@ -21,81 +23,78 @@
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
-@Path("/{cluster}/group")
+@Controller("${micronaut.context.path:}/{cluster}/group")
 public class GroupController extends AbstractController {
-    private static final Logger logger = LoggerFactory.getLogger(TopicController.class);
-
-    @Inject
     private ConsumerGroupRepository consumerGroupRepository;
+    private RecordRepository recordRepository;
 
     @Inject
-    private RecordRepository recordRepository;
+    public GroupController(ConsumerGroupRepository consumerGroupRepository, RecordRepository recordRepository) {
+        this.consumerGroupRepository = consumerGroupRepository;
+        this.recordRepository = recordRepository;
+    }
 
-    @GET
-    public View list(Request request, String cluster, Optional<String> search) throws ExecutionException, InterruptedException {
+    @View("groupList")
+    @Get
+    public HttpResponse list(HttpRequest request, String cluster, Optional<String> search) throws ExecutionException, InterruptedException {
         return this.template(
             request,
             cluster,
-            Results
-                .html("groupList")
-                .put("search", search)
-                .put("groups", this.consumerGroupRepository.list(search))
+            "search", search,
+            "groups", this.consumerGroupRepository.list(search)
         );
     }
 
-    @GET
-    @Path("{groupName}")
-    public View home(Request request, String cluster, String groupName) throws ExecutionException, InterruptedException {
+    @View("group")
+    @Get("{groupName}")
+    public HttpResponse home(HttpRequest request, String cluster, String groupName) throws ExecutionException, InterruptedException {
         return this.render(request, cluster, groupName, "topics");
     }
 
-    @GET
-    @Path("{groupName}/{tab:(topics|members)}")
-    public View tab(Request request, String cluster, String tab, String groupName) throws ExecutionException, InterruptedException {
+    @View("group")
+    @Get("{groupName}/{tab:(topics|members)}")
+    public HttpResponse tab(HttpRequest request, String cluster, String tab, String groupName) throws ExecutionException, InterruptedException {
         return this.render(request, cluster, groupName, tab);
     }
 
-    public View render(Request request, String cluster, String groupName, String tab) throws ExecutionException, InterruptedException {
+    private HttpResponse render(HttpRequest request, String cluster, String groupName, String tab) throws ExecutionException, InterruptedException {
         ConsumerGroup group = this.consumerGroupRepository.findByName(groupName);
 
         return this.template(
             request,
             cluster,
-            Results
-                .html("group")
-                .put("tab", tab)
-                .put("group", group)
+            "tab", tab,
+            "group", group
         );
     }
 
-    @GET
-    @Path("{groupName}/offsets")
-    public View offsets(Request request, String cluster, String groupName) throws ExecutionException, InterruptedException {
+    @View("groupUpdate")
+    @Get("{groupName}/offsets")
+    public HttpResponse offsets(HttpRequest request, String cluster, String groupName) throws ExecutionException, InterruptedException {
         ConsumerGroup group = this.consumerGroupRepository.findByName(groupName);
 
         return this.template(
             request,
             cluster,
-            Results
-                .html("groupUpdate")
-                .put("group", group)
+            "group", group
        );
    }
 
-    @POST
-    @Path("{groupName}/offsets")
-    public void offsetsSubmit(Request request, Response response, String cluster, String groupName) throws Throwable {
+    @Post(value = "{groupName}/offsets", consumes = MediaType.MULTIPART_FORM_DATA)
+    public HttpResponse offsetsSubmit(HttpRequest request, String cluster, String groupName, Map<String, Long> offset) throws Throwable {
         ConsumerGroup group = this.consumerGroupRepository.findByName(groupName);
 
         Map<TopicPartition, Long> offsets = group.getOffsets()
             .stream()
             .map(r -> new AbstractMap.SimpleEntry<>(
                 new TopicPartition(r.getTopic(), r.getPartition()),
-                request.param("offset[" + r.getTopic() + "][" + r.getPartition() + "]").longValue()
+                offset.get("offset[" + r.getTopic() + "][" + r.getPartition() + "]")
             ))
             .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
 
-        this.toast(request, RequestHelper.runnableToToast(() -> this.consumerGroupRepository.updateOffsets(
+        MutableHttpResponse response = HttpResponse.redirect(request.getUri());
+
+        this.toast(response, RequestHelper.runnableToToast(() -> this.consumerGroupRepository.updateOffsets(
             cluster,
             groupName,
             offsets
@@ -104,12 +103,11 @@ public void offsetsSubmit(Request request, Response response, String cluster, St
             "Failed to update offsets for '" + group.getId() + "'"
         ));
 
-        response.redirect(request.path());
+        return response;
     }
 
-    @GET
-    @Path("{groupName}/offsets/start")
-    public Result offsetsStart(Request request, String cluster, String groupName, String timestamp) throws ExecutionException, InterruptedException {
+    @Get("{groupName}/offsets/start")
+    public HttpResponse offsetsStart(HttpRequest request, String cluster, String groupName, String timestamp) throws ExecutionException, InterruptedException {
         ConsumerGroup group = this.consumerGroupRepository.findByName(groupName);
 
         List offsetForTime = recordRepository.getOffsetForTime(
@@ -121,20 +119,19 @@ public Result offsetsStart(Request request, String cluster, String groupName, St
             Instant.parse(timestamp).toEpochMilli()
         );
 
-        return Results
-            .with(offsetForTime)
-            .type(MediaType.json);
+        return HttpResponse.ok(offsetForTime);
     }
 
-    @GET
-    @Path("{groupName}/delete")
-    public Result delete(Request request, String cluster, String groupName) {
-        this.toast(request, RequestHelper.runnableToToast(() ->
+    @Get("{groupName}/delete")
+    public HttpResponse delete(HttpRequest request, String cluster, String groupName) {
+        MutableHttpResponse response = HttpResponse.ok();
+
+        this.toast(response, RequestHelper.runnableToToast(() ->
             this.consumerGroupRepository.delete(cluster, groupName),
             "Consumer group '" + groupName + "' is deleted",
             "Failed to consumer group " + groupName
         ));
 
-        return Results.ok();
+        return response;
     }
 }
diff --git a/src/main/java/org/kafkahq/controllers/NodeController.java b/src/main/java/org/kafkahq/controllers/NodeController.java
index b34fe4137..ca12a8ef9 100644
--- a/src/main/java/org/kafkahq/controllers/NodeController.java
+++ b/src/main/java/org/kafkahq/controllers/NodeController.java
@@ -1,13 +1,13 @@
 package org.kafkahq.controllers;
 
-import com.google.inject.Inject;
-import org.jooby.Request;
-import org.jooby.Response;
-import org.jooby.Results;
-import org.jooby.View;
-import org.jooby.mvc.GET;
-import org.jooby.mvc.POST;
-import org.jooby.mvc.Path;
+import io.micronaut.http.HttpRequest;
+import io.micronaut.http.HttpResponse;
+import io.micronaut.http.MediaType;
+import io.micronaut.http.MutableHttpResponse;
+import io.micronaut.http.annotation.Controller;
+import io.micronaut.http.annotation.Get;
+import io.micronaut.http.annotation.Post;
+import io.micronaut.views.View;
 import org.kafkahq.models.Config;
 import org.kafkahq.models.Node;
 import org.kafkahq.modules.RequestHelper;
@@ -15,51 +15,54 @@
 import org.kafkahq.repositories.ConfigRepository;
 import org.kafkahq.repositories.LogDirRepository;
 
+import javax.inject.Inject;
 import java.util.List;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ExecutionException;
 
-@Path("/{cluster}/node")
+@Controller("${micronaut.context.path:}/{cluster}/node")
 public class NodeController extends AbstractController {
-
-    @Inject
     private ClusterRepository clusterRepository;
-
-    @Inject
     private ConfigRepository configRepository;
+    private LogDirRepository logDirRepository;
 
     @Inject
-    private LogDirRepository logDirRepository;
+    public NodeController(ClusterRepository clusterRepository, ConfigRepository configRepository, LogDirRepository logDirRepository) {
+        this.clusterRepository = clusterRepository;
+        this.configRepository = configRepository;
+        this.logDirRepository = logDirRepository;
+    }
 
-    @GET
-    public View list(Request request, String cluster) throws ExecutionException, InterruptedException {
+    @View("nodeList")
+    @Get
+    public HttpResponse list(HttpRequest request, String cluster) throws ExecutionException, InterruptedException {
         return this.template(
             request,
             cluster,
-            Results
-                .html("nodeList")
-                .put("cluster", this.clusterRepository.get())
+            "cluster", this.clusterRepository.get()
         );
     }
 
-    @GET
-    @Path("{nodeId}")
-    public View home(Request request, String cluster, Integer nodeId) throws ExecutionException, InterruptedException {
+    @View("node")
+    @Get("{nodeId}")
+    public HttpResponse home(HttpRequest request, String cluster, Integer nodeId) throws ExecutionException, InterruptedException {
         return this.render(request, cluster, nodeId, "configs");
     }
 
-    @GET
-    @Path("{nodeId}/{tab:(logs)}")
-    public View tab(Request request, String cluster, Integer nodeId, String tab) throws ExecutionException, InterruptedException {
+    @View("node")
+    @Get("{nodeId}/{tab:(logs)}")
+    public HttpResponse tab(HttpRequest request, String cluster, Integer nodeId, String tab) throws ExecutionException, InterruptedException {
         return this.render(request, cluster, nodeId, tab);
     }
 
-    @POST
-    @Path("{nodeId}")
-    public void updateConfig(Request request, Response response, String cluster, Integer nodeId) throws Throwable {
-        List<Config> updated = RequestHelper.updatedConfigs(request, this.configRepository.findByBroker(nodeId));
+    @Post(value = "{nodeId}", consumes = MediaType.MULTIPART_FORM_DATA)
+    public HttpResponse updateConfig(HttpRequest request, String cluster, Integer nodeId, Map<String, String> configs) throws Throwable {
+        List<Config> updated = ConfigRepository.updatedConfigs(configs, this.configRepository.findByBroker(nodeId));
+
+        MutableHttpResponse response = HttpResponse.redirect(request.getUri());
 
-        this.toast(request, RequestHelper.runnableToToast(() -> {
+        this.toast(response, RequestHelper.runnableToToast(() -> {
             if (updated.size() == 0) {
                 throw new IllegalArgumentException("No config to update");
             }
@@ -74,10 +77,10 @@ public void updateConfig(Request request, Response response, String cluster, Int
             "Failed to update node '" + nodeId + "' configs"
         ));
 
-        response.redirect(request.path());
+        return response;
     }
-    
-    public View render(Request request, String cluster, Integer nodeId, String tab) throws ExecutionException, InterruptedException {
+
+    private HttpResponse render(HttpRequest request, String cluster, Integer nodeId, String tab) throws ExecutionException, InterruptedException {
         Node node = this.clusterRepository.get()
             .getNodes()
             .stream()
@@ -93,13 +96,10 @@ public View render(Request request, String cluster, Integer nodeId, String tab)
         return this.template(
             request,
             cluster,
-            Results
-                .html("node")
-                .put("tab", tab)
-                .put("node", node)
-                .put("logs", logDirRepository.findByBroker(node.getId()))
-                .put("configs", configs)
+            "tab", tab,
+            "node", node,
+            "logs", logDirRepository.findByBroker(node.getId()),
+            "configs", configs
         );
     }
-
 }
diff --git a/src/main/java/org/kafkahq/controllers/SchemaController.java b/src/main/java/org/kafkahq/controllers/SchemaController.java
index 9ad7142dd..3c669ce16 100644
--- a/src/main/java/org/kafkahq/controllers/SchemaController.java
+++ b/src/main/java/org/kafkahq/controllers/SchemaController.java
@@ -1,134 +1,152 @@
 package org.kafkahq.controllers;
 
-import com.google.inject.Inject;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
-import org.jooby.*;
-import org.jooby.mvc.GET;
-import org.jooby.mvc.POST;
-import org.jooby.mvc.Path;
+import io.micronaut.http.*;
+import io.micronaut.http.annotation.Controller;
+import io.micronaut.http.annotation.Get;
+import io.micronaut.http.annotation.Post;
+import io.micronaut.views.View;
 import org.kafkahq.models.Schema;
 import org.kafkahq.modules.RequestHelper;
 import org.kafkahq.repositories.SchemaRegistryRepository;
 
+import javax.inject.Inject;
 import java.io.IOException;
+import java.net.URI;
 
-@Path("/{cluster}/schema")
+@Controller("${micronaut.context.path:}/{cluster}/schema")
 public class SchemaController extends AbstractController {
-    @Inject
     private SchemaRegistryRepository schemaRepository;
 
-    @GET
-    public View list(Request request, String cluster) throws IOException, RestClientException {
+    @Inject
+    public SchemaController(SchemaRegistryRepository schemaRepository) {
+        this.schemaRepository = schemaRepository;
+    }
+
+    @View("schemaList")
+    @Get
+    public HttpResponse list(HttpRequest request, String cluster) throws IOException, RestClientException {
         return this.template(
             request,
             cluster,
-            Results
-                .html("schemaList")
-                .put("schemas", this.schemaRepository.getAll(cluster))
+            "schemas", this.schemaRepository.getAll(cluster)
         );
     }
 
-    @GET
-    @Path("create")
-    public View create(Request request, String cluster) throws IOException, RestClientException {
+    @View("schemaCreate")
+    @Get("create")
+    public HttpResponse create(HttpRequest request, String cluster) throws IOException, RestClientException {
         return this.template(
             request,
             cluster,
-            Results
-                .html("schemaCreate")
-                .put("config", this.schemaRepository.getDefaultConfig(cluster))
-                .put("compatibilityLevel", Schema.Config.getCompatibilityLevelConfigList())
+            "config", this.schemaRepository.getDefaultConfig(cluster),
+            "compatibilityLevel", Schema.Config.getCompatibilityLevelConfigList()
         );
     }
 
-    @POST
-    @Path("create")
-    public Result createSubmit(Request request, Response response, String cluster, String subject) throws Throwable {
+    @Post(value = "create", consumes = MediaType.MULTIPART_FORM_DATA)
+    public HttpResponse createSubmit(String cluster,
+                                     String subject,
+                                     String schema,
+                                     String compatibilityLevel)
+        throws Throwable
+    {
         if (this.schemaRepository.exist(cluster, subject)) {
-            this.toast(request, AbstractController.Toast.builder()
+            MutableHttpResponse response = HttpResponse.redirect(new URI(("/" + cluster + "/schema/create")));
+
+            this.toast(response, AbstractController.Toast.builder()
                 .message("Subject '" + subject + "' already exits")
                .type(AbstractController.Toast.Type.error)
                .build()
            );
-            response.redirect("/" + cluster + "/schema/create");
-            return Results.ok();
+
+            return response;
         }
 
-        Toast toast = this.toast(request, RequestHelper.runnableToToast(
-            () -> registerSchema(request, cluster, subject),
+        MutableHttpResponse response = HttpResponse.ok();
+
+        Toast toast = this.toast(response, RequestHelper.runnableToToast(
+            () -> registerSchema(cluster, subject, schema, compatibilityLevel),
             "Schema '" + subject + "' is created",
             "Failed to create schema'" + subject + "'"
         ));
 
+        URI redirect;
+
         if (toast.getType() != Toast.Type.error) {
-            response.redirect("/" + cluster + "/schema/" + subject);
+            redirect = new URI("/" + cluster + "/schema/" + subject);
         } else {
-            response.redirect("/" + cluster + "/schema/create");
+            redirect = new URI("/" + cluster + "/schema/create");
         }
 
-        return Results.ok();
+        return response.status(HttpStatus.MOVED_PERMANENTLY)
+            .headers((headers) ->
+                headers.location(redirect)
+            );
    }
 
-    @GET
-    @Path("{subject}")
-    public View home(Request request, String cluster, String subject) throws IOException, RestClientException {
+    @View("schema")
+    @Get("{subject}")
+    public HttpResponse home(HttpRequest request, String cluster, String subject) throws IOException, RestClientException {
         return this.render(request, cluster, subject, "update");
     }
 
-    @POST
-    @Path("{subject}")
-    public Result updateSchema(Request request, Response response, String cluster, String subject) throws Throwable {
-        Toast toast = this.toast(request, RequestHelper.runnableToToast(
-            () -> registerSchema(request, cluster, subject),
+    @Post(value = "{subject}", consumes = MediaType.MULTIPART_FORM_DATA)
+    public HttpResponse updateSchema(String cluster,
+                                     String subject,
+                                     String schema,
+                                     String compatibilityLevel) throws Throwable {
+        MutableHttpResponse response = HttpResponse.redirect(new URI("/" + cluster + "/schema/" + subject));
+
+        this.toast(response, RequestHelper.runnableToToast(
+            () -> registerSchema(cluster, subject, schema, compatibilityLevel),
             "Schema '" + subject + "' is updated",
-            "Failed to update schema'" + subject + "'"
+            "Failed to update schema '" + subject + "'"
         ));
 
-        response.redirect("/" + cluster + "/schema/" + subject);
-        return Results.ok();
+        return response;
     }
 
-    @GET
-    @Path("{subject}/{tab:(version)}")
-    public View tab(Request request, String cluster, String subject, String tab) throws IOException, RestClientException {
+    @View("schema")
+    @Get("{subject}/{tab:(version)}")
+    public HttpResponse tab(HttpRequest request, String cluster, String subject, String tab) throws IOException, RestClientException {
         return this.render(request, cluster, subject, tab);
     }
 
-    @GET
-    @Path("{subject}/delete")
-    public Result delete(Request request, String cluster, String subject) {
-        this.toast(request, RequestHelper.runnableToToast(() ->
+    @Get("{subject}/delete")
+    public HttpResponse delete(String cluster, String subject) {
+        MutableHttpResponse response = HttpResponse.ok();
+
+        this.toast(response, RequestHelper.runnableToToast(() ->
             this.schemaRepository.delete(cluster, subject),
             "Subject from '" + subject + "' is deleted",
             "Failed to delete subject from '" + subject + "'"
         ));
 
-        return Results.ok();
+        return response;
    }
 
-    @GET
-    @Path("{subject}/version/{version}/delete")
-    public Result deleteVersion(Request request, String cluster, String subject, Integer version) {
-        this.toast(request, RequestHelper.runnableToToast(() ->
+    @Get("{subject}/version/{version}/delete")
+    public HttpResponse deleteVersion(HttpRequest request, String cluster, String subject, Integer version) {
+        MutableHttpResponse response = HttpResponse.ok();
+
+        this.toast(response, RequestHelper.runnableToToast(() ->
             this.schemaRepository.deleteVersion(cluster, subject, version),
             "Version '" + version + "' from '" + subject + "' is deleted",
             "Failed to delete version '" + version + "' from '" + subject + "'"
         ));
 
-        return Results.ok();
+        return response;
    }
 
-    private Schema registerSchema(Request request, String cluster, String subject) throws IOException, RestClientException {
-        org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(
-            request.param("schema").value()
-        );
+    private Schema registerSchema(String cluster, String subject, String schema, String compatibilityLevel) throws IOException, RestClientException {
+        org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(schema);
         Schema register = this.schemaRepository.register(cluster, subject, avroSchema);
 
         Schema.Config config = Schema.Config.builder()
             .compatibilityLevel(Schema.Config.CompatibilityLevelConfig.valueOf(
-                request.param("compatibility-level").value()
+                compatibilityLevel
             ))
             .build();
         this.schemaRepository.updateConfig(cluster, subject, config);
@@ -136,17 +154,15 @@ private Schema registerSchema(Request request, String cluster, String subject) t
         return register;
     }
 
-    public View render(Request request, String cluster, String subject, String tab) throws IOException, RestClientException {
+    private HttpResponse render(HttpRequest request, String cluster, String subject, String tab) throws IOException, RestClientException {
         return this.template(
             request,
             cluster,
-            Results
-                .html("schema")
-                .put("tab", tab)
-                .put("schema", this.schemaRepository.getLatestVersion(cluster, subject))
-                .put("versions", this.schemaRepository.getAllVersions(cluster, subject))
-                .put("config", this.schemaRepository.getConfig(cluster, subject))
-                .put("compatibilityLevel", Schema.Config.getCompatibilityLevelConfigList())
+            "tab", tab,
+            "schema", this.schemaRepository.getLatestVersion(cluster, subject),
+            "versions", this.schemaRepository.getAllVersions(cluster, subject),
+            "config", this.schemaRepository.getConfig(cluster, subject),
+            "compatibilityLevel", Schema.Config.getCompatibilityLevelConfigList()
         );
     }
 }
diff --git a/src/main/java/org/kafkahq/controllers/TopicController.java b/src/main/java/org/kafkahq/controllers/TopicController.java
index 5b71d94d8..22ed2985e 100644
--- a/src/main/java/org/kafkahq/controllers/TopicController.java
+++ b/src/main/java/org/kafkahq/controllers/TopicController.java
@@ -1,14 +1,25 @@
 package org.kafkahq.controllers;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSortedMap;
-import com.google.inject.Inject;
-import org.apache.kafka.common.config.TopicConfig;
+import io.micronaut.http.HttpRequest;
+import io.micronaut.http.HttpResponse;
+import io.micronaut.http.MediaType;
+import io.micronaut.http.MutableHttpResponse;
+import io.micronaut.http.annotation.Controller;
+import io.micronaut.http.annotation.Get;
+import io.micronaut.http.annotation.Post;
+import io.micronaut.http.sse.Event;
+import io.micronaut.views.View;
+import io.micronaut.views.freemarker.FreemarkerViewsRenderer;
+import io.reactivex.Flowable;
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
 import org.codehaus.httpcache4j.uri.URIBuilder;
-import org.jooby.*;
-import org.jooby.mvc.GET;
-import org.jooby.mvc.POST;
-import org.jooby.mvc.Path;
 import org.kafkahq.models.Config;
 import org.kafkahq.models.Record;
 import org.kafkahq.models.Topic;
@@ -16,129 +27,162 @@
 import org.kafkahq.repositories.ConfigRepository;
 import org.kafkahq.repositories.RecordRepository;
 import org.kafkahq.repositories.TopicRepository;
+import org.reactivestreams.Publisher;
 
+import javax.inject.Inject;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.net.URI;
 import java.time.Instant;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
-@Path("/{cluster}/topic")
+@Slf4j
+@Controller("${micronaut.context.path:}/{cluster}/topic")
 public class TopicController extends AbstractController {
-
-    @Inject
     private TopicRepository topicRepository;
-
-    @Inject
     private ConfigRepository configRepository;
+    private RecordRepository recordRepository;
+    private FreemarkerViewsRenderer freemarkerViewsRenderer;
 
     @Inject
-    private RecordRepository recordRepository;
+    public TopicController(TopicRepository topicRepository, ConfigRepository configRepository, RecordRepository recordRepository, FreemarkerViewsRenderer freemarkerViewsRenderer) {
+        this.topicRepository = topicRepository;
+        this.configRepository = configRepository;
+        this.recordRepository = recordRepository;
+        this.freemarkerViewsRenderer = freemarkerViewsRenderer;
+    }
 
-    @GET
-    public View list(Request request, String cluster, Optional<String> search) throws ExecutionException, InterruptedException {
+    @View("topicList")
+    @Get
+    public HttpResponse list(HttpRequest request, String cluster, Optional<String> search) throws ExecutionException, InterruptedException {
         return this.template(
             request,
             cluster,
-            Results
-                .html("topicList")
-                .put("search", search)
-                .put("topics", this.topicRepository.list(search))
+            "search", search,
+            "topics", this.topicRepository.list(search)
         );
     }
 
-    @GET
-    @Path("create")
-    public View create(Request request, String cluster) {
+    @View("topicCreate")
+    @Get("create")
+    public HttpResponse create(HttpRequest request, String cluster) {
         return this.template(
             request,
-            cluster,
-            Results
-                .html("topicCreate")
+            cluster
         );
     }
 
-    @POST
-    @Path("create")
-    public void createSubmit(Request request, Response response, String cluster) throws Throwable {
-        List<Config> options = new ArrayList<>();
-        Arrays
-            .asList(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.RETENTION_MS_CONFIG)
-            .forEach(s -> request
-                .param("configs[" + s + "]")
-                .toOptional()
-                .ifPresent(r -> options.add(new Config(s, r)))
-            );
-
-        this.toast(request, RequestHelper.runnableToToast(() ->
+    @Post(value = "create", consumes = MediaType.MULTIPART_FORM_DATA)
+    public HttpResponse createSubmit(HttpRequest request,
+                                     String cluster,
+                                     String name,
+                                     Integer partition,
+                                     Short replication,
+                                     Map<String, String> configs)
+        throws Throwable
+    {
+        List<Config> options = configs
+            .entrySet()
+            .stream()
+            .map(r -> new AbstractMap.SimpleEntry<>(
+                r.getKey().replaceAll("(configs\\[)(.*)(])", "$2"),
+                r.getValue()
+            ))
+            .map(r -> new Config(r.getKey(), r.getValue()))
+            .collect(Collectors.toList());
+
+        MutableHttpResponse response = HttpResponse.redirect(new URI(("/" + cluster + "/topic")));
+
+        this.toast(response, RequestHelper.runnableToToast(() ->
             this.topicRepository.create(
                 cluster,
-                request.param("name").value(),
-                request.param("partition").intValue(),
-                request.param("replication").shortValue(),
+                name,
+                partition,
+                replication,
                 options
             ),
-            "Topic '" + request.param("name").value() + "' is created",
-            "Failed to create topic '" + request.param("name").value() + "'"
+            "Topic '" + name + "' is created",
+            "Failed to create topic '" + name + "'"
         ));
 
-        response.redirect("/" + cluster + "/topic");
+        return response;
     }
 
-    @GET
-    @Path("{topicName}/produce")
-    public View produce(Request request, String cluster, String topicName) throws ExecutionException, InterruptedException {
+    @View("topicProduce")
+    @Get("{topicName}/produce")
+    public HttpResponse produce(HttpRequest request, String cluster, String topicName) throws ExecutionException, InterruptedException {
         Topic topic = this.topicRepository.findByName(topicName);
 
         return this.template(
             request,
             cluster,
-            Results
-                .html("topicProduce")
-                .put("topic", topic)
+            "topic", topic
         );
     }
 
-    @POST
-    @Path("{topicName}/produce")
-    public void produceSubmit(Request request, Response response, String cluster, String topicName) throws Throwable {
-        List<String> headersKey = request.param("headers[key][]").toList();
-        List<String> headersValue = request.param("headers[value][]").toList();
-
-        Map<String, String> headers = new HashMap<>();
+    @Post(value = "{topicName}/produce", consumes = MediaType.MULTIPART_FORM_DATA)
+    public HttpResponse produceSubmit(HttpRequest request,
+                                      String cluster,
+                                      String topicName,
+                                      String value,
+                                      Optional<String> key,
+                                      Optional<Integer> partition,
+                                      Optional<String> timestamp,
+                                      Map<String, List<String>> headers)
+    {
+        Map<String, String> finalHeaders = new HashMap<>();
 
         int i = 0;
-        for (String key : headersKey) {
-            if (key != null && !key.equals("") && headersValue.get(i) != null && !headersValue.get(i).equals("")) {
-                headers.put(key, headersValue.get(i));
+        for (String headerKey : headers.get("headers[key]")) {
+            if (headerKey != null && !headerKey.equals("") && headers.get("headers[value]").get(i) != null && !headers.get("headers[value]").get(i).equals("")) {
+                finalHeaders.put(headerKey, headers.get("headers[value]").get(i));
             }
             i++;
         }
 
-        this.toast(request, RequestHelper.runnableToToast(() -> {
+        MutableHttpResponse response = HttpResponse.redirect(request.getUri());
+
+        this.toast(response, RequestHelper.runnableToToast(() ->
             this.recordRepository.produce(
                 cluster,
                 topicName,
-                request.param("value").value(),
-                headers,
-                request.param("key").toOptional(),
-                request.param("partition").toOptional().filter(r -> !r.equals("")).map(Integer::valueOf),
-                request.param("timestamp")
-                    .toOptional(String.class)
-                    .filter(r -> !r.equals(""))
-                    .map(s -> Instant.parse(s).toEpochMilli())
-            );
-        },
+                value,
+                finalHeaders,
+                key.filter(r -> !r.equals("")),
+                partition,
+                timestamp.filter(r -> !r.equals("")).map(r -> Instant.parse(r).toEpochMilli())
+            )
+            ,
             "Record created",
             "Failed to produce record"
         ));
 
-        response.redirect(request.path());
+        return response;
     }
 
-    @GET
-    @Path("{topicName}")
-    public View home(Request request, String cluster, String topicName) throws ExecutionException, InterruptedException {
+    @View("topic")
+    @Get("{topicName}")
+    public HttpResponse home(HttpRequest request,
+                             String cluster,
+                             String topicName,
+                             Optional<String> after,
+                             Optional<Integer> partition,
+                             Optional<RecordRepository.Options.Sort> sort,
+                             Optional<String> timestamp,
+                             Optional<String> search)
+        throws ExecutionException, InterruptedException
+    {
         Topic topic = this.topicRepository.findByName(topicName);
-        RecordRepository.Options options = RequestHelper.buildRecordRepositoryOptions(request, cluster, topicName);
+
+        RecordRepository.Options options = new RecordRepository.Options(cluster, topicName);
+        after.ifPresent(options::setAfter);
+        partition.ifPresent(options::setPartition);
+        sort.ifPresent(options::setSort);
+        timestamp.map(r -> Instant.parse(r).toEpochMilli()).ifPresent(options::setTimestamp);
+        after.ifPresent(options::setAfter);
+        search.ifPresent(options::setSearch);
 
         List<Record> data = new ArrayList<>();
 
@@ -146,7 +190,7 @@ public View home(Request request, String cluster, String topicName) throws Execu
             data = this.recordRepository.consume(options);
         }
 
-        URIBuilder uri = this.uri(request);
+        URIBuilder uri = URIBuilder.fromURI(request.getUri());
 
         ImmutableMap.Builder partitionUrls = ImmutableSortedMap.naturalOrder();
         partitionUrls.put((uri.getParametersByName("partition").size() > 0 ? uri.removeParameters("partition") : uri).toNormalizedURI(false).toString(), "All");
@@ -157,14 +201,12 @@ public View home(Request request, String cluster, String topicName) throws Execu
         return this.template(
             request,
             cluster,
-            Results
-                .html("topic")
-                .put("tab", "data")
-                .put("topic", topic)
-                .put("canDeleteRecords", topic.canDeleteRecords(configRepository))
-                .put("datas", data)
-                .put("navbar", dataNavbar(options, uri, partitionUrls))
-                .put("pagination", dataPagination(topic, options, data, uri))
+            "tab", "data",
+            "topic", topic,
+            "canDeleteRecords", topic.canDeleteRecords(configRepository),
+            "datas", data,
+            "navbar", dataNavbar(options, uri, partitionUrls),
+            "pagination", dataPagination(topic, options, data, uri)
         );
     }
 
@@ -203,33 +245,31 @@ private ImmutableMap dataNavbar(RecordRepository.Options options
             .build();
     }
 
-    @GET
-    @Path("{topicName}/{tab:(partitions|groups|configs|logs)}")
-    public View tab(Request request, String cluster, String topicName, String tab) throws ExecutionException, InterruptedException {
+    @View("topic")
+    @Get("{topicName}/{tab:(partitions|groups|configs|logs)}")
+    public HttpResponse tab(HttpRequest request, String cluster, String topicName, String tab) throws ExecutionException, InterruptedException {
         return this.render(request, cluster, topicName, tab);
     }
 
-    public View render(Request request, String cluster, String topicName, String tab) throws ExecutionException, InterruptedException {
+    private HttpResponse render(HttpRequest request, String cluster, String topicName, String tab) throws ExecutionException, InterruptedException {
         Topic topic = this.topicRepository.findByName(topicName);
         List<Config> configs = this.configRepository.findByTopic(topicName);
 
         return this.template(
             request,
             cluster,
-            Results
-                .html("topic")
-                .put("tab", tab)
-                .put("topic", topic)
-                .put("configs", configs)
+            "tab", tab,
+            "topic", topic,
+            "configs", configs
         );
     }
 
-    @POST
-    @Path("{topicName}/{tab:configs}")
-    public void updateConfig(Request request, Response response, String cluster, String topicName) throws Throwable {
-        List<Config> updated = RequestHelper.updatedConfigs(request, this.configRepository.findByTopic(topicName));
+    @Post(value = "{topicName}/configs", consumes = MediaType.MULTIPART_FORM_DATA)
+    public HttpResponse updateConfig(HttpRequest request, String cluster, String topicName, Map<String, String> configs) throws Throwable {
+        List<Config> updated = ConfigRepository.updatedConfigs(configs, this.configRepository.findByTopic(topicName));
 
+        MutableHttpResponse response = HttpResponse.redirect(request.getUri());
 
-        this.toast(request, RequestHelper.runnableToToast(() -> {
+        this.toast(response, RequestHelper.runnableToToast(() -> {
             if (updated.size() == 0) {
                 throw new IllegalArgumentException("No config to update");
             }
@@ -244,13 +284,14 @@ public void updateConfig(Request request, Response response, String cluster, Str
             "Failed to update topic '" + topicName + "' configs"
         ));
 
-        response.redirect(request.path());
+        return response;
     }
 
-    @GET
-    @Path("{topicName}/deleteRecord")
-    public Result deleteRecord(Request request, String cluster, String topicName, Integer partition, String key) {
-        this.toast(request, RequestHelper.runnableToToast(() -> this.recordRepository.delete(
+    @Get("{topicName}/deleteRecord")
+    public HttpResponse deleteRecord(String cluster, String topicName, Integer partition, String key) {
+        MutableHttpResponse response = HttpResponse.ok();
+
+        this.toast(response, RequestHelper.runnableToToast(() -> this.recordRepository.delete(
             cluster,
             topicName,
             partition,
@@ -260,18 +301,100 @@ public Result deleteRecord(Request request, String cluster, String topicName, In
             "Failed to delete record '" + key + "'"
         ));
 
-        return Results.ok();
+        return response;
     }
 
-    @GET
-    @Path("{topicName}/delete")
-    public Result delete(Request request, String cluster, String topicName) {
-        this.toast(request, RequestHelper.runnableToToast(() ->
+    @Get("{topicName}/delete")
+    public HttpResponse delete(String cluster, String topicName) {
+        MutableHttpResponse response = HttpResponse.ok();
+
+        this.toast(response, RequestHelper.runnableToToast(() ->
             this.topicRepository.delete(cluster, topicName),
             "Topic '" + topicName + "' is deleted",
             "Failed to delete topic " + topicName
         ));
 
-        return Results.ok();
+        return response;
+    }
+
+    @Get("{topicName}/search/{search}")
+    public Publisher<Event<?>> sse(String cluster,
+                                   String topicName,
+                                   Optional<String> after,
+                                   Optional<Integer> partition,
+                                   Optional<RecordRepository.Options.Sort> sort,
+                                   Optional<String> timestamp,
+                                   Optional<String> search)
+        throws ExecutionException, InterruptedException
+    {
+        Topic topic = topicRepository.findByName(topicName);
+
+        RecordRepository.Options options = new RecordRepository.Options(cluster, topicName);
+        after.ifPresent(options::setAfter);
+        partition.ifPresent(options::setPartition);
+        sort.ifPresent(options::setSort);
+        timestamp.map(r -> Instant.parse(r).toEpochMilli()).ifPresent(options::setTimestamp);
+        after.ifPresent(options::setAfter);
+        search.ifPresent(options::setSearch);
+
+        Map<String, Object> datas = new HashMap<>();
+        datas.put("topic", topic);
+        datas.put("canDeleteRecords", topic.canDeleteRecords(configRepository));
+        datas.put("clusterId", cluster);
+        datas.put("basePath", getBasePath());
+
+        return Flowable.unsafeCreate((emitter) -> {
+            RecordRepository.SearchConsumer searchConsumer = new RecordRepository.SearchConsumer() {
+                @Override
+                public void accept(RecordRepository.SearchEvent searchEvent) {
+                    datas.put("datas", searchEvent.getRecords());
+
+                    StringWriter stringWriter = new StringWriter();
+                    try {
+                        freemarkerViewsRenderer.render("topicSearch", datas).writeTo(stringWriter);
+                    } catch (IOException ignored) {}
+
+                    emitter.onNext(Event
+                        .of(new SearchBody(
+                            searchEvent.getOffsets(),
+                            searchEvent.getProgress(),
+                            stringWriter.toString()
+                        ))
+                        .name("searchBody")
+                    );
+                }
+            };
+
+
+//            emitter.setCancellable(searchConsumer::close);
+            RecordRepository.SearchEnd end = null;
+            try {
+                end = recordRepository.search(options, searchConsumer);
+            } catch (ExecutionException | InterruptedException e) {
+                emitter.onError(e);
+            }
+
+            emitter.onNext(Event
+                .of(end)
+                .name("searchEnd")
+            );
+
+            emitter.onComplete();
+        });
+    }
+
+    @ToString
+    @EqualsAndHashCode
+    @Getter
+    @AllArgsConstructor
+    public static class SearchBody {
+        @JsonProperty("offsets")
+        private Map offsets = new HashMap<>();
+
+        @JsonProperty("progress")
+        private Map progress = new HashMap<>();
+
+        @JsonProperty("body")
+        private String body;
    }
 }
diff --git a/src/main/java/org/kafkahq/middlewares/KafkaWrapperFilter.java b/src/main/java/org/kafkahq/middlewares/KafkaWrapperFilter.java
new file mode 100644
index 000000000..f2e103128
--- /dev/null
+++ b/src/main/java/org/kafkahq/middlewares/KafkaWrapperFilter.java
@@ -0,0 +1,47 @@
+package org.kafkahq.middlewares;
+
+import io.micronaut.context.annotation.Value;
+import io.micronaut.http.HttpRequest;
+import io.micronaut.http.MutableHttpResponse;
+import io.micronaut.http.annotation.Filter;
+import io.micronaut.http.filter.HttpServerFilter;
+import io.micronaut.http.filter.ServerFilterChain;
+import org.kafkahq.modules.KafkaModule;
+import org.kafkahq.modules.KafkaWrapper;
+import org.kafkahq.repositories.AbstractRepository;
+import org.reactivestreams.Publisher;
+
+import javax.inject.Inject;
+import java.util.Arrays;
+import java.util.List;
+
+@Filter("/**")
+public class KafkaWrapperFilter implements HttpServerFilter {
+    private final KafkaModule kafkaModule;
+
+    @Inject
+    public KafkaWrapperFilter(KafkaModule kafkaModule) {
+        this.kafkaModule = kafkaModule;
+    }
+
+    @Value("${micronaut.context.path}")
+    protected String basePath;
+
+    @Override
+    public Publisher<MutableHttpResponse<?>> doFilter(HttpRequest<?> request, ServerFilterChain chain) {
+        String path = request.getPath();
+        if (!this.basePath.equals("") && path.indexOf(this.basePath) == 0) {
+            path = path.substring(this.basePath.length());
+        }
+
+        List<String> pathSplit = Arrays.asList(path.split("/"));
+
+        // set cluster
+        if (pathSplit.size() >= 2) {
+            String clusterId = pathSplit.get(1);
+            AbstractRepository.setWrapper(new KafkaWrapper(kafkaModule, clusterId));
+        }
+
+        return chain.proceed(request);
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/org/kafkahq/modules/KafkaModule.java b/src/main/java/org/kafkahq/modules/KafkaModule.java
index 4f0bf74c6..e9bb9befc 100644
--- a/src/main/java/org/kafkahq/modules/KafkaModule.java
+++ b/src/main/java/org/kafkahq/modules/KafkaModule.java
@@ -1,22 +1,20 @@
 package org.kafkahq.modules;
 
-import com.google.inject.Binder;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.typesafe.config.Config;
 import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.rest.RestService;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.jooby.Env;
-import org.jooby.Jooby;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.kafkahq.configs.AbstractProperties;
+import org.kafkahq.configs.Connection;
+import org.kafkahq.configs.Default;
 
+import javax.inject.Inject;
+import javax.inject.Singleton;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -26,11 +24,13 @@
 import java.util.stream.Collectors;
 
 @Singleton
-public class KafkaModule implements Jooby.Module {
-    private static Logger logger = LoggerFactory.getLogger(KafkaModule.class);
+@Slf4j
+public class KafkaModule {
+    @Inject
+    private List<Connection> connections;
 
     @Inject
-    private Config config;
+    private List<Default> defaults;
 
     public <T> T debug(Callable<T> task, String format, Object... arguments) throws ExecutionException, InterruptedException {
         long startTime = System.currentTimeMillis();
@@ -38,7 +38,7 @@ public <T> T debug(Callable<T> task, String format, Object... arguments) throws
         T call;
 
         try {
             call = task.call();
-            logger.debug("{} ms -> " + format, (System.currentTimeMillis() - startTime), arguments);
+            log.debug("{} ms -> " + format, (System.currentTimeMillis() - startTime), arguments);
 
             return call;
         } catch (InterruptedException | ExecutionException exception) {
             throw exception;
@@ -48,47 +48,54 @@ public <T> T debug(Callable<T> task, String format, Object... arguments) throws
     }
 
     public List<String> getClustersList() {
-        return this.config
-            .getConfig("kafka.connections")
-            .entrySet()
+        return this.connections
             .stream()
-            .map(entry -> entry.getKey().split("\\.")[0])
+            .map(r -> r.getName().split("\\.")[0])
             .distinct()
             .collect(Collectors.toList());
     }
 
-    private Properties getConfigProperties(String path) {
-        Properties props = new Properties();
+    private Connection getConnection(String cluster) {
+        return this.connections
+            .stream()
+            .filter(r -> r.getName().equals(cluster))
+            .findFirst()
+            .get();
+    }
 
-        if (this.config.hasPath(path)) {
-            this.config.getConfig(path)
-                .entrySet()
-                .forEach(config -> props.put(config.getKey(), config.getValue().unwrapped()));
-        }
+    private Properties getDefaultsProperties(List<? extends AbstractProperties> current, String type) {
+        Properties properties = new Properties();
 
-        return props;
+        current
+            .stream()
+            .filter(r -> r.getName().equals(type))
+            .forEach(r -> r.getProperties()
+                .forEach(properties::put)
+            );
+
+        return properties;
     }
 
     private Properties getConsumerProperties(String clusterId) {
         Properties props = new Properties();
-        props.putAll(this.getConfigProperties("kafka.defaults.consumer"));
-        props.putAll(this.getConfigProperties("kafka.connections." + clusterId + ".properties"));
+        props.putAll(this.getDefaultsProperties(this.defaults, "consumer"));
+        props.putAll(this.getDefaultsProperties(this.connections, clusterId));
 
         return props;
     }
 
     private Properties getProducerProperties(String clusterId) {
         Properties props = new Properties();
-        props.putAll(this.getConfigProperties("kafka.defaults.producer"));
-        props.putAll(this.getConfigProperties("kafka.connections." + clusterId + ".properties"));
+        props.putAll(this.getDefaultsProperties(this.defaults, "producer"));
+        props.putAll(this.getDefaultsProperties(this.connections, clusterId));
 
         return props;
     }
 
     private Properties getAdminProperties(String clusterId) {
         Properties props = new Properties();
-        props.putAll(this.getConfigProperties("kafka.defaults.admin"));
-        props.putAll(this.getConfigProperties("kafka.connections." + clusterId + ".properties"));
+        props.putAll(this.getDefaultsProperties(this.defaults, "admin"));
+        props.putAll(this.getDefaultsProperties(this.connections, clusterId));
 
         return props;
     }
@@ -146,9 +153,11 @@ public KafkaProducer<String, String> getProducer(String clusterId) {
 
     public RestService getRegistryRestClient(String clusterId) {
         if (!this.registryRestClient.containsKey(clusterId)) {
-            if (this.config.hasPath("kafka.connections." + clusterId + ".registry")) {
+            Connection connection = this.getConnection(clusterId);
+
+            if (connection.getRegistry().isPresent()) {
                 this.registryRestClient.put(clusterId, new RestService(
-                    this.config.getString("kafka.connections.
+ clusterId + ".registry") + connection.getRegistry().get().toString() )); } } @@ -162,10 +171,4 @@ public SchemaRegistryClient getRegistryClient(String clusterId) { Integer.MAX_VALUE ); } - - @SuppressWarnings("NullableProblems") - @Override - public void configure(Env env, Config conf, Binder binder) { - binder.bind(KafkaModule.class).toInstance(new KafkaModule()); - } } diff --git a/src/main/java/org/kafkahq/modules/KafkaWrapper.java b/src/main/java/org/kafkahq/modules/KafkaWrapper.java index c4568af0d..d826966ee 100644 --- a/src/main/java/org/kafkahq/modules/KafkaWrapper.java +++ b/src/main/java/org/kafkahq/modules/KafkaWrapper.java @@ -1,6 +1,6 @@ package org.kafkahq.modules; -import com.google.inject.Inject; + import org.apache.kafka.clients.admin.*; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; @@ -8,25 +8,19 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.requests.DescribeLogDirsResponse; -import org.jooby.scope.RequestScoped; import org.kafkahq.models.Partition; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import javax.inject.Inject; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import static java.util.stream.Collectors.*; -@RequestScoped public class KafkaWrapper { - private static Logger logger = LoggerFactory.getLogger(KafkaWrapper.class); - private KafkaModule kafkaModule; - private String clusterId; - + @Inject public KafkaWrapper(KafkaModule kafkaModule, String clusterId) { this.kafkaModule = kafkaModule; diff --git a/src/main/java/org/kafkahq/modules/RequestHelper.java b/src/main/java/org/kafkahq/modules/RequestHelper.java index b1ec4a0d6..8cddc4852 100644 --- a/src/main/java/org/kafkahq/modules/RequestHelper.java +++ b/src/main/java/org/kafkahq/modules/RequestHelper.java @@ -1,46 +1,13 @@ package org.kafkahq.modules; -import com.google.inject.Binder; -import com.google.inject.Singleton; -import com.typesafe.config.Config; -import org.jooby.Env; -import org.jooby.Jooby; -import org.jooby.Request; +import lombok.extern.slf4j.Slf4j; import org.kafkahq.controllers.AbstractController; -import org.kafkahq.controllers.TopicController; -import org.kafkahq.repositories.RecordRepository; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.time.Instant; -import java.util.List; -import java.util.stream.Collectors; +import javax.inject.Singleton; @Singleton -public class RequestHelper implements Jooby.Module { - private transient static final Logger logger = LoggerFactory.getLogger(TopicController.class); - - public static RecordRepository.Options buildRecordRepositoryOptions(Request request, String cluster, String topic) { - RecordRepository.Options options = new RecordRepository.Options(cluster, topic); - - request.param("after").toOptional().ifPresent(options::setAfter); - request.param("partition").toOptional(Integer.class).ifPresent(options::setPartition); - request.param("sort").toOptional(RecordRepository.Options.Sort.class).ifPresent(options::setSort); - request.param("timestamp").toOptional(String.class).ifPresent(s -> options.setTimestamp(Instant.parse(s).toEpochMilli())); - request.param("search").toOptional(String.class).ifPresent(options::setSearch); - - return options; - } - - public static List updatedConfigs(Request request, List configs) { - return configs - .stream() - .filter(config -> !config.isReadOnly()) - 
.filter(config -> !(config.getValue() == null ? "" : config.getValue()).equals(request.param("configs[" + config.getName() + "]").value())) - .map(config -> config.withValue(request.param("configs[" + config.getName() + "]").value())) - .collect(Collectors.toList()); - } - +@Slf4j +public class RequestHelper { public static AbstractController.Toast runnableToToast(ResultStatusResponseRunnable callable, String successMessage, String failedMessage) { AbstractController.Toast.ToastBuilder builder = AbstractController.Toast.builder(); @@ -57,7 +24,7 @@ public static AbstractController.Toast runnableToToast(ResultStatusResponseRunna .message(exception.getCause() != null ? exception.getCause().getMessage() : exception.getMessage()) .type(AbstractController.Toast.Type.error); - logger.error(failedMessage != null ? failedMessage : cause, exception); + log.error(failedMessage != null ? failedMessage : cause, exception); } return builder.build(); @@ -66,10 +33,4 @@ public static AbstractController.Toast runnableToToast(ResultStatusResponseRunna public interface ResultStatusResponseRunnable { void run() throws Exception; } - - @SuppressWarnings("NullableProblems") - @Override - public void configure(Env env, Config conf, Binder binder) { - binder.bind(RequestHelper.class).toInstance(new RequestHelper()); - } } diff --git a/src/main/java/org/kafkahq/repositories/ClusterRepository.java b/src/main/java/org/kafkahq/repositories/ClusterRepository.java index a659c4fcb..2f06320ad 100644 --- a/src/main/java/org/kafkahq/repositories/ClusterRepository.java +++ b/src/main/java/org/kafkahq/repositories/ClusterRepository.java @@ -1,23 +1,13 @@ package org.kafkahq.repositories; -import com.google.inject.Binder; -import com.google.inject.Singleton; -import com.typesafe.config.Config; -import org.jooby.Env; -import org.jooby.Jooby; import org.kafkahq.models.Cluster; +import javax.inject.Singleton; import java.util.concurrent.ExecutionException; @Singleton -public class ClusterRepository extends AbstractRepository implements Jooby.Module { +public class ClusterRepository extends AbstractRepository { public Cluster get() throws ExecutionException, InterruptedException { return new Cluster(kafkaWrapper.describeCluster()); } - - @SuppressWarnings("NullableProblems") - @Override - public void configure(Env env, Config conf, Binder binder) { - binder.bind(ClusterRepository.class).toInstance(new ClusterRepository()); - } } diff --git a/src/main/java/org/kafkahq/repositories/ConfigRepository.java b/src/main/java/org/kafkahq/repositories/ConfigRepository.java index 738cf28ea..07016372a 100644 --- a/src/main/java/org/kafkahq/repositories/ConfigRepository.java +++ b/src/main/java/org/kafkahq/repositories/ConfigRepository.java @@ -1,22 +1,18 @@ package org.kafkahq.repositories; import com.google.common.collect.ImmutableMap; -import com.google.inject.Binder; -import com.google.inject.Inject; -import com.google.inject.Singleton; -import com.typesafe.config.Config; import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.common.config.ConfigResource; -import org.jooby.Env; -import org.jooby.Jooby; import org.kafkahq.modules.KafkaModule; +import javax.inject.Inject; +import javax.inject.Singleton; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @Singleton -public class ConfigRepository extends AbstractRepository implements Jooby.Module { +public class ConfigRepository extends AbstractRepository { @Inject private KafkaModule kafkaModule; @@ -81,9 +77,17 @@ 
private void update(String clusterId, ConfigResource.Type type, String name, Lis .get(); } - @SuppressWarnings("NullableProblems") - @Override - public void configure(Env env, Config conf, Binder binder) { - binder.bind(ConfigRepository.class).toInstance(new ConfigRepository()); + public static List updatedConfigs(Map request, List configs) { + return configs + .stream() + .filter(config -> !config.isReadOnly()) + .filter(config -> request.containsKey("configs[" + config.getName() + "]")) + .filter(config -> { + String current = config.getValue() == null ? "" : config.getValue(); + + return !(current).equals(request.get("configs[" + config.getName() + "]")); + }) + .map(config -> config.withValue(request.get("configs[" + config.getName() + "]"))) + .collect(Collectors.toList()); } } diff --git a/src/main/java/org/kafkahq/repositories/ConsumerGroupRepository.java b/src/main/java/org/kafkahq/repositories/ConsumerGroupRepository.java index 425e9d10d..bfefb2343 100644 --- a/src/main/java/org/kafkahq/repositories/ConsumerGroupRepository.java +++ b/src/main/java/org/kafkahq/repositories/ConsumerGroupRepository.java @@ -1,30 +1,30 @@ package org.kafkahq.repositories; -import com.google.inject.Binder; -import com.google.inject.Inject; -import com.google.inject.Singleton; -import com.typesafe.config.Config; import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.clients.admin.ConsumerGroupListing; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; -import org.jooby.Env; -import org.jooby.Jooby; import org.kafkahq.models.ConsumerGroup; import org.kafkahq.models.Partition; import org.kafkahq.modules.KafkaModule; +import javax.inject.Inject; +import javax.inject.Singleton; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @Singleton -public class ConsumerGroupRepository extends AbstractRepository implements Jooby.Module { - @Inject +public class ConsumerGroupRepository extends AbstractRepository { private KafkaModule kafkaModule; + @Inject + public ConsumerGroupRepository(KafkaModule kafkaModule) { + this.kafkaModule = kafkaModule; + } + public List list(Optional search) throws ExecutionException, InterruptedException { ArrayList list = new ArrayList<>(); @@ -103,10 +103,4 @@ public void updateOffsets(String clusterId, String name, Map list() throws ExecutionException, InterruptedException { ArrayList list = new ArrayList<>(); @@ -42,10 +38,4 @@ public List findByBroker(Integer brokerId) throws ExecutionException, In .filter(item -> item.getBrokerId().equals(brokerId)) .collect(Collectors.toList()); } - - @SuppressWarnings("NullableProblems") - @Override - public void configure(Env env, Config conf, Binder binder) { - binder.bind(LogDirRepository.class).toInstance(new LogDirRepository()); - } } diff --git a/src/main/java/org/kafkahq/repositories/RecordRepository.java b/src/main/java/org/kafkahq/repositories/RecordRepository.java index a9efb753b..c7b6610e7 100644 --- a/src/main/java/org/kafkahq/repositories/RecordRepository.java +++ b/src/main/java/org/kafkahq/repositories/RecordRepository.java @@ -3,27 +3,22 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableMap; -import com.google.inject.Binder; -import com.google.inject.Inject; -import 
com.google.inject.Singleton; -import com.typesafe.config.Config; import lombok.*; import lombok.experimental.Wither; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.internals.RecordHeader; import org.codehaus.httpcache4j.uri.URIBuilder; -import org.jooby.Env; -import org.jooby.Jooby; import org.kafkahq.models.Partition; import org.kafkahq.models.Record; import org.kafkahq.models.Topic; import org.kafkahq.modules.KafkaModule; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import javax.inject.Inject; +import javax.inject.Singleton; import java.io.Closeable; import java.io.IOException; import java.util.*; @@ -33,9 +28,8 @@ import java.util.stream.Stream; @Singleton -public class RecordRepository extends AbstractRepository implements Jooby.Module { - private static Logger logger = LoggerFactory.getLogger(RecordRepository.class); - +@Slf4j +public class RecordRepository extends AbstractRepository { private final KafkaModule kafkaModule; private final TopicRepository topicRepository; private final SchemaRegistryRepository schemaRegistryRepository; @@ -69,7 +63,7 @@ private List consumeOldest(Topic topic, Options options) { partitions.forEach(consumer::seek); partitions.forEach((topicPartition, first) -> - logger.trace( + log.trace( "Consume [topic: {}] [partition: {}] [start: {}]", topicPartition.topic(), topicPartition.partition(), @@ -335,7 +329,6 @@ public void delete(String clusterId, String topic, Integer partition, byte[] key )).get(); } - public SearchEnd search(Options options, SearchConsumer callback) throws ExecutionException, InterruptedException { KafkaConsumer consumer = this.kafkaModule.getConsumer(options.clusterId); Topic topic = topicRepository.findByName(options.topic); @@ -350,7 +343,7 @@ public SearchEnd search(Options options, SearchConsumer callback) throws Executi consumer.assign(partitions.keySet()); partitions.forEach(consumer::seek); partitions.forEach((topicPartition, first) -> - logger.trace( + log.trace( "Search [topic: {}] [partition: {}] [start: {}]", topicPartition.topic(), topicPartition.partition(), @@ -382,7 +375,7 @@ public SearchEnd search(Options options, SearchConsumer callback) throws Executi if (searchFilter(options, record)) { list.add(newRecord(record, options)); - logger.trace( + log.trace( "Record [topic: {}] [partition: {}] [offset: {}] [key: {}]", record.topic(), record.partition(), @@ -428,7 +421,7 @@ public abstract static class SearchConsumer implements Closeable, Consumer kafkaAvroDeserializers = new HashMap<>(); @@ -177,10 +173,4 @@ public KafkaAvroDeserializer getKafkaAvroDeserializer(String clusterId) { return this.kafkaAvroDeserializers.get(clusterId); } - - @SuppressWarnings("NullableProblems") - @Override - public void configure(Env env, Config conf, Binder binder) { - binder.bind(SchemaRegistryRepository.class); - } } diff --git a/src/main/java/org/kafkahq/repositories/TopicRepository.java b/src/main/java/org/kafkahq/repositories/TopicRepository.java index f5c301018..563def0a9 100644 --- a/src/main/java/org/kafkahq/repositories/TopicRepository.java +++ b/src/main/java/org/kafkahq/repositories/TopicRepository.java @@ -1,29 +1,22 @@ package org.kafkahq.repositories; -import com.google.inject.Binder; -import com.google.inject.Inject; -import com.google.inject.Singleton; -import 
com.typesafe.config.Config; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.admin.TopicListing; -import org.jooby.Env; -import org.jooby.Jooby; import org.kafkahq.models.Partition; import org.kafkahq.models.Topic; import org.kafkahq.modules.KafkaModule; +import javax.inject.Inject; +import javax.inject.Singleton; import java.util.*; import java.util.concurrent.ExecutionException; @Singleton -public class TopicRepository extends AbstractRepository implements Jooby.Module { +public class TopicRepository extends AbstractRepository { private KafkaModule kafkaModule; - private ConsumerGroupRepository consumerGroupRepository; - private LogDirRepository logDirRepository; - private ConfigRepository configRepository; @Inject @@ -93,10 +86,4 @@ public void delete(String clusterId, String name) throws ExecutionException, Int .all() .get(); } - - @SuppressWarnings("NullableProblems") - @Override - public void configure(Env env, Config conf, Binder binder) { - binder.bind(TopicRepository.class).asEagerSingleton(); - } } diff --git a/src/main/java/org/kafkahq/utils/Debug.java b/src/main/java/org/kafkahq/utils/Debug.java index 1fc2cc974..b523a9e87 100644 --- a/src/main/java/org/kafkahq/utils/Debug.java +++ b/src/main/java/org/kafkahq/utils/Debug.java @@ -6,7 +6,7 @@ import org.slf4j.LoggerFactory; public class Debug { - private static final Logger logger = LoggerFactory.getLogger(Thread.currentThread().getStackTrace()[2].getClassName()); + private static final Logger log = LoggerFactory.getLogger(Thread.currentThread().getStackTrace()[2].getClassName()); private static final String name = Thread.currentThread().getStackTrace()[2].getClassName(); private static String caller() { @@ -35,7 +35,7 @@ public static void time(String message, Runnable runnable, Object... argumen runnable.run(); - logger.trace("[" + (System.currentTimeMillis() - start ) + " ms] " + message, arguments); + log.trace("[" + (System.currentTimeMillis() - start ) + " ms] " + message, arguments); } @SafeVarargs @@ -49,10 +49,10 @@ public static void print(T... args) { @SafeVarargs public static void log(T... 
args) { - logger.trace("\033[44;30m " + caller() + " \033[0m"); + log.trace("\033[44;30m " + caller() + " \033[0m"); for (Object arg : args) { - logger.trace("\033[46;30m " + arg.getClass().getName() + " \033[0m " + toJson(arg)); + log.trace("\033[46;30m " + arg.getClass().getName() + " \033[0m " + toJson(arg)); } } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 000000000..4f56f4041 --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,26 @@ +micronaut: + application: + name: kafkahq + io: + watch: + paths: src/main + restart: true + context: + path: "" + router: + static-resources: + swagger: + paths: classpath:static + mapping: "${micronaut.context.path:}/static/**" + +jackson: + module-scan: false + +kafka: + defaults: + consumer: + properties: + max.poll.records: 50 + isolation.level: read_committed + group.id: KafkaHQ + enable.auto.commit: "false" diff --git a/conf/logback.xml b/src/main/resources/logback.xml similarity index 52% rename from conf/logback.xml rename to src/main/resources/logback.xml index 67f496549..a1271b0d3 100644 --- a/conf/logback.xml +++ b/src/main/resources/logback.xml @@ -1,5 +1,10 @@ - + + + + + true + System.out true @@ -12,7 +17,7 @@ DENY - %d{ISO8601} %highlight(%5.5level) %magenta(%-10.10thread) %cyan(%-26.26logger{26}) %msg%n + ${pattern} @@ -23,28 +28,14 @@ WARN - %d{ISO8601} %highlight(%5.5level) %magenta(%-10.10thread) %cyan(%-26.26logger{26}) %msg%n + ${pattern} - + - - - - - - - - - - - - - - - - + + \ No newline at end of file diff --git a/public/blocks/configs.ftl b/src/main/resources/views/blocks/configs.ftl similarity index 100% rename from public/blocks/configs.ftl rename to src/main/resources/views/blocks/configs.ftl diff --git a/public/blocks/group/members.ftl b/src/main/resources/views/blocks/group/members.ftl similarity index 100% rename from public/blocks/group/members.ftl rename to src/main/resources/views/blocks/group/members.ftl diff --git a/public/blocks/group/topic.ftl b/src/main/resources/views/blocks/group/topic.ftl similarity index 100% rename from public/blocks/group/topic.ftl rename to src/main/resources/views/blocks/group/topic.ftl diff --git a/public/blocks/navbar-search.ftl b/src/main/resources/views/blocks/navbar-search.ftl similarity index 100% rename from public/blocks/navbar-search.ftl rename to src/main/resources/views/blocks/navbar-search.ftl diff --git a/public/blocks/topic/data.ftl b/src/main/resources/views/blocks/topic/data.ftl similarity index 98% rename from public/blocks/topic/data.ftl rename to src/main/resources/views/blocks/topic/data.ftl index 354f97094..f36748bbc 100644 --- a/public/blocks/topic/data.ftl +++ b/src/main/resources/views/blocks/topic/data.ftl @@ -1,3 +1,4 @@ + <#-- @ftlvariable name="datas" type="java.util.List>" --> <#-- @ftlvariable name="navbar" type="java.util.Map>" --> <#-- @ftlvariable name="basePath" type="java.lang.String" --> @@ -16,7 +17,7 @@ \ No newline at end of file diff --git a/public/blocks/topic/dataBody.ftl b/src/main/resources/views/blocks/topic/dataBody.ftl similarity index 100% rename from public/blocks/topic/dataBody.ftl rename to src/main/resources/views/blocks/topic/dataBody.ftl diff --git a/public/blocks/topic/pagination.ftl b/src/main/resources/views/blocks/topic/pagination.ftl similarity index 97% rename from public/blocks/topic/pagination.ftl rename to src/main/resources/views/blocks/topic/pagination.ftl index 1c48d44c4..a42643501 100644 --- a/public/blocks/topic/pagination.ftl +++ 
b/src/main/resources/views/blocks/topic/pagination.ftl @@ -1,3 +1,5 @@ +<#ftl output_format="HTML"> + <#-- @ftlvariable name="pagination" type="java.util.Map" --> <#-- @ftlvariable name="basePath" type="java.lang.String" --> diff --git a/public/blocks/topic/partitions.ftl b/src/main/resources/views/blocks/topic/partitions.ftl similarity index 93% rename from public/blocks/topic/partitions.ftl rename to src/main/resources/views/blocks/topic/partitions.ftl index 7b7df2906..67798afe5 100644 --- a/public/blocks/topic/partitions.ftl +++ b/src/main/resources/views/blocks/topic/partitions.ftl @@ -2,8 +2,8 @@ <#-- @ftlvariable name="topic" type="org.kafkahq.models.Topic" --> <#-- @ftlvariable name="basePath" type="java.lang.String" --> -<#import "/includes/node.ftl" as nodeTemplate> -<#import "/includes/functions.ftl" as functions> +<#import "../../includes/node.ftl" as nodeTemplate> +<#import "../../includes/functions.ftl" as functions> diff --git a/public/group.ftl b/src/main/resources/views/group.ftl similarity index 89% rename from public/group.ftl rename to src/main/resources/views/group.ftl index 8a926b166..ec1eb6b00 100644 --- a/public/group.ftl +++ b/src/main/resources/views/group.ftl @@ -3,7 +3,7 @@ <#-- @ftlvariable name="tab" type="java.lang.String" --> <#-- @ftlvariable name="basePath" type="java.lang.String" --> -<#import "/includes/template.ftl" as template> +<#import "includes/template.ftl" as template> <@template.header "Consumer Group: " + group.getId(), "group" /> @@ -24,13 +24,13 @@
<#if tab == "topics">
- <#include "/blocks/group/topic.ftl" />
+ <#include "blocks/group/topic.ftl" />
<#if tab == "members">
- <#include "/blocks/group/members.ftl" />
+ <#include "blocks/group/members.ftl" />
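
Aside: the consumer-group pages above are ultimately backed by plain AdminClient
calls (the patch's ConsumerGroupRepository reaches them through KafkaModule). A
minimal, self-contained sketch of the listing call; this is not taken from the
patch, and the broker address is illustrative:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupListing;

import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class GroupListExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative address

        try (AdminClient admin = AdminClient.create(props)) {
            // the kind of call ConsumerGroupRepository.list() builds on
            Collection<ConsumerGroupListing> groups = admin.listConsumerGroups().all().get();
            groups.forEach(group -> System.out.println(group.groupId()));
        }
    }
}
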
diff --git a/public/groupList.ftl b/src/main/resources/views/groupList.ftl similarity index 59% rename from public/groupList.ftl rename to src/main/resources/views/groupList.ftl index 277e8605d..fd21783ba 100644 --- a/public/groupList.ftl +++ b/src/main/resources/views/groupList.ftl @@ -1,11 +1,11 @@ <#-- @ftlvariable name="groups" type="java.util.ArrayList" --> -<#import "/includes/template.ftl" as template> -<#import "/includes/group.ftl" as groupTemplate> +<#import "includes/template.ftl" as template> +<#import "includes/group.ftl" as groupTemplate> <@template.header "Consumer Groups", "group" /> -<#include "/blocks/navbar-search.ftl" /> +<#include "blocks/navbar-search.ftl" /> <@groupTemplate.table groups /> diff --git a/public/groupUpdate.ftl b/src/main/resources/views/groupUpdate.ftl similarity index 96% rename from public/groupUpdate.ftl rename to src/main/resources/views/groupUpdate.ftl index c476e366a..d44a2a5b2 100644 --- a/public/groupUpdate.ftl +++ b/src/main/resources/views/groupUpdate.ftl @@ -2,8 +2,8 @@ <#-- @ftlvariable name="group" type="org.kafkahq.models.ConsumerGroup" --> <#-- @ftlvariable name="basePath" type="java.lang.String" --> -<#import "/includes/template.ftl" as template> -<#import "/includes/functions.ftl" as functions> +<#import "includes/template.ftl" as template> +<#import "includes/functions.ftl" as functions> <@template.header "Update offsets: " + group.getId(), "group" /> diff --git a/public/includes/functions.ftl b/src/main/resources/views/includes/functions.ftl similarity index 100% rename from public/includes/functions.ftl rename to src/main/resources/views/includes/functions.ftl diff --git a/public/includes/group.ftl b/src/main/resources/views/includes/group.ftl similarity index 100% rename from public/includes/group.ftl rename to src/main/resources/views/includes/group.ftl diff --git a/public/includes/log.ftl b/src/main/resources/views/includes/log.ftl similarity index 96% rename from public/includes/log.ftl rename to src/main/resources/views/includes/log.ftl index 7536e5fb3..cea7c33cc 100644 --- a/public/includes/log.ftl +++ b/src/main/resources/views/includes/log.ftl @@ -2,7 +2,7 @@ <#-- @ftlvariable name="basePath" type="java.lang.String" --> <#import "node.ftl" as nodeTemplate> -<#import "/includes/functions.ftl" as functions> +<#import "functions.ftl" as functions> <#macro table logs> <#-- @ftlvariable name="logs" type="java.util.List" --> diff --git a/public/includes/node.ftl b/src/main/resources/views/includes/node.ftl similarity index 100% rename from public/includes/node.ftl rename to src/main/resources/views/includes/node.ftl diff --git a/public/includes/schema.ftl b/src/main/resources/views/includes/schema.ftl similarity index 100% rename from public/includes/schema.ftl rename to src/main/resources/views/includes/schema.ftl diff --git a/public/includes/template.ftl b/src/main/resources/views/includes/template.ftl similarity index 98% rename from public/includes/template.ftl rename to src/main/resources/views/includes/template.ftl index 1d2c8db54..2f702fc4d 100644 --- a/public/includes/template.ftl +++ b/src/main/resources/views/includes/template.ftl @@ -1,3 +1,5 @@ +<#ftl output_format="HTML"> + <#-- @ftlvariable name="tab" type="java.lang.String" --> <#-- @ftlvariable name="clusters" type="java.util.List" --> <#-- @ftlvariable name="clusterId" type="java.lang.String" --> @@ -9,15 +11,13 @@ + ${title} | KafkaHQ - <#if liveReload?? 
> - ${liveReload?no_esc} - diff --git a/public/node.ftl b/src/main/resources/views/node.ftl similarity index 86% rename from public/node.ftl rename to src/main/resources/views/node.ftl index ac037c8b8..bd0809ef0 100644 --- a/public/node.ftl +++ b/src/main/resources/views/node.ftl @@ -1,3 +1,5 @@ +<#ftl output_format="HTML"> + <#-- @ftlvariable name="clusterId" type="java.lang.String" --> <#-- @ftlvariable name="node" type="org.kafkahq.models.Node" --> <#-- @ftlvariable name="configs" type="java.util.ArrayList" --> @@ -5,9 +7,9 @@ <#-- @ftlvariable name="tab" type="java.lang.String" --> <#-- @ftlvariable name="basePath" type="java.lang.String" --> -<#import "/includes/template.ftl" as template> -<#import "/includes/group.ftl" as groupTemplate> -<#import "/includes/log.ftl" as logTemplate> +<#import "includes/template.ftl" as template> +<#import "includes/group.ftl" as groupTemplate> +<#import "includes/log.ftl" as logTemplate> <@template.header "Node: " + node.getId()?c, "node" /> @@ -28,7 +30,7 @@
<#if tab == "configs">
- <#include "/blocks/configs.ftl" />
+ <#include "blocks/configs.ftl" />
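
Aside: the node "configs" tab above lists broker-level entries, the same kind of
data AdminClient's describeConfigs returns for a BROKER ConfigResource. A
minimal sketch, not from the patch; the broker address and broker id are
illustrative:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class BrokerConfigExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative address

        try (AdminClient admin = AdminClient.create(props)) {
            // broker configs are addressed by broker id, here "0" as an example
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");

            Config config = admin
                .describeConfigs(Collections.singleton(broker))
                .all().get()
                .get(broker);

            config.entries().forEach(entry -> System.out.println(entry.name() + " = " + entry.value()));
        }
    }
}
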
diff --git a/public/nodeList.ftl b/src/main/resources/views/nodeList.ftl similarity index 94% rename from public/nodeList.ftl rename to src/main/resources/views/nodeList.ftl index d0959067a..386ce97f8 100644 --- a/public/nodeList.ftl +++ b/src/main/resources/views/nodeList.ftl @@ -2,8 +2,8 @@ <#-- @ftlvariable name="cluster" type="org.kafkahq.models.Cluster" --> <#-- @ftlvariable name="basePath" type="java.lang.String" --> -<#import "/includes/template.ftl" as template> -<#import "/includes/node.ftl" as nodeTemplate> +<#import "includes/template.ftl" as template> +<#import "includes/node.ftl" as nodeTemplate> <@template.header "Nodes", "node" /> diff --git a/public/schema.ftl b/src/main/resources/views/schema.ftl similarity index 93% rename from public/schema.ftl rename to src/main/resources/views/schema.ftl index f8a7255f4..92ff0d7a1 100644 --- a/public/schema.ftl +++ b/src/main/resources/views/schema.ftl @@ -5,8 +5,8 @@ <#-- @ftlvariable name="config" type="org.kafkahq.models.Schema.Config" --> <#-- @ftlvariable name="versions" type="java.util.List" --> -<#import "/includes/template.ftl" as template> -<#import "/includes/schema.ftl" as schemaTemplate> +<#import "includes/template.ftl" as template> +<#import "includes/schema.ftl" as schemaTemplate> <@template.header "Schema: " + schema.getSubject(), "schema" /> diff --git a/public/schemaCreate.ftl b/src/main/resources/views/schemaCreate.ftl similarity index 75% rename from public/schemaCreate.ftl rename to src/main/resources/views/schemaCreate.ftl index 40e2c958e..05eed0517 100644 --- a/public/schemaCreate.ftl +++ b/src/main/resources/views/schemaCreate.ftl @@ -2,8 +2,8 @@ <#-- @ftlvariable name="basePath" type="java.lang.String" --> <#-- @ftlvariable name="config" type="org.kafkahq.models.Schema.Config" --> -<#import "/includes/template.ftl" as template> -<#import "/includes/schema.ftl" as schemaTemplate> +<#import "includes/template.ftl" as template> +<#import "includes/schema.ftl" as schemaTemplate> <@template.header "Create a schema", "schema" /> diff --git a/public/schemaList.ftl b/src/main/resources/views/schemaList.ftl similarity index 82% rename from public/schemaList.ftl rename to src/main/resources/views/schemaList.ftl index 27507ef18..c23a49f43 100644 --- a/public/schemaList.ftl +++ b/src/main/resources/views/schemaList.ftl @@ -2,8 +2,8 @@ <#-- @ftlvariable name="schemas" type="java.util.List" --> <#-- @ftlvariable name="basePath" type="java.lang.String" --> -<#import "/includes/template.ftl" as template> -<#import "/includes/schema.ftl" as schemaTemplate> +<#import "includes/template.ftl" as template> +<#import "includes/schema.ftl" as schemaTemplate> <@template.header "Schema Registry", "schema" /> diff --git a/public/topic.ftl b/src/main/resources/views/topic.ftl similarity index 89% rename from public/topic.ftl rename to src/main/resources/views/topic.ftl index 1bd133e4d..5b76ab7b5 100644 --- a/public/topic.ftl +++ b/src/main/resources/views/topic.ftl @@ -4,9 +4,9 @@ <#-- @ftlvariable name="tab" type="java.lang.String" --> <#-- @ftlvariable name="basePath" type="java.lang.String" --> -<#import "/includes/template.ftl" as template> -<#import "/includes/group.ftl" as groupTemplate> -<#import "/includes/log.ftl" as logTemplate> +<#import "includes/template.ftl" as template> +<#import "includes/group.ftl" as groupTemplate> +<#import "includes/log.ftl" as logTemplate> <@template.header "Topic: " + topic.getName(), "topic" /> @@ -42,13 +42,13 @@
<#if tab == "data">
- <#include "/blocks/topic/data.ftl" />
+ <#include "blocks/topic/data.ftl" />
<#if tab == "partitions">
- <#include "/blocks/topic/partitions.ftl" />
+ <#include "blocks/topic/partitions.ftl" />
@@ -60,7 +60,7 @@ <#if tab == "configs">
- <#include "/blocks/configs.ftl" />
+ <#include "blocks/configs.ftl" />
diff --git a/public/topicCreate.ftl b/src/main/resources/views/topicCreate.ftl similarity index 96% rename from public/topicCreate.ftl rename to src/main/resources/views/topicCreate.ftl index b039bf316..7403435fa 100644 --- a/public/topicCreate.ftl +++ b/src/main/resources/views/topicCreate.ftl @@ -1,5 +1,5 @@ -<#import "/includes/template.ftl" as template> -<#import "/includes/functions.ftl" as functions> +<#import "includes/template.ftl" as template> +<#import "includes/functions.ftl" as functions> <@template.header "Create a topic", "topic" /> diff --git a/public/topicList.ftl b/src/main/resources/views/topicList.ftl similarity index 96% rename from public/topicList.ftl rename to src/main/resources/views/topicList.ftl index 6489bd184..0327e6b3a 100644 --- a/public/topicList.ftl +++ b/src/main/resources/views/topicList.ftl @@ -2,12 +2,12 @@ <#-- @ftlvariable name="topics" type="java.util.ArrayList" --> <#-- @ftlvariable name="basePath" type="java.lang.String" --> -<#import "/includes/template.ftl" as template> -<#import "/includes/functions.ftl" as functions> +<#import "includes/template.ftl" as template> +<#import "includes/functions.ftl" as functions> <@template.header "Topics", "topic" /> -<#include "/blocks/navbar-search.ftl" /> +<#include "blocks/navbar-search.ftl" />
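
Aside: the template moves above work because micronaut-views resolves FreeMarker
templates against the views/ folder on the classpath, which is also why the
absolute /includes/... imports become relative ones. A minimal sketch of the
rendering pattern the patched controllers follow; the controller name, route,
and model below are illustrative, not part of the patch:

import io.micronaut.http.HttpResponse;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.views.View;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

@Controller("/example")
public class ExampleViewController {

    @View("topicList") // renders src/main/resources/views/topicList.ftl
    @Get("/topics")
    public HttpResponse<Map<String, Object>> list() {
        // every entry of the response body becomes a FreeMarker variable
        Map<String, Object> model = new HashMap<>();
        model.put("topics", Collections.emptyList());

        return HttpResponse.ok(model);
    }
}
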
diff --git a/public/topicProduce.ftl b/src/main/resources/views/topicProduce.ftl similarity index 91% rename from public/topicProduce.ftl rename to src/main/resources/views/topicProduce.ftl index 74062e3d4..0c29eeafe 100644 --- a/public/topicProduce.ftl +++ b/src/main/resources/views/topicProduce.ftl @@ -1,7 +1,7 @@ <#-- @ftlvariable name="topic" type="org.kafkahq.models.Topic" --> -<#import "/includes/template.ftl" as template> -<#import "/includes/functions.ftl" as functions> +<#import "includes/template.ftl" as template> +<#import "includes/functions.ftl" as functions> <@template.header "Produce to " + topic.getName(), "topic" /> @@ -27,8 +27,8 @@
- - + +
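
Aside: topicProduce.ftl above renders the form behind the app's produce action,
and the patch's KafkaModule hands out a KafkaProducer configured with a
StringSerializer for it. A minimal, stand-alone sketch of such a send; not from
the patch, and the topic name, address, and payload are illustrative:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ProduceExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // key and value stand in for the form's two fields
            RecordMetadata metadata = producer
                .send(new ProducerRecord<>("my-topic", "form-key", "form-value"))
                .get();

            System.out.printf("partition=%d offset=%d%n", metadata.partition(), metadata.offset());
        }
    }
}
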
diff --git a/public/topicSearch.ftl b/src/main/resources/views/topicSearch.ftl similarity index 89% rename from public/topicSearch.ftl rename to src/main/resources/views/topicSearch.ftl index a2d4638ef..dc21d4cf8 100644 --- a/public/topicSearch.ftl +++ b/src/main/resources/views/topicSearch.ftl @@ -4,4 +4,4 @@ <#-- @ftlvariable name="tab" type="java.lang.String" --> <#-- @ftlvariable name="basePath" type="java.lang.String" --> -<#include "/blocks/topic/dataBody.ftl" /> +<#include "blocks/topic/dataBody.ftl" /> diff --git a/src/test/java/org/kafkahq/BaseTest.java b/src/test/java/org/kafkahq/BaseTest.java index 6a70e26b7..62e1f2b09 100644 --- a/src/test/java/org/kafkahq/BaseTest.java +++ b/src/test/java/org/kafkahq/BaseTest.java @@ -2,20 +2,16 @@ import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigValueFactory; +import lombok.extern.slf4j.Slf4j; import org.apache.curator.test.InstanceSpec; import org.junit.AfterClass; import org.junit.BeforeClass; import org.kafkahq.modules.KafkaModule; import org.kafkahq.modules.KafkaWrapper; import org.kafkahq.repositories.AbstractRepository; -import org.kafkahq.repositories.RecordRepository; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; +@Slf4j public class BaseTest { - protected static Logger logger = LoggerFactory.getLogger(RecordRepository.class); protected static App app; private static KafkaTestCluster cluster; @@ -25,7 +21,7 @@ public static void setup() throws Exception { KafkaTestCluster.ConnectionString connectionString = KafkaTestCluster.readClusterInfo(); if (connectionString != null) { - logger.info("Kafka server reused on {}", connectionString.getKafka()); + log.info("Kafka server reused on {}", connectionString.getKafka()); } else { cluster = new KafkaTestCluster(false); cluster.run(); diff --git a/src/test/java/org/kafkahq/KafkaTestCluster.java b/src/test/java/org/kafkahq/KafkaTestCluster.java index 8c855439b..fdea8f8fc 100644 --- a/src/test/java/org/kafkahq/KafkaTestCluster.java +++ b/src/test/java/org/kafkahq/KafkaTestCluster.java @@ -11,6 +11,7 @@ import com.yammer.metrics.core.Stoppable; import lombok.Builder; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.producer.KafkaProducer; @@ -19,9 +20,6 @@ import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.kafkahq.clusters.EmbeddedSingleNodeKafkaCluster; -import org.kafkahq.repositories.RecordRepository; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.slf4j.bridge.SLF4JBridgeHandler; import java.io.IOException; @@ -31,6 +29,7 @@ import java.util.*; import java.util.concurrent.ExecutionException; +@Slf4j public class KafkaTestCluster implements Runnable, Stoppable { private static final Path CS_PATH = Paths.get(System.getProperty("java.io.tmpdir"), "/kafkahq-cs.json"); @@ -44,8 +43,6 @@ public class KafkaTestCluster implements Runnable, Stoppable { public static final String TOPIC_STREAM_MAP = "stream-map"; public static final String TOPIC_STREAM_COUNT = "stream-count"; - private static Logger logger = LoggerFactory.getLogger(RecordRepository.class); - private EmbeddedSingleNodeKafkaCluster kafkaCluster; private KafkaTestUtils testUtils; private boolean reuse; @@ -124,8 +121,8 @@ public void run() { try { kafkaCluster.start(); - logger.info("Kafka Server started on {}", 
kafkaCluster.bootstrapServers()); - logger.info("Kafka Schema registry started on {}", kafkaCluster.schemaRegistryUrl()); + log.info("Kafka Server started on {}", kafkaCluster.bootstrapServers()); + log.info("Kafka Schema registry started on {}", kafkaCluster.schemaRegistryUrl()); connectionString = ConnectionString.builder() .kafka(kafkaCluster.bootstrapServers()) @@ -140,17 +137,17 @@ public void run() { } injectTestData(); - logger.info("Test data injected"); + log.info("Test data injected"); Thread.sleep(5000); - logger.info("Test data injected sleep done"); + log.info("Test data injected sleep done"); if (reuse) { Runtime.getRuntime().addShutdownHook(new Thread(() -> { try { Files.delete(CS_PATH); } catch (Exception e) { - logger.error("Can't delete CS file", e); + log.error("Can't delete CS file", e); } })); } @@ -209,7 +206,7 @@ private void injectTestData() throws InterruptedException, ExecutionException { TOPIC_STREAM_IN, 1 ); - logger.debug("Stream started"); + log.debug("Stream started"); // compacted topic testUtils.createTopic(TOPIC_COMPACTED, 3, (short) 1); @@ -233,25 +230,25 @@ private void injectTestData() throws InterruptedException, ExecutionException { Thread.sleep(10L); testUtils.produceRecords(randomDatas(50, 0), TOPIC_COMPACTED, partition); } - logger.debug("Compacted topic created"); + log.debug("Compacted topic created"); // empty topic testUtils.createTopic(TOPIC_EMPTY, 12, (short) 1); - logger.debug("Empty topic created"); + log.debug("Empty topic created"); // random data testUtils.createTopic(TOPIC_RANDOM, 3, (short) 1); for (int partition = 0; partition < 3; partition++) { testUtils.produceRecords(randomDatas(100, 0), TOPIC_RANDOM, partition); } - logger.debug("Random topic created"); + log.debug("Random topic created"); // huge data testUtils.createTopic(TOPIC_HUGE, 3, (short) 1); for (int partition = 0; partition < 3; partition++) { testUtils.produceRecords(randomDatas(1000, 0), TOPIC_HUGE, partition); } - logger.debug("Huge topic created"); + log.debug("Huge topic created"); } private static Map randomDatas(int size, Integer start) { diff --git a/src/test/java/org/kafkahq/StreamTest.java b/src/test/java/org/kafkahq/StreamTest.java index ccd053d1a..276a551cd 100644 --- a/src/test/java/org/kafkahq/StreamTest.java +++ b/src/test/java/org/kafkahq/StreamTest.java @@ -4,6 +4,7 @@ import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; +import lombok.extern.slf4j.Slf4j; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; @@ -17,18 +18,14 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.*; -import org.kafkahq.repositories.RecordRepository; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; import java.util.Collections; import java.util.Properties; +@Slf4j public class StreamTest implements Runnable, Stoppable { - private static Logger logger = LoggerFactory.getLogger(RecordRepository.class); - private KafkaStreams streams; private String bootstrapServers; private String registryUrl; @@ -62,12 +59,12 @@ public void run() { private void start(KafkaStreams streams) { streams.setUncaughtExceptionHandler((thread, ex) -> { - logger.error("Uncaught exception in " + thread.getName() + ", closing Kafka Streams !", ex); 
+ log.error("Uncaught exception in " + thread.getName() + ", closing Kafka Streams !", ex); System.exit(1); }); streams.setStateListener((newState, oldState) -> - logger.debug("Switching from {} to {} state", oldState, newState) + log.debug("Switching from {} to {} state", oldState, newState) ); streams.start(); diff --git a/src/test/java/org/kafkahq/repositories/RecordRepositoryTest.java b/src/test/java/org/kafkahq/repositories/RecordRepositoryTest.java index 51ace1cbb..676852288 100644 --- a/src/test/java/org/kafkahq/repositories/RecordRepositoryTest.java +++ b/src/test/java/org/kafkahq/repositories/RecordRepositoryTest.java @@ -1,5 +1,6 @@ package org.kafkahq.repositories; +import lombok.extern.slf4j.Slf4j; import org.codehaus.httpcache4j.uri.URIBuilder; import org.junit.Test; import org.kafkahq.BaseTest; @@ -14,6 +15,7 @@ import static org.junit.Assert.assertEquals; +@Slf4j public class RecordRepositoryTest extends BaseTest { private final RecordRepository repository = app.require(RecordRepository.class); @@ -122,7 +124,7 @@ private List consumeAllRecord(RecordRepository.Options options) throws E List datas = repository.consume(options); all.addAll(datas); - datas.forEach(record -> logger.debug( + datas.forEach(record -> log.debug( "Records [Topic: {}] [Partition: {}] [Offset: {}] [Key: {}] [Value: {}]", record.getTopic(), record.getPartition(), @@ -130,7 +132,7 @@ private List consumeAllRecord(RecordRepository.Options options) throws E record.getKey(), record.getValue() )); - logger.info("Consume {} records", datas.size()); + log.info("Consume {} records", datas.size()); URIBuilder after = options.after(datas, URIBuilder.empty()); diff --git a/conf/logback-test.xml b/src/test/resources/logback.xml similarity index 75% rename from conf/logback-test.xml rename to src/test/resources/logback.xml index cd1d7b7be..fa8882f9c 100644 --- a/conf/logback-test.xml +++ b/src/test/resources/logback.xml @@ -1,10 +1,9 @@ - + + - - - true - + + true System.out @@ -18,7 +17,7 @@ DENY - %d{ISO8601} %highlight(%5.5level) %magenta(%-10.10thread) %cyan(%-26.26logger{26}) %msg%n + ${pattern} @@ -29,7 +28,7 @@ WARN - %d{ISO8601} %highlight(%5.5level) %magenta(%-10.10thread) %cyan(%-26.26logger{26}) %msg%n + ${pattern} @@ -38,8 +37,6 @@ - - @@ -48,5 +45,4 @@ - diff --git a/webpack.config.js b/webpack.config.js index 33f37fc67..e965b3143 100644 --- a/webpack.config.js +++ b/webpack.config.js @@ -8,7 +8,7 @@ const ExtractTextPlugin = require('extract-text-webpack-plugin'); * Vars **********************************************************************************************************************/ const srcDirectory = path.join(__dirname, 'assets'); -const dstDirectory = path.join(__dirname, 'public/static'); +const dstDirectory = path.join(__dirname, 'src/main/resources/static'); /**********************************************************************************************************************\ * Base @@ -41,7 +41,6 @@ module.exports = (env, argv) => { assets: [ path.join(srcDirectory, 'css/assets.scss'), ], - }, output: { path: dstDirectory,