diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..eac51fe0c --- /dev/null +++ b/.gitignore @@ -0,0 +1,36 @@ +### Java +*.class +*.log +*.jar +*.war +*.nar +*.ear +*.zip +*.tar.gz +*.rar +hs_err_pid* + +### Gradle +.gradle/* +build/* +!gradle-wrapper.jar +.gradletasknamecache + +### Assets +public/vendor + +### Ide ### +.idea/* +.vscode/* +.project/* +.settings/* +out/* + +### Log ### +.*.log +.*.log.[0-9] +*.log.[0-9] +logs/* + +### Kafka HQ ### +conf/*.dev.conf \ No newline at end of file diff --git a/.yarnrc b/.yarnrc new file mode 100644 index 000000000..0842dd9c5 --- /dev/null +++ b/.yarnrc @@ -0,0 +1 @@ +--modules-folder public/vendor \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 000000000..8bf7faf07 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,13 @@ +FROM openjdk:8-jdk as builder +COPY . /app +WORKDIR /app +RUN echo -e 'http://dl-cdn.alpinelinux.org/alpine/edge/main\nhttp://dl-cdn.alpinelinux.org/alpine/edge/community\nhttp://dl-cdn.alpinelinux.org/alpine/edge/testing' > /etc/apk/repositories && \ + apk add --no-cache yarn && \ + yarn install && \ + ./gradlew jar + + +FROM openjdk:8-jre +WORKDIR /app +COPY --from=builder /app/build/libs/kafkahq-*.jar /app/kafkahq.jar +CMD ["/usr/bin/java", "-jar", "/app/kafkahq.jar", "prod"] \ No newline at end of file diff --git a/build.gradle b/build.gradle new file mode 100644 index 000000000..1a8fad7bc --- /dev/null +++ b/build.gradle @@ -0,0 +1,91 @@ +buildscript { + ext { + joobyVersion = "1.5.1" + } + + repositories { + mavenLocal() + mavenCentral() + } + + dependencies { + classpath "com.google.gradle:osdetector-gradle-plugin:1.4.0" + classpath "io.spring.gradle:dependency-management-plugin:1.0.3.RELEASE" + classpath "org.jooby:jooby-gradle-plugin:$joobyVersion" + } +} + +apply plugin: "io.spring.dependency-management" +apply plugin: "com.google.osdetector" +apply plugin: "application" +apply plugin: "jooby" +apply plugin: "idea" + +group 'org.kafkahq' +version '0.1' +mainClassName = "org.kafkahq.App" +sourceCompatibility = 1.8 + +repositories { + mavenLocal() + mavenCentral() +} + +dependencyManagement { + imports { + mavenBom "org.jooby:jooby-bom:$joobyVersion" + } +} + +idea { + module { + downloadJavadoc = false + downloadSources = true + } +} + +joobyRun { + includes = ['**/*.class', '**/*.conf', '**/*.properties', '**/*.ftl', '**/*.xml'] +} + + +dependencies { + // jooby + compile "org.jooby:jooby-netty" + compile "io.netty:netty-transport-native-epoll:${dependencyManagement.importedProperties['netty.version']}:${osdetector.classifier.contains('linux') ? 'linux-x86_64' : ''}" + compile "io.netty:netty-tcnative-boringssl-static:${dependencyManagement.importedProperties['boringssl.version']}" + compile group: 'org.jooby', name: 'jooby-assets', version: joobyVersion + compile group: 'org.jooby', name: 'jooby-assets-sass', version: joobyVersion + compile group: 'org.jooby', name: 'jooby-livereload', version: joobyVersion + // compile group: 'org.jooby', name: 'jooby-assets-auto-prefixer', version: joobyVersion + // compile group: 'org.jooby', name: 'jooby-assets-clean-css', version: joobyVersion + // compile group: 'org.jooby', name: 'jooby-assets-uglify', version: joobyVersion + compile group: 'org.jooby', name: 'jooby-jackson', version: joobyVersion + compile group: 'org.jooby', name: 'jooby-ftl', version: joobyVersion + compile group: 'org.jooby', name: 'jooby-whoops', version: joobyVersion + + // kafka + compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.0.+' + + // debug + compile group: 'com.google.code.gson', name: 'gson', version: '2.8.+' + compileOnly group: 'org.projectlombok', name: 'lombok', version: '1.18.+' + + // test + testCompile "junit:junit:4.12" + testCompile "io.rest-assured:rest-assured:3.1.0" +} + +sourceSets.main.resources { + srcDirs = ["conf", "public"] +} + +jar { + manifest { + attributes "Main-Class": mainClassName + } + + from { + configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } + } +} \ No newline at end of file diff --git a/conf/application.conf b/conf/application.conf new file mode 100644 index 000000000..d6053fba9 --- /dev/null +++ b/conf/application.conf @@ -0,0 +1,20 @@ +application { + name: Kafka HQ + charset: UTF-8 + port: 8080 + port: ${?APPLICATION_PORT} +} + +sass { + dev { + sourcemap: inline + } + + dist { + style: compressed + } +} + + +freemarker { +} \ No newline at end of file diff --git a/conf/assets.conf b/conf/assets.conf new file mode 100644 index 000000000..d9e059427 --- /dev/null +++ b/conf/assets.conf @@ -0,0 +1,32 @@ +assets { + fileset { + vendor: [ + static/vendor.scss + + vendor/jquery/dist/jquery.js + vendor/popper.js/dist/umd/popper.js + vendor/bootstrap/dist/js/bootstrap.js + vendor/sweetalert2/dist/sweetalert2.all.js + vendor/turbolinks/dist/turbolinks.js + ] + + template: [ + static/template.scss, + static/template.js + ] + } + + pipeline { + dev: [sass] + dist: [sass] + } + + sass { + dev { + sourcemap: inline + } + dist { + style: compressed + } + } +} \ No newline at end of file diff --git a/conf/logback.xml b/conf/logback.xml new file mode 100644 index 000000000..21ec88f64 --- /dev/null +++ b/conf/logback.xml @@ -0,0 +1,39 @@ + + + + System.out + true + + ERROR + DENY + + + WARN + DENY + + + %d{ISO8601} %highlight(%5.5level) %magenta(%-10.10thread) %cyan(%-26.26logger{26}) %msg%n + + + + + System.err + true + + WARN + + + %d{ISO8601} %highlight(%5.5level) %magenta(%-10.10thread) %cyan(%-26.26logger{26}) %msg%n + + + + + + + + + + + + + diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 000000000..0d4a95168 Binary files /dev/null and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 000000000..bd24854fe --- /dev/null +++ b/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,5 @@ +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-4.9-all.zip diff --git a/gradlew b/gradlew new file mode 100755 index 000000000..cccdd3d51 --- /dev/null +++ b/gradlew @@ -0,0 +1,172 @@ +#!/usr/bin/env sh + +############################################################################## +## +## Gradle start up script for UN*X +## +############################################################################## + +# Attempt to set APP_HOME +# Resolve links: $0 may be a link +PRG="$0" +# Need this for relative symlinks. +while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG=`dirname "$PRG"`"/$link" + fi +done +SAVED="`pwd`" +cd "`dirname \"$PRG\"`/" >/dev/null +APP_HOME="`pwd -P`" +cd "$SAVED" >/dev/null + +APP_NAME="Gradle" +APP_BASE_NAME=`basename "$0"` + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS="" + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD="maximum" + +warn () { + echo "$*" +} + +die () { + echo + echo "$*" + echo + exit 1 +} + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "`uname`" in + CYGWIN* ) + cygwin=true + ;; + Darwin* ) + darwin=true + ;; + MINGW* ) + msys=true + ;; + NONSTOP* ) + nonstop=true + ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD="java" + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then + MAX_FD_LIMIT=`ulimit -H -n` + if [ $? -eq 0 ] ; then + if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then + MAX_FD="$MAX_FD_LIMIT" + fi + ulimit -n $MAX_FD + if [ $? -ne 0 ] ; then + warn "Could not set maximum file descriptor limit: $MAX_FD" + fi + else + warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT" + fi +fi + +# For Darwin, add options to specify how the application appears in the dock +if $darwin; then + GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\"" +fi + +# For Cygwin, switch paths to Windows format before running java +if $cygwin ; then + APP_HOME=`cygpath --path --mixed "$APP_HOME"` + CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` + JAVACMD=`cygpath --unix "$JAVACMD"` + + # We build the pattern for arguments to be converted via cygpath + ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` + SEP="" + for dir in $ROOTDIRSRAW ; do + ROOTDIRS="$ROOTDIRS$SEP$dir" + SEP="|" + done + OURCYGPATTERN="(^($ROOTDIRS))" + # Add a user-defined pattern to the cygpath arguments + if [ "$GRADLE_CYGPATTERN" != "" ] ; then + OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)" + fi + # Now convert the arguments - kludge to limit ourselves to /bin/sh + i=0 + for arg in "$@" ; do + CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -` + CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option + + if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition + eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"` + else + eval `echo args$i`="\"$arg\"" + fi + i=$((i+1)) + done + case $i in + (0) set -- ;; + (1) set -- "$args0" ;; + (2) set -- "$args0" "$args1" ;; + (3) set -- "$args0" "$args1" "$args2" ;; + (4) set -- "$args0" "$args1" "$args2" "$args3" ;; + (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; + (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; + (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; + (8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; + (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; + esac +fi + +# Escape application args +save () { + for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done + echo " " +} +APP_ARGS=$(save "$@") + +# Collect all arguments for the java command, following the shell quoting and substitution rules +eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS" + +# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong +if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then + cd "$(dirname "$0")" +fi + +exec "$JAVACMD" "$@" diff --git a/gradlew.bat b/gradlew.bat new file mode 100644 index 000000000..f9553162f --- /dev/null +++ b/gradlew.bat @@ -0,0 +1,84 @@ +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS= + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto init + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto init + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:init +@rem Get command-line arguments, handling Windows variants + +if not "%OS%" == "Windows_NT" goto win9xME_args + +:win9xME_args +@rem Slurp the command line arguments. +set CMD_LINE_ARGS= +set _SKIP=2 + +:win9xME_args_slurp +if "x%~1" == "x" goto execute + +set CMD_LINE_ARGS=%* + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS% + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/package.json b/package.json new file mode 100644 index 000000000..6519dbcec --- /dev/null +++ b/package.json @@ -0,0 +1,28 @@ +{ + "name": "kafkahq", + "version": "1.0.0", + "description": "", + "main": "index.js", + "dependencies": { + "bootstrap": "^4.1.3", + "font-awesome": "^4.7.0", + "jquery": "^3.3.1", + "popper.js": "^1.14.4", + "sweetalert2": "^7.28.0", + "turbolinks": "^5.2.0" + }, + "devDependencies": {}, + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, + "repository": { + "type": "git", + "url": "git+https://github.com/tchiotludo/kafkahq.git" + }, + "author": "", + "license": "ISC", + "bugs": { + "url": "https://github.com/tchiotludo/kafkahq/issues" + }, + "homepage": "https://github.com/tchiotludo/kafkahq#readme" +} diff --git a/public/favicon.ico b/public/favicon.ico new file mode 100644 index 000000000..52ab0a55d Binary files /dev/null and b/public/favicon.ico differ diff --git a/public/group.ftl b/public/group.ftl new file mode 100644 index 000000000..79af7167c --- /dev/null +++ b/public/group.ftl @@ -0,0 +1,133 @@ +<#-- @ftlvariable name="clusterId" type="java.lang.String" --> +<#-- @ftlvariable name="group" type="org.kafkahq.models.ConsumerGroup" --> +<#-- @ftlvariable name="tab" type="java.lang.String" --> + +<#import "/includes/template.ftl" as template> + +<@template.header "Consumer Group: " + group.getId(), "group" /> + + + + +<@template.footer/> + diff --git a/public/groupList.ftl b/public/groupList.ftl new file mode 100644 index 000000000..a7647ca1f --- /dev/null +++ b/public/groupList.ftl @@ -0,0 +1,11 @@ +<#-- @ftlvariable name="groups" type="java.util.ArrayList" --> + +<#import "/includes/template.ftl" as template> +<#import "/includes/group.ftl" as groupTemplate> + +<@template.header "Consumer Groups", "group" /> + +<@groupTemplate.table groups /> + +<@template.footer/> + diff --git a/public/includes/functions.ftl b/public/includes/functions.ftl new file mode 100644 index 000000000..51d6732a3 --- /dev/null +++ b/public/includes/functions.ftl @@ -0,0 +1,8 @@ +<#function filesize num> + <#assign order = num?round?c?length /> + <#assign thousands = ((order - 1) / 3)?floor /> + <#if (thousands < 0)><#assign thousands = 0 /> + <#assign siMap = [ {"factor": 1, "unit": " b"}, {"factor": 1000, "unit": " ko"}, {"factor": 1000000, "unit": " mo"}, {"factor": 1000000000, "unit":" go"}, {"factor": 1000000000000, "unit": " to"} ]/> + <#assign siStr = (num / (siMap[thousands].factor))?string("0.#") + siMap[thousands].unit /> + <#return siStr /> + \ No newline at end of file diff --git a/public/includes/group.ftl b/public/includes/group.ftl new file mode 100644 index 000000000..ad4201dc7 --- /dev/null +++ b/public/includes/group.ftl @@ -0,0 +1,63 @@ +<#-- @ftlvariable name="clusterId" type="java.lang.String" --> + +<#import "node.ftl" as nodeTemplate> + +<#macro table groups> + <#-- @ftlvariable name="groups" type="java.util.List" --> +
+ + + + + + + + + + + + <#if groups?size == 0> + + + + + <#list groups as group> + + + + + + + + + +
IdStateCoordinatorTopics
+ +
${group.getId()}<@state group.getState() /><@nodeTemplate.badge group.getCoordinator()/> + <#list group.getTopics() as topic> + + ${topic} + Lag: ${group.getOffsetLag(topic)} + + + + +
+
+ + +<#macro state state> + <#-- @ftlvariable name="state" type="org.apache.kafka.common.ConsumerGroupState" --> + <#if state.toString() == "Stable"> + <#assign class="success"> + <#elseif state.toString() == "Dead"> + <#assign class="danger"> + <#elseif state.toString() == "Empty"> + <#assign class="warning"> + <#else> + <#assign class="info"> + + ${state.toString()} + \ No newline at end of file diff --git a/public/includes/node.ftl b/public/includes/node.ftl new file mode 100644 index 000000000..5f1e0b443 --- /dev/null +++ b/public/includes/node.ftl @@ -0,0 +1,7 @@ +<#macro badge node> + <#-- @ftlvariable name="node" type="org.kafkahq.models.Node" --> + + + ${node.getId()?c} + + \ No newline at end of file diff --git a/public/includes/template.ftl b/public/includes/template.ftl new file mode 100644 index 000000000..7891fa74a --- /dev/null +++ b/public/includes/template.ftl @@ -0,0 +1,78 @@ +<#-- @ftlvariable name="tab" type="java.lang.String" --> +<#-- @ftlvariable name="clusters" type="java.util.List" --> +<#-- @ftlvariable name="clusterId" type="java.lang.String" --> + +<#macro header title tab> + + + + ${title} | KafkaHQ + + + + ${liveReload?no_esc} + ${vendor_styles?no_esc} + ${template_styles?no_esc} + ${vendor_scripts?no_esc} + ${template_scripts?no_esc} + <#nested> + + +
+ + + +
+
+

+ + + ${title} +

+
+ +
+ + +<#macro footer> +
+
+
+ + <#nested> + + + diff --git a/public/static/_init.scss b/public/static/_init.scss new file mode 100644 index 000000000..07ab9afc9 --- /dev/null +++ b/public/static/_init.scss @@ -0,0 +1,6 @@ +@import "./_variables"; +@import "vendor/bootstrap/scss/_functions"; +@import "vendor/bootstrap/scss/_variables"; +@import "vendor/bootstrap/scss/_mixins"; + + diff --git a/public/static/_variables.scss b/public/static/_variables.scss new file mode 100644 index 000000000..f0206e4c4 --- /dev/null +++ b/public/static/_variables.scss @@ -0,0 +1,150 @@ +/* ----------------------------------------------------------------------------------------------------------------- *\ + Bootstrap +\* ----------------------------------------------------------------------------------------------------------------- */ + +// Typography +$font-family-sans-serif: "Open Sans", -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, "Helvetica Neue", Arial, sans-serif, "Apple Color Emoji", "Segoe UI Emoji", "Segoe UI Symbol", "Noto Color Emoji" !default; +$font-family-monospace: "Source Code Pro", SFMono-Regular, Menlo, Monaco, Consolas, "Liberation Mono", "Courier New", monospace !default; +$font-family-title: "Open Sans Condensed", -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, "Helvetica Neue", Arial, sans-serif, "Apple Color Emoji", "Segoe UI Emoji", "Segoe UI Symbol", "Noto Color Emoji" !default; +$font-size-base: 0.875rem; + +// Color system +$white: #fff !default; +$gray-100: #f8f9fa !default; +$gray-200: #ebebeb !default; +$gray-300: #dee2e6 !default; +$gray-400: #ced4da !default; +$gray-500: #adb5bd !default; +$gray-600: #999 !default; +$gray-700: #444 !default; +$gray-800: #303030 !default; +$gray-900: #222 !default; +$black: #000 !default; + +$blue: #33b5e5 !default; +$indigo: #005f81 !default; +$purple: #9933cc !default; +$pink: #ff4444 !default; +$red: #d44a3a !default; +$orange: #eb7b18 !default; +$yellow: #ecbb13 !default; +$green: #299c46 !default; +$teal: #20c997 !default; +$cyan: #3498DB !default; + +$primary: $blue !default; +$secondary: $gray-700 !default; +$tertiary: $indigo; +$success: $green !default; +$info: $cyan !default; +$warning: $yellow !default; +$danger: $red !default; +$light: $gray-800 !default; +$dark: $gray-500 !default; + +$yiq-contrasted-threshold: 175 !default; + +// Body + +$body-bg: #171819 !default; +$body-color: $white !default; + +// Links +$link-color: $blue !default; + +// Tables +$table-accent-bg: $gray-800 !default; +$table-border-color: $gray-700 !default; +$table-dark-bg: $black !default; +$table-dark-border-color: $gray-700 !default; +$table-dark-color: $white !default; +$table-hover-bg: darken($primary, 25%); + +// Forms +$input-border-color: transparent !default; +$input-group-addon-color: $gray-500 !default; +$input-group-addon-bg: $gray-700 !default; + +$custom-file-color: $gray-500 !default; +$custom-file-border-color: $gray-700 !default; + +// Dropdowns +$dropdown-bg: $gray-900 !default; +$dropdown-border-color: $gray-700 !default; +$dropdown-divider-bg: $gray-700 !default; +$dropdown-link-color: $white !default; +$dropdown-link-hover-color: $white !default; +$dropdown-link-hover-bg: $primary !default; + +// Navs +$nav-link-padding-x: 2rem !default; +$nav-link-disabled-color: $gray-500 !default; +$nav-tabs-border-color: $gray-700 !default; +$nav-tabs-link-hover-border-color: $nav-tabs-border-color $nav-tabs-border-color transparent !default; +$nav-tabs-link-active-color: $white !default; +$nav-tabs-link-active-border-color: $nav-tabs-border-color $nav-tabs-border-color transparent !default; + +// Navbar +$navbar-padding-y: 1rem !default; +$navbar-dark-color: $white !default; +$navbar-dark-hover-color: $success !default; +$navbar-light-color: rgba($white, .5) !default; +$navbar-light-hover-color: $white !default; +$navbar-light-active-color: $white !default; +$navbar-light-disabled-color: rgba($white, .3) !default; +$navbar-light-toggler-border-color: rgba($white, .1) !default; + +// Pagination +$pagination-color: $white !default; +$pagination-bg: $success !default; +$pagination-border-width: 0 !default; +$pagination-border-color: transparent !default; +$pagination-hover-color: $white !default; +$pagination-hover-bg: lighten($success, 10%) !default; +$pagination-hover-border-color: transparent !default; +$pagination-active-bg: $pagination-hover-bg !default; +$pagination-active-border-color: transparent !default; +$pagination-disabled-color: $white !default; +$pagination-disabled-bg: darken($success, 15%) !default; +$pagination-disabled-border-color: transparent !default; + +// Jumbotron +$jumbotron-bg: $gray-800 !default; + +// Cards +$card-cap-bg: $gray-700 !default; +$card-bg: $gray-800 !default; + +// Popovers +$popover-bg: $gray-800 !default; +$popover-header-bg: $gray-700 !default; + +// Modals +$modal-content-bg: $gray-800 !default; +$modal-content-border-color: $gray-700 !default; +$modal-header-border-color: $gray-700 !default; + +// Progress bars +$progress-height: 0.625rem !default; +$progress-font-size: 0.625rem !default; +$progress-bg: $gray-700 !default; + +// List group +$list-group-bg: $gray-800 !default; +$list-group-border-color: $gray-700 !default; +$list-group-hover-bg: $gray-700 !default; + +// Breadcrumbs +$breadcrumb-bg: $gray-700 !default; + +// Close +$close-color: $white !default; +$close-text-shadow: none !default; + +// Code +$pre-color: inherit !default; + +/* ----------------------------------------------------------------------------------------------------------------- *\ + Font Awesome +\* ----------------------------------------------------------------------------------------------------------------- */ +$fa-font-path: "/vendor/font-awesome/fonts" \ No newline at end of file diff --git a/public/static/img/icon.png b/public/static/img/icon.png new file mode 100644 index 000000000..d784cde31 Binary files /dev/null and b/public/static/img/icon.png differ diff --git a/public/static/img/icon.svg b/public/static/img/icon.svg new file mode 100644 index 000000000..6ac1bcb90 --- /dev/null +++ b/public/static/img/icon.svg @@ -0,0 +1,93 @@ + + + + + + + + + + + + + + + Icon Mafia + + + + + image/svg+xml + + + + + + H Q + diff --git a/public/static/img/logo.svg b/public/static/img/logo.svg new file mode 100644 index 000000000..a01d8702a --- /dev/null +++ b/public/static/img/logo.svg @@ -0,0 +1,85 @@ + + + + + + image/svg+xml + + + + + + + + + + + + + + + diff --git a/public/static/template.js b/public/static/template.js new file mode 100644 index 000000000..22069c125 --- /dev/null +++ b/public/static/template.js @@ -0,0 +1,85 @@ +$(document).on('ready turbolinks:load', function () { + /* Sidebar */ + $('#sidebar-collapse').on('click', function () { + $('#sidebar').toggleClass('active'); + }); + + /* Tooltip */ + $('[data-toggle="tooltip"]').tooltip(); + + /* Tabs */ + const hash = window.location.hash; + if (hash) { + $('ul.nav a[href="' + hash + '"]') + .tab('show') + } + + $('.tabs-container').removeClass('invisible'); + + $('.nav-tabs a').click(function (e) { + $(this).tab('show'); + window.location.hash = this.hash; + }); + + + /* Main action */ + $('td.main-row-action').each(function (key, td) { + $(td).parent() + .addClass("main-row-action") + .on('dblclick', function () { + Turbolinks.visit($(td).find('a').attr('href')); + }) + }); + + /* Confirm */ + var toast = swal.mixin({ + toast: true, + position: 'top-end', + showConfirmButton: false, + timer: 3000 + }); + + $('[data-confirm]').on("click", function (event) { + event.stopPropagation(); + event.preventDefault(); + + var message = 'Are you sure ?'; + var href = $(this).attr("href"); + + if ($(this).attr("data-confirm") !== "true") { + message = $(this).attr("data-confirm"); + } + + swal({ + html: message, + type: 'question', + showCancelButton: true + }).then(function (result) { + if (result.value) { + $.ajax({ + type: "GET", + url: href, + dataType: "json" + }) + .always(function (response) + { + response = response.result ? response : response.responseJSON; + + toast({ + type: response.result ? 'success' : 'error', + title: response.message, + timer: response.result === false ? 0 : 300, + onAfterClose: function () { + if (response.result === true) { + Turbolinks.visit(window.location.href); + } + } + }); + }) + ; + } + }); + + return false; + }); +}); \ No newline at end of file diff --git a/public/static/template.scss b/public/static/template.scss new file mode 100644 index 000000000..0031726d5 --- /dev/null +++ b/public/static/template.scss @@ -0,0 +1,223 @@ +@import "./_init"; +/* ----------------------------------------------------------------------------------------------------------------- *\ + Bootstrap +\* ----------------------------------------------------------------------------------------------------------------- */ + +// Tabs +ul.nav-tabs { + .nav-link { + background-color: transparent; + border: 0 solid; + border-bottom-width: 4px; + border-bottom-color: transparent; + color: $body-color; + + &.active, &:hover.active { + font-weight: bold; + color: $primary; + border-bottom-color: $primary; + border-bottom-width: 4px; + background-color: transparent; + } + + &:hover { + transition-duration: 0ms; + border-bottom-color: $gray-400; + border-bottom-width: 4px; + } + } +} + +.tabs-container { + .tab-content { + padding: 20px; + border: 1px solid $nav-tabs-border-color; + background: $gray-900; + border-top: 0; + } +} + + +/* ----------------------------------------------------------------------------------------------------------------- *\ + Layout +\* ----------------------------------------------------------------------------------------------------------------- */ +a, button, input[type="submit"] { + transition-duration: 200ms; +} + +.title { + display: flex; + position: relative; + width: calc(100% + 40px); + left: -20px; + top: -20px; + height: 74px; + border-bottom: 4px solid $yellow; + padding: 17px 20px 0; +} + +h1 { + font-family: $font-family-title; + font-weight: 700; + font-size: 2rem; + color: white; + margin-bottom: 0; + white-space: nowrap; + overflow: hidden; + text-overflow: ellipsis; + min-width: 0; +} + +.wrapper { + display: flex; + width: 100%; + align-items: stretch; +} + +#content { + @media (min-width: 768px) { + width: calc(100% - 250px); + } + + @media (max-width: 768px) { + max-width: 100%; + } + + padding: 20px; + min-height: 100vh; + transition: all 0.3s; +} + +/* ----------------------------------------------------------------------------------------------------------------- *\ + Sidebar +\* ----------------------------------------------------------------------------------------------------------------- */ +#sidebar { + min-width: 250px; + max-width: 250px; + background: $black; + color: #fff; + transition: all 0.3s; + font-family: $font-family-title; + font-size: $font-size-lg; + border-right: 1px solid $nav-tabs-border-color; + + &.active { + margin-left: -250px; + } + .sidebar-header { + padding: 5px 10px; + background: $tertiary; + border-bottom: 4px solid $yellow; + h3 { + margin-bottom: 0; + text-align: center; + sup { + color: $yellow; + } + } + } + ul { + p { + color: #fff; + padding: 10px; + } + li { + a { + padding: 10px; + display: block; + color: #FFFFFF; + &:hover { + background: lighten($tertiary, 10%); + text-decoration: none; + } + } + &.active > a { + color: #fff; + background: $tertiary; + } + + ul { + border-bottom: 1px solid lighten($tertiary, 15%); + li a { + background: $black; + + &.active { + background: $yellow; + color: $black; + } + } + } + } + } +} + +a { + &[aria-expanded="true"] { + color: #fff; + background: $tertiary; + } + &[data-toggle="collapse"] { + position: relative; + } +} + +.dropdown-toggle::after { + display: block; + position: absolute; + top: 50%; + right: 20px; + transform: translateY(-50%); +} + +ul ul a { + font-size: 0.9em !important; + padding-left: 30px !important; + background: $tertiary; +} + + +@media (max-width: 768px) { + #sidebar { + margin-left: -250px; + &.active { + margin-left: 0; + } + } + #sidebar-collapse span { + display: none; + } +} + +/* ----------------------------------------------------------------------------------------------------------------- *\ + Turbolinks +\* ----------------------------------------------------------------------------------------------------------------- */ +.turbolinks-progress-bar { + background-color: $red; +} + +/* ----------------------------------------------------------------------------------------------------------------- *\ + Table +\* ----------------------------------------------------------------------------------------------------------------- */ +table { + .main-row-action { + cursor: pointer; + } + th.row-action { + width: 16px; + } + td.row-action a { + color: $body-color; + } + pre { + white-space: pre-wrap; + overflow-wrap: break-word; + overflow: auto; + max-height: 150px; + + code { + white-space: pre-wrap; + overflow-wrap: break-word; + word-break: break-all; + } + } +} \ No newline at end of file diff --git a/public/static/vendor.scss b/public/static/vendor.scss new file mode 100644 index 000000000..f4cd355d9 --- /dev/null +++ b/public/static/vendor.scss @@ -0,0 +1,7 @@ +@import "./_init"; + +@import "vendor/bootstrap/scss/bootstrap.scss"; +@import "vendor/font-awesome/scss/font-awesome.scss"; +@import "vendor/sweetalert2/src/sweetalert2.scss"; + + diff --git a/public/topic.ftl b/public/topic.ftl new file mode 100644 index 000000000..ee600ae99 --- /dev/null +++ b/public/topic.ftl @@ -0,0 +1,127 @@ +<#-- @ftlvariable name="clusterId" type="java.lang.String" --> +<#-- @ftlvariable name="topic" type="org.kafkahq.models.Topic" --> +<#-- @ftlvariable name="tab" type="java.lang.String" --> +<#-- @ftlvariable name="datas" type="java.util.List>" --> + +<#import "/includes/template.ftl" as template> +<#import "/includes/node.ftl" as nodeTemplate> +<#import "/includes/group.ftl" as groupTemplate> +<#import "/includes/functions.ftl" as functions> + +<@template.header "Topic: " + topic.getName(), "topic" /> + + + + +<@template.footer/> + diff --git a/public/topicList.ftl b/public/topicList.ftl new file mode 100644 index 000000000..826c0e3bb --- /dev/null +++ b/public/topicList.ftl @@ -0,0 +1,82 @@ +<#-- @ftlvariable name="clusterId" type="java.lang.String" --> +<#-- @ftlvariable name="topics" type="java.util.ArrayList" --> + +<#import "/includes/template.ftl" as template> +<#import "/includes/functions.ftl" as functions> + +<@template.header "Topics", "topic" /> + +
+ + + + + + + + + + + + + + + + + + + + + + + + + + <#if topics?size == 0> + + + + + <#list topics as topic> + + + + + + + + + + + + + + + +
TopicsPartitionsReplicationsConsumers Groups
NameOffsetsSizeTotalAvailableUnder replicatedFactorIn SyncConsumer Groups
+ +
${topic.getName()}${topic.getSumFirstOffsets() + " ⤑ " + topic.getSumOffsets()}${functions.filesize(topic.getLogDirSize())}${topic.getPartitions()?size}${topic.getReplicas()?size}${topic.getInSyncReplicas()?size} + <#list topic.getConsumerGroups() as group> + <#assign active = group.isActiveTopic(topic.getName()) > + + ${group.getId()} + + Lag: ${group.getOffsetLag(topic.getName())} + +
+ +
+ + + <#if topic.isInternal() == false> + + +
+
+ +<@template.footer/> \ No newline at end of file diff --git a/settings.gradle b/settings.gradle new file mode 100644 index 000000000..2367dbc16 --- /dev/null +++ b/settings.gradle @@ -0,0 +1,2 @@ +rootProject.name = 'kafkahq' + diff --git a/src/main/java/org/kafkahq/App.java b/src/main/java/org/kafkahq/App.java new file mode 100644 index 000000000..ac52fd57d --- /dev/null +++ b/src/main/java/org/kafkahq/App.java @@ -0,0 +1,64 @@ +package org.kafkahq; + +import org.jooby.Jooby; +import org.jooby.RequestLogger; +import org.jooby.assets.Assets; +import org.jooby.ftl.Ftl; +import org.jooby.json.Jackson; +import org.jooby.livereload.LiveReload; +import org.jooby.whoops.Whoops; +import org.kafkahq.controllers.GroupController; +import org.kafkahq.controllers.TopicController; +import org.kafkahq.modules.KafkaModule; +import org.kafkahq.modules.KafkaWrapper; +import org.kafkahq.repositories.AbstractRepository; + +import java.util.Optional; + +public class App extends Jooby { + // module + { + use("*", new RequestLogger() + .latency() + .extended() + ); + use(new Jackson()); + + on("dev", () -> { + use(new Whoops()); + use(new LiveReload()); + }); + + use(new Assets()); + assets("/favicon.ico"); + use(new Ftl("/", ".ftl")); + + use(KafkaModule.class); + + // @RequestScoped hack + use("*", "/{cluster}/**", (req, rsp, chain) -> { + Optional cluster = req.param("cluster").toOptional(); + cluster.ifPresent(clusterId -> + AbstractRepository.setWrapper(new KafkaWrapper(this.require(KafkaModule.class), clusterId)) + ); + + chain.next(req, rsp); + }); + } + + // route + { + use("*", "/", (req, rsp, chain) -> { + rsp.redirect("/" + this.require(KafkaModule.class).getClustersList().get(0) + "/topic"); + }); + use("*", "/{cluster}", (req, rsp, chain) -> { + rsp.redirect("/" + req.param("cluster").value() + "/topic"); + }); + use(TopicController.class); + use(GroupController.class); + } + + public static void main(String[] args) { + run(App::new, args); + } +} \ No newline at end of file diff --git a/src/main/java/org/kafkahq/controllers/AbstractController.java b/src/main/java/org/kafkahq/controllers/AbstractController.java new file mode 100644 index 000000000..62960b401 --- /dev/null +++ b/src/main/java/org/kafkahq/controllers/AbstractController.java @@ -0,0 +1,17 @@ +package org.kafkahq.controllers; + +import com.google.inject.Inject; +import org.jooby.Request; +import org.jooby.View; +import org.kafkahq.modules.KafkaModule; + +abstract public class AbstractController { + @Inject + private KafkaModule kafkaModule; + + protected View template(Request request, View view) { + return view + .put("clusterId", request.param("cluster").value()) + .put("clusters", this.kafkaModule.getClustersList()); + } +} diff --git a/src/main/java/org/kafkahq/controllers/GroupController.java b/src/main/java/org/kafkahq/controllers/GroupController.java new file mode 100644 index 000000000..639ec529f --- /dev/null +++ b/src/main/java/org/kafkahq/controllers/GroupController.java @@ -0,0 +1,53 @@ +package org.kafkahq.controllers; + +import com.google.inject.Inject; +import org.jooby.Request; +import org.jooby.Results; +import org.jooby.View; +import org.jooby.mvc.GET; +import org.jooby.mvc.Path; +import org.kafkahq.models.ConsumerGroup; +import org.kafkahq.repositories.ConsumerGroupRepository; + +import java.util.concurrent.ExecutionException; + +@Path("/{cluster}/group") +public class GroupController extends AbstractController { + @Inject + private ConsumerGroupRepository consumerGroupRepository; + + @GET + public View list(Request request) throws ExecutionException, InterruptedException { + return this.template( + request, + Results + .html("groupList") + .put("groups", this.consumerGroupRepository.list()) + ); + } + + @GET + @Path("{id}") + public View home(Request request) throws ExecutionException, InterruptedException { + return this.group(request, "topics"); + } + + @GET + @Path("{id}/{tab:(topics|members)}") + public View tab(Request request) throws ExecutionException, InterruptedException { + return this.group(request, request.param("tab").value()); + } + + public View group(Request request, String tab) throws ExecutionException, InterruptedException { + ConsumerGroup group = this.consumerGroupRepository.findByName(request.param("id").value()); + + return this.template( + request, + Results + .html("group") + .put("tab", tab) + .put("group", group) + ); + } + +} diff --git a/src/main/java/org/kafkahq/controllers/TopicController.java b/src/main/java/org/kafkahq/controllers/TopicController.java new file mode 100644 index 000000000..e8d559f8c --- /dev/null +++ b/src/main/java/org/kafkahq/controllers/TopicController.java @@ -0,0 +1,102 @@ +package org.kafkahq.controllers; + +import com.google.inject.Inject; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.jooby.Request; +import org.jooby.Result; +import org.jooby.Results; +import org.jooby.View; +import org.jooby.mvc.GET; +import org.jooby.mvc.Path; +import org.kafkahq.models.Topic; +import org.kafkahq.repositories.RecordRepository; +import org.kafkahq.repositories.TopicRepository; +import org.kafkahq.response.ResultStatusResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; + +@Path("/{cluster}/topic") +public class TopicController extends AbstractController { + private static final Logger logger = LoggerFactory.getLogger(TopicController.class); + + @Inject + private TopicRepository topicRepository; + + @Inject + private RecordRepository recordRepository; + + @GET + public View list(Request request) throws ExecutionException, InterruptedException { + return this.template( + request, + Results + .html("topicList") + .put("topics", this.topicRepository.list()) + ); + } + + @GET + @Path("{id}") + public View home(Request request) throws ExecutionException, InterruptedException { + Topic topic = this.topicRepository.findByName(request.param("id").value()); + List> data = this.recordRepository.consume( + request.param("cluster").value(), + Collections.singletonList(topic.getName()), + new RecordRepository.Options() + ); + + return this.template( + request, + Results + .html("topic") + .put("tab", "data") + .put("topic", topic) + .put("datas", data) + ); + } + + @GET + @Path("{id}/{tab:(partitions|groups)}") + public View tab(Request request) throws ExecutionException, InterruptedException { + return this.topic(request, request.param("tab").value()); + } + + public View topic(Request request, String tab) throws ExecutionException, InterruptedException { + Topic topic = this.topicRepository.findByName(request.param("id").value()); + + return this.template( + request, + Results + .html("topic") + .put("tab", tab) + .put("topic", topic) + ); + } + + @GET + @Path("{id}/delete") + public Result delete(Request request) { + String name = request.param("id").value(); + ResultStatusResponse result = new ResultStatusResponse(); + + try { + this.topicRepository.delete(request.param("cluster").value(), name); + + result.result = true; + result.message = "Topic '" + name + "' is deleted"; + + return Results.with(result, 200); + } catch (Exception exception) { + logger.error("Failed to delete topic " + name, exception); + + result.result = false; + result.message = exception.getCause().getMessage(); + + return Results.with(result, 500); + } + } +} diff --git a/src/main/java/org/kafkahq/models/Cluster.java b/src/main/java/org/kafkahq/models/Cluster.java new file mode 100644 index 000000000..92ddc9cb8 --- /dev/null +++ b/src/main/java/org/kafkahq/models/Cluster.java @@ -0,0 +1,40 @@ +package org.kafkahq.models; + +import lombok.EqualsAndHashCode; +import lombok.ToString; +import org.apache.kafka.clients.admin.DescribeClusterResult; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; + +@ToString +@EqualsAndHashCode +public class Cluster { + public Cluster(DescribeClusterResult result) throws ExecutionException, InterruptedException { + this.id = result.clusterId().get(); + for(org.apache.kafka.common.Node node : result.nodes().get()) { + this.nodes.add(new Node(node)); + } + + this.controller = new Node(result.controller().get()); + } + + private final String id; + + public String getId() { + return id; + } + + private final List nodes = new ArrayList<>(); + + public List getNodes() { + return nodes; + } + + private final Node controller; + + public Node getController() { + return controller; + } +} diff --git a/src/main/java/org/kafkahq/models/Consumer.java b/src/main/java/org/kafkahq/models/Consumer.java new file mode 100644 index 000000000..85d3e00d8 --- /dev/null +++ b/src/main/java/org/kafkahq/models/Consumer.java @@ -0,0 +1,88 @@ +package org.kafkahq.models; + +import lombok.EqualsAndHashCode; +import lombok.ToString; +import org.apache.kafka.clients.admin.MemberDescription; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +@ToString +@EqualsAndHashCode +public class Consumer { + public Consumer(MemberDescription description) { + this.id = description.consumerId(); + this.clientId = description.clientId(); + this.host = description.host(); + + for (org.apache.kafka.common.TopicPartition assignment : description.assignment().topicPartitions()) { + this.assignments.add(new TopicPartition(assignment)); + } + + this.assignments.sort(Comparator + .comparing(org.kafkahq.models.TopicPartition::getTopic) + .thenComparingInt(org.kafkahq.models.TopicPartition::getPartition) + ); + } + + private final String id; + + public String getId() { + return id; + } + + private final String clientId; + + public String getClientId() { + return clientId; + } + + private final String host; + + public String getHost() { + return host; + } + + private final ArrayList assignments = new ArrayList<>(); + + public ArrayList getAssignments() { + return assignments; + } + + public ArrayList getGroupedAssignments() { + Map> collect = this.assignments + .stream() + .collect(Collectors.groupingBy(TopicPartition::getTopic)) + ; + + ArrayList list = new ArrayList<>(); + + for(Map.Entry> item : collect.entrySet()) { + list.add(new GroupedAssignement(item.getKey(), item.getValue().stream().mapToInt(TopicPartition::getPartition).toArray())); + } + + return list; + } + + public static class GroupedAssignement { + private GroupedAssignement (String topic, int[] partitions) { + this.topic = topic; + this.partitions = partitions; + } + + private final String topic; + + public String getTopic() { + return topic; + } + + private final int[] partitions; + + public int[] getPartitions() { + return partitions; + } + } +} diff --git a/src/main/java/org/kafkahq/models/ConsumerGroup.java b/src/main/java/org/kafkahq/models/ConsumerGroup.java new file mode 100644 index 000000000..ddea470f8 --- /dev/null +++ b/src/main/java/org/kafkahq/models/ConsumerGroup.java @@ -0,0 +1,146 @@ +package org.kafkahq.models; + +import lombok.EqualsAndHashCode; +import lombok.ToString; +import org.apache.kafka.clients.admin.ConsumerGroupDescription; +import org.apache.kafka.clients.admin.MemberDescription; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.ConsumerGroupState; + +import java.util.*; +import java.util.stream.Collectors; + +@ToString +@EqualsAndHashCode +public class ConsumerGroup { + public ConsumerGroup( + ConsumerGroupDescription groupDescription, + Map groupOffset, + Map> topicsOffsets + ) { + this.id = groupDescription.groupId(); + this.isSimpleConsumerGroup = groupDescription.isSimpleConsumerGroup(); + this.partitionAssignor = groupDescription.partitionAssignor(); + this.state = groupDescription.state(); + this.coordinator = new Node(groupDescription.coordinator()); + + for (MemberDescription member : groupDescription.members()) { + this.members.add(new Consumer(member)); + } + + for (Map.Entry offset : groupOffset.entrySet()) { + Partition.Offsets topicOffsets = topicsOffsets.get(offset.getKey().topic()) + .stream() + .filter(item -> item.getPartition() == offset.getKey().partition()) + .findFirst() + .orElseThrow(() -> new NoSuchElementException( + "Topic Partition Offsets '" + offset.getKey().topic() + + "' partition " + offset.getKey().partition() + " doesn't exist for group " + this.id + )); + + this.offsets.add(new TopicPartition.ConsumerGroupOffset( + offset.getKey(), + offset.getValue(), + topicOffsets + )); + } + + for (MemberDescription member : groupDescription.members()) { + for (org.apache.kafka.common.TopicPartition assignment : member.assignment().topicPartitions()) { + long count = this.offsets.stream() + .filter(entry -> entry.getTopic().equals(assignment.topic()) && entry.getPartition() == assignment.partition()) + .count(); + + if (count == 0) { + this.offsets.add(new TopicPartition.ConsumerGroupOffset(assignment)); + } + } + } + + this.offsets.sort(Comparator + .comparing(org.kafkahq.models.TopicPartition.ConsumerGroupOffset::getTopic) + .thenComparingInt(org.kafkahq.models.TopicPartition.ConsumerGroupOffset::getPartition) + ); + } + + private final String id; + + public String getId() { + return id; + } + + private final boolean isSimpleConsumerGroup; + + public boolean isSimpleConsumerGroup() { + return isSimpleConsumerGroup; + } + + private final String partitionAssignor; + + public String partitionAssignor() { + return partitionAssignor; + } + + private final ConsumerGroupState state; + + public ConsumerGroupState getState() { + return state; + } + + private final Node coordinator; + + public Node getCoordinator() { + return coordinator; + } + + private final ArrayList members = new ArrayList<>(); + + public ArrayList getMembers() { + return members; + } + + private final ArrayList offsets = new ArrayList<>(); + + public ArrayList getOffsets() { + return offsets; + } + + public List getActiveTopics() { + return this.getMembers() + .stream() + .flatMap(consumer -> consumer.getAssignments().stream().map(org.kafkahq.models.TopicPartition::getTopic)) + .distinct() + .sorted(String::compareToIgnoreCase) + .collect(Collectors.toList()); + } + + public List getTopics() { + List list = this.getOffsets() + .stream() + .map(org.kafkahq.models.TopicPartition::getTopic) + .distinct() + .collect(Collectors.toList()); + + list.addAll(this.getActiveTopics()); + + return list + .stream() + .distinct() + .sorted(String::compareToIgnoreCase) + .collect(Collectors.toList()); + } + + public boolean isActiveTopic(String topic) { + return this.getActiveTopics().contains(topic); + } + + public long getOffsetLag(String topic) { + return this.offsets.stream() + .filter(consumerGroupOffset -> consumerGroupOffset.getTopic().equals(topic)) + .map(org.kafkahq.models.TopicPartition.ConsumerGroupOffset::getOffsetLag) + .reduce(0L, + (a1, a2) -> a1 + a2.orElse(0L), + (a1, a2) -> a1 + a2 + ); + } +} diff --git a/src/main/java/org/kafkahq/models/LogDir.java b/src/main/java/org/kafkahq/models/LogDir.java new file mode 100644 index 000000000..bb572727b --- /dev/null +++ b/src/main/java/org/kafkahq/models/LogDir.java @@ -0,0 +1,76 @@ +package org.kafkahq.models; + +import lombok.EqualsAndHashCode; +import lombok.ToString; +import org.apache.kafka.common.requests.DescribeLogDirsResponse; + +@ToString +@EqualsAndHashCode +public class LogDir { + public LogDir(Integer brokerId, String path, org.apache.kafka.common.TopicPartition topicPartition, DescribeLogDirsResponse.ReplicaInfo replicaInfo) { + this.brokerId = brokerId; + this.path = path; + this.topic = topicPartition.topic(); + this.partition = topicPartition.partition(); + this.size = replicaInfo.size; + this.offsetLag = replicaInfo.offsetLag; + this.isFuture = replicaInfo.isFuture; + } + + private final Integer brokerId; + + public Integer getBrokerId() { + return brokerId; + } + + /** + * The absolute log directory path. + */ + private final String path; + + public String getPath() { + return path; + } + + private final String topic; + + public String getTopic() { + return topic; + } + + private final int partition; + + public int getPartition() { + return partition; + } + + /** + * The size of the log segments of the partition in bytes. + */ + private final long size; + + public long getSize() { + return size; + } + + /** + * The lag of the log's LEO w.r.t. partition's HW + * (if it is the current log for the partition) or current replica's LEO + * (if it is the future log for the partition) + */ + private final long offsetLag; + + public long getOffsetLag() { + return offsetLag; + } + + /** + * True if this log is created by AlterReplicaLogDirsRequest + * and will replace the current log of the replica in the future. + */ + private final boolean isFuture; + + public boolean isFuture() { + return isFuture; + } +} diff --git a/src/main/java/org/kafkahq/models/Node.java b/src/main/java/org/kafkahq/models/Node.java new file mode 100644 index 000000000..d6bd7c898 --- /dev/null +++ b/src/main/java/org/kafkahq/models/Node.java @@ -0,0 +1,62 @@ +package org.kafkahq.models; + +import lombok.EqualsAndHashCode; +import lombok.ToString; + +@ToString +@EqualsAndHashCode +public class Node { + public Node(org.apache.kafka.common.Node node) { + this.id = node.id(); + this.host = node.host(); + this.port = node.port(); + this.rack = node.rack(); + } + + private final int id; + + public int getId() { + return id; + } + + private final String host; + + public String getHost() { + return host; + } + + private final int port; + + public int getPort() { + return port; + } + + private final String rack; + + public String getRack() { + return rack; + } + + @ToString + @EqualsAndHashCode(callSuper=true) + public static class Partition extends Node + { + public Partition(org.apache.kafka.common.Node node, boolean isLeader, boolean isInSyncReplicas) { + super(node); + this.isLeader = isLeader; + this.isInSyncReplicas = isInSyncReplicas; + } + + private final boolean isLeader; + + public boolean isLeader() { + return isLeader; + } + + private final boolean isInSyncReplicas; + + public boolean isInSyncReplicas() { + return isInSyncReplicas; + } + } +} diff --git a/src/main/java/org/kafkahq/models/Partition.java b/src/main/java/org/kafkahq/models/Partition.java new file mode 100644 index 000000000..12119a49d --- /dev/null +++ b/src/main/java/org/kafkahq/models/Partition.java @@ -0,0 +1,94 @@ +package org.kafkahq.models; + +import lombok.EqualsAndHashCode; +import lombok.ToString; +import org.apache.kafka.common.TopicPartitionInfo; + +import java.util.*; + +@ToString +@EqualsAndHashCode +public class Partition { + public Partition(TopicPartitionInfo partitionInfo, LogDir logDir, Offsets offsets) { + this.id = partitionInfo.partition(); + this.logDir = logDir; + this.firstOffset = offsets.getFirstOffset(); + this.lastOffset = offsets.getLastOffset(); + this.nodes = new ArrayList<>(); + + for (org.apache.kafka.common.Node replica : partitionInfo.replicas()) { + nodes.add(new Node.Partition( + replica, + partitionInfo.leader().id() == replica.id(), + partitionInfo.isr().stream().anyMatch(node -> node.id() == replica.id()) + )); + } + } + + private final int id; + + public int getId() { + return id; + } + + private final List nodes; + + public List getNodes() { + return nodes; + } + + private final LogDir logDir; + + public LogDir getLogDir() { + return logDir; + } + + private final long firstOffset; + + public long getFirstOffset() { + return firstOffset; + } + + private final long lastOffset; + + public long getLastOffset() { + return lastOffset; + } + + public Node.Partition getLeader() { + return nodes + .stream() + .filter(Node.Partition::isLeader) + .findFirst() + .orElseThrow(() -> new NoSuchElementException("Leader not found")); + } + + @ToString + @EqualsAndHashCode + public static class Offsets + { + public Offsets(int partition, long start, long lastOffset) { + this.partition = partition; + this.firstOffset = start; + this.lastOffset = lastOffset; + } + + private final int partition; + + public int getPartition() { + return partition; + } + + private long firstOffset; + + public long getFirstOffset() { + return firstOffset; + } + + private long lastOffset; + + public long getLastOffset() { + return lastOffset; + } + } +} diff --git a/src/main/java/org/kafkahq/models/Topic.java b/src/main/java/org/kafkahq/models/Topic.java new file mode 100644 index 000000000..6ffa43c83 --- /dev/null +++ b/src/main/java/org/kafkahq/models/Topic.java @@ -0,0 +1,101 @@ +package org.kafkahq.models; + +import lombok.EqualsAndHashCode; +import lombok.ToString; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.TopicPartitionInfo; + +import java.util.ArrayList; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; + +@ToString +@EqualsAndHashCode +public class Topic { + public Topic( + TopicDescription description, + List consumerGroup, + List logDirs, + List offsets + ) { + this.name = description.name(); + this.internal = description.isInternal(); + this.consumerGroups = consumerGroup; + + for (TopicPartitionInfo partition : description.partitions()) { + this.partitions.add(new Partition( + partition, + logDirs.stream() + .filter(logDir -> logDir.getPartition() == partition.partition()) + .findFirst() + .orElseThrow(() -> new NoSuchElementException( + "Partition '" + partition.partition() + "' doesn't exist for topic " + this.name + )), + offsets.stream() + .filter(offset -> offset.getPartition() == partition.partition()) + .findFirst() + .orElseThrow(() -> new NoSuchElementException( + "Partition Offsets '" + partition.partition() + "' doesn't exist for topic " + this.name + )) + )); + } + } + + private String name; + + public String getName() { + return name; + } + + private boolean internal; + + public boolean isInternal() { + return internal; + } + + private final List partitions = new ArrayList<>(); + + public List getPartitions() { + return partitions; + } + + private List consumerGroups; + + public List getConsumerGroups() { + return consumerGroups; + } + + public List getReplicas() { + return this.getPartitions().stream() + .flatMap(partition -> partition.getNodes().stream()) + .distinct() + .collect(Collectors.toList()); + } + + public List getInSyncReplicas() { + return this.getPartitions().stream() + .flatMap(partition -> partition.getNodes().stream()) + .filter(Node.Partition::isInSyncReplicas) + .distinct() + .collect(Collectors.toList()); + } + + public long getLogDirSize() { + return this.getPartitions().stream() + .map(p -> p.getLogDir().getSize()) + .reduce(0L, Long::sum); + } + + public long getSumFirstOffsets() { + return this.getPartitions().stream() + .map(Partition::getFirstOffset) + .reduce(0L, Long::sum); + } + + public long getSumOffsets() { + return this.getPartitions().stream() + .map(Partition::getLastOffset) + .reduce(0L, Long::sum); + } +} diff --git a/src/main/java/org/kafkahq/models/TopicPartition.java b/src/main/java/org/kafkahq/models/TopicPartition.java new file mode 100644 index 000000000..fd33896f1 --- /dev/null +++ b/src/main/java/org/kafkahq/models/TopicPartition.java @@ -0,0 +1,87 @@ +package org.kafkahq.models; + +import lombok.EqualsAndHashCode; +import lombok.ToString; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; + +import java.util.Optional; + +@ToString +@EqualsAndHashCode +public class TopicPartition { + public TopicPartition(org.apache.kafka.common.TopicPartition topicPartition) { + this.topic = topicPartition.topic(); + this.partition = topicPartition.partition(); + } + + private final String topic; + + public String getTopic() { + return topic; + } + + private final int partition; + + public int getPartition() { + return partition; + } + + @ToString + @EqualsAndHashCode(callSuper=true) + public static class ConsumerGroupOffset extends TopicPartition { + public ConsumerGroupOffset(org.apache.kafka.common.TopicPartition topicPartition) { + super(topicPartition); + + this.offset = Optional.empty(); + this.metadata = Optional.empty(); + this.firstOffset = Optional.empty(); + this.lastOffset = Optional.empty(); + } + + public ConsumerGroupOffset( + org.apache.kafka.common.TopicPartition topicPartition, + OffsetAndMetadata offsetAndMetadata, + Partition.Offsets partiionOffsets + ) { + super(topicPartition); + + this.offset = Optional.of(offsetAndMetadata.offset()); + this.metadata = Optional.of(offsetAndMetadata.metadata()); + + this.firstOffset = Optional.of(partiionOffsets.getFirstOffset()); + this.lastOffset = Optional.of(partiionOffsets.getLastOffset()); + } + + private final Optional offset; + + public Optional getOffset() { + return offset; + } + + private final Optional metadata; + + public Optional getMetadata() { + return metadata; + } + + private Optional firstOffset; + + public Optional getFirstOffset() { + return firstOffset; + } + + private Optional lastOffset; + + public Optional getLastOffset() { + return lastOffset; + } + + public Optional getOffsetLag() { + if (this.lastOffset.isPresent() && this.offset.isPresent()) { + return Optional.of(this.lastOffset.get() - this.offset.get()); + } else { + return Optional.empty(); + } + } + } +} diff --git a/src/main/java/org/kafkahq/modules/KafkaModule.java b/src/main/java/org/kafkahq/modules/KafkaModule.java new file mode 100644 index 000000000..a5826b38e --- /dev/null +++ b/src/main/java/org/kafkahq/modules/KafkaModule.java @@ -0,0 +1,94 @@ +package org.kafkahq.modules; + +import com.google.inject.Binder; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import com.typesafe.config.Config; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.jooby.Env; +import org.jooby.Jooby; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +@Singleton +public class KafkaModule implements Jooby.Module { + private static Logger logger = LoggerFactory.getLogger(KafkaModule.class); + + @Inject + private Config config; + + public T debug(Callable task, String format, Object... arguments) throws ExecutionException, InterruptedException { + long startTime = System.currentTimeMillis(); + T call; + + try { + call = task.call(); + logger.debug("[{}] " + format, (System.currentTimeMillis() - startTime) + "ms", arguments); + return call; + } catch (InterruptedException | ExecutionException exception) { + throw exception; + } catch (Exception exception) { + throw new RuntimeException("Error for " + format, exception); + } + } + + public List getClustersList() { + return this.config + .getConfig("kafka.connections") + .entrySet() + .stream() + .map(entry -> entry.getKey().split("\\.")[0]) + .distinct() + .collect(Collectors.toList()); + } + + private Properties getCommonProperties(String clusterId) { + Properties props = new Properties(); + + this.config.getConfig("kafka.connections." + clusterId) + .entrySet() + .forEach(config -> props.put(config.getKey(), config.getValue().unwrapped())); + + return props; + } + + private Map adminClient = new HashMap<>(); + + public AdminClient getAdminClient(String clusterId) { + if (!this.adminClient.containsKey(clusterId)) { + this.adminClient.put(clusterId, AdminClient.create(this.getCommonProperties(clusterId))); + } + + return this.adminClient.get(clusterId); + } + + private Map> consumer = new HashMap<>(); + + public KafkaConsumer getConsumer(String clusterId) { + if (!this.consumer.containsKey(clusterId)) { + this.consumer.put(clusterId, new KafkaConsumer<>( + this.getCommonProperties(clusterId), + new StringDeserializer(), + new StringDeserializer() + )); + } + + return this.consumer.get(clusterId); + } + + @SuppressWarnings("NullableProblems") + @Override + public void configure(Env env, Config conf, Binder binder) { + binder.bind(KafkaModule.class).toInstance(new KafkaModule()); + } +} diff --git a/src/main/java/org/kafkahq/modules/KafkaWrapper.java b/src/main/java/org/kafkahq/modules/KafkaWrapper.java new file mode 100644 index 000000000..0e507e7d5 --- /dev/null +++ b/src/main/java/org/kafkahq/modules/KafkaWrapper.java @@ -0,0 +1,225 @@ +package org.kafkahq.modules; + +import com.google.inject.Inject; +import org.apache.kafka.clients.admin.*; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.requests.DescribeLogDirsResponse; +import org.jooby.scope.RequestScoped; +import org.kafkahq.models.Partition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import static java.util.stream.Collectors.*; + +@RequestScoped +public class KafkaWrapper { + private static Logger logger = LoggerFactory.getLogger(KafkaWrapper.class); + + private KafkaModule kafkaModule; + + private String clusterId; + + @Inject + public KafkaWrapper(KafkaModule kafkaModule, String clusterId) { + this.kafkaModule = kafkaModule; + this.clusterId = clusterId; + } + + private DescribeClusterResult cluster; + + public DescribeClusterResult describeCluster() throws ExecutionException, InterruptedException { + if (this.cluster == null) { + this.cluster = this.kafkaModule.debug(() -> { + DescribeClusterResult cluster = kafkaModule.getAdminClient(clusterId).describeCluster(); + + cluster.clusterId().get(); + cluster.nodes().get(); + cluster.controller().get(); + return cluster; + }, "Cluster"); + } + + return this.cluster; + } + + private Collection listTopics; + + public Collection listTopics() throws ExecutionException, InterruptedException { + if (this.listTopics == null) { + this.listTopics = this.kafkaModule.debug( + () -> kafkaModule.getAdminClient(clusterId).listTopics( + new ListTopicsOptions().listInternal(true) + ).listings().get(), + "List topics" + ); + } + + return this.listTopics; + } + + private Map describeTopics = new HashMap<>(); + + public Map describeTopics(List topics) throws ExecutionException, InterruptedException { + List list = new ArrayList<>(topics); + list.removeIf(value -> this.describeTopics.containsKey(value)); + + if (list.size() > 0) { + Map description = this.kafkaModule.debug( + () -> kafkaModule.getAdminClient(clusterId) + .describeTopics(list) + .all() + .get(), + "Describe Topic {}", + topics + ); + + this.describeTopics.putAll(description); + } + + return this.describeTopics + .entrySet() + .stream() + .filter(e -> topics.contains(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + private Map> describeTopicsOffsets = new HashMap<>(); + + public Map> describeTopicsOffsets(List topics) throws ExecutionException, InterruptedException { + List list = new ArrayList<>(topics); + list.removeIf(value -> this.describeTopicsOffsets.containsKey(value)); + + if (list.size() > 0) { + Map> finalOffsets = this.kafkaModule.debug( + () -> { + List collect = this.describeTopics(topics).entrySet() + .stream() + .flatMap(topicDescription -> topicDescription + .getValue() + .partitions() + .stream() + .map(topicPartitionInfo -> + new TopicPartition(topicDescription.getValue().name(), topicPartitionInfo.partition()) + ) + ) + .collect(Collectors.toList()); + + Map begins = kafkaModule.getConsumer(clusterId).beginningOffsets(collect); + // @FIXME: ugly hacks, on startup, first query can send a partial result, resending request works ! + if (begins.size() != collect.size()) { + begins = kafkaModule.getConsumer(clusterId).beginningOffsets(collect); + } + Map ends = kafkaModule.getConsumer(clusterId).endOffsets(collect); + + return begins.entrySet().stream() + .collect(groupingBy( + o -> o.getKey().topic(), + mapping(begin -> + new Partition.Offsets( + begin.getKey().partition(), + begin.getValue(), + ends.get(begin.getKey()) + ), + toList() + ) + )); + }, + "Describe Topic {}", + topics + ); + + this.describeTopicsOffsets.putAll(finalOffsets); + } + + return this.describeTopicsOffsets + .entrySet() + .stream() + .filter(e -> topics.contains(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + private Collection listConsumerGroups; + + public Collection listConsumerGroups() throws ExecutionException, InterruptedException { + if (this.listConsumerGroups == null) { + this.listConsumerGroups = this.kafkaModule.debug( + () -> kafkaModule.getAdminClient(clusterId).listConsumerGroups().all().get(), + "List ConsumerGroups" + ); + } + + return this.listConsumerGroups; + } + + private Map describeConsumerGroups = new HashMap<>(); + + public Map describeConsumerGroups(List topics) throws ExecutionException, InterruptedException { + List list = new ArrayList<>(topics); + list.removeIf(value -> this.describeConsumerGroups.containsKey(value)); + + if (list.size() > 0) { + Map description = this.kafkaModule.debug( + () -> kafkaModule.getAdminClient(clusterId) + .describeConsumerGroups(list) + .all() + .get(), + "Describe ConsumerGroups {}", + topics + ); + + this.describeConsumerGroups.putAll(description); + } + + return this.describeConsumerGroups + .entrySet() + .stream() + .filter(e -> topics.contains(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + + private Map> consumerGroupOffset = new HashMap<>(); + + public Map consumerGroupsOffsets(String groupId) throws ExecutionException, InterruptedException { + if (!this.consumerGroupOffset.containsKey(groupId)) { + Map description = this.kafkaModule.debug( + () -> kafkaModule.getAdminClient(clusterId) + .listConsumerGroupOffsets(groupId) + .partitionsToOffsetAndMetadata() + .get(), + "ConsumerGroup Offsets {}", + groupId + ); + + this.consumerGroupOffset.put(groupId, description); + } + + return this.consumerGroupOffset.get(groupId); + } + + private Map> logDirs; + + public Map> describeLogDir() throws ExecutionException, InterruptedException { + if (this.logDirs == null) { + this.logDirs = this.kafkaModule.debug(() -> + kafkaModule.getAdminClient(clusterId) + .describeLogDirs(this.describeCluster().nodes().get() + .stream() + .map(Node::id) + .collect(Collectors.toList()) + ) + .all() + .get(), + "List Log dir" + ); + } + + return this.logDirs; + } +} diff --git a/src/main/java/org/kafkahq/repositories/AbstractRepository.java b/src/main/java/org/kafkahq/repositories/AbstractRepository.java new file mode 100644 index 000000000..98fea585c --- /dev/null +++ b/src/main/java/org/kafkahq/repositories/AbstractRepository.java @@ -0,0 +1,11 @@ +package org.kafkahq.repositories; + +import org.kafkahq.modules.KafkaWrapper; + +abstract public class AbstractRepository { + protected static KafkaWrapper kafkaWrapper; + + public static void setWrapper(KafkaWrapper kafkaWrapper) { + AbstractRepository.kafkaWrapper = kafkaWrapper; + } +} diff --git a/src/main/java/org/kafkahq/repositories/ClusterRepository.java b/src/main/java/org/kafkahq/repositories/ClusterRepository.java new file mode 100644 index 000000000..a659c4fcb --- /dev/null +++ b/src/main/java/org/kafkahq/repositories/ClusterRepository.java @@ -0,0 +1,23 @@ +package org.kafkahq.repositories; + +import com.google.inject.Binder; +import com.google.inject.Singleton; +import com.typesafe.config.Config; +import org.jooby.Env; +import org.jooby.Jooby; +import org.kafkahq.models.Cluster; + +import java.util.concurrent.ExecutionException; + +@Singleton +public class ClusterRepository extends AbstractRepository implements Jooby.Module { + public Cluster get() throws ExecutionException, InterruptedException { + return new Cluster(kafkaWrapper.describeCluster()); + } + + @SuppressWarnings("NullableProblems") + @Override + public void configure(Env env, Config conf, Binder binder) { + binder.bind(ClusterRepository.class).toInstance(new ClusterRepository()); + } +} diff --git a/src/main/java/org/kafkahq/repositories/ConsumerGroupRepository.java b/src/main/java/org/kafkahq/repositories/ConsumerGroupRepository.java new file mode 100644 index 000000000..177fc2c96 --- /dev/null +++ b/src/main/java/org/kafkahq/repositories/ConsumerGroupRepository.java @@ -0,0 +1,84 @@ +package org.kafkahq.repositories; + +import com.google.inject.Binder; +import com.google.inject.Singleton; +import com.typesafe.config.Config; +import org.apache.kafka.clients.admin.ConsumerGroupDescription; +import org.apache.kafka.clients.admin.ConsumerGroupListing; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.jooby.Env; +import org.jooby.Jooby; +import org.kafkahq.models.ConsumerGroup; +import org.kafkahq.models.Partition; + +import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +@Singleton +public class ConsumerGroupRepository extends AbstractRepository implements Jooby.Module { + public List list() throws ExecutionException, InterruptedException { + ArrayList list = new ArrayList<>(); + + for (ConsumerGroupListing item : kafkaWrapper.listConsumerGroups()) { + list.add(item.groupId()); + } + + List groups = this.findByName(list); + groups.sort(Comparator.comparing(ConsumerGroup::getId)); + + return groups; + } + + public ConsumerGroup findByName(String name) throws ExecutionException, InterruptedException { + Optional topics = this.findByName(new ArrayList() {{ + add(name); + }}).stream().findFirst(); + + return topics.orElseThrow(() -> new NoSuchElementException("Topic '" + name + "' doesn't exist")); + } + + public List findByName(List groups) throws ExecutionException, InterruptedException { + ArrayList list = new ArrayList<>(); + + Set> consumerDescriptions = kafkaWrapper.describeConsumerGroups(groups).entrySet(); + + for (Map.Entry description : consumerDescriptions) { + Map groupsOffsets = kafkaWrapper.consumerGroupsOffsets(description.getKey()); + Map> topicsOffsets = kafkaWrapper.describeTopicsOffsets(groupsOffsets.entrySet() + .stream() + .map(item -> item.getKey().topic()) + .distinct() + .collect(Collectors.toList()) + ); + + list.add(new ConsumerGroup( + description.getValue(), + groupsOffsets, + topicsOffsets + )); + } + + return list; + } + + public List findByTopic(String topic) throws ExecutionException, InterruptedException { + return this.list().stream() + .filter(consumerGroups -> + consumerGroups.getActiveTopics() + .stream() + .anyMatch(s -> Objects.equals(s, topic)) || + consumerGroups.getTopics() + .stream() + .anyMatch(s -> Objects.equals(s, topic)) + ) + .collect(Collectors.toList()); + } + + @SuppressWarnings("NullableProblems") + @Override + public void configure(Env env, Config conf, Binder binder) { + binder.bind(ConsumerGroupRepository.class).toInstance(new ConsumerGroupRepository()); + } +} diff --git a/src/main/java/org/kafkahq/repositories/LogDirRepository.java b/src/main/java/org/kafkahq/repositories/LogDirRepository.java new file mode 100644 index 000000000..f18d62849 --- /dev/null +++ b/src/main/java/org/kafkahq/repositories/LogDirRepository.java @@ -0,0 +1,45 @@ +package org.kafkahq.repositories; + +import com.google.inject.Binder; +import com.google.inject.Singleton; +import com.typesafe.config.Config; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.requests.DescribeLogDirsResponse; +import org.jooby.Env; +import org.jooby.Jooby; +import org.kafkahq.models.LogDir; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +@Singleton +public class LogDirRepository extends AbstractRepository implements Jooby.Module { + public ArrayList list() throws ExecutionException, InterruptedException { + ArrayList list = new ArrayList<>(); + + for(Map.Entry> nodes : kafkaWrapper.describeLogDir().entrySet()) { + for(Map.Entry node: nodes.getValue().entrySet()) { + for(Map.Entry log: node.getValue().replicaInfos.entrySet()) { + list.add(new LogDir(nodes.getKey(), node.getKey(), log.getKey(), log.getValue())); + } + } + } + + return list; + } + + public List findByTopic(String topic) throws ExecutionException, InterruptedException { + return this.list().stream() + .filter(item -> item.getTopic().equals(topic)) + .collect(Collectors.toList()); + } + + @SuppressWarnings("NullableProblems") + @Override + public void configure(Env env, Config conf, Binder binder) { + binder.bind(LogDirRepository.class).toInstance(new LogDirRepository()); + } +} diff --git a/src/main/java/org/kafkahq/repositories/RecordRepository.java b/src/main/java/org/kafkahq/repositories/RecordRepository.java new file mode 100644 index 000000000..7f1043d72 --- /dev/null +++ b/src/main/java/org/kafkahq/repositories/RecordRepository.java @@ -0,0 +1,166 @@ +package org.kafkahq.repositories; + +import com.google.inject.Binder; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import com.typesafe.config.Config; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.jooby.Env; +import org.jooby.Jooby; +import org.kafkahq.models.Topic; +import org.kafkahq.modules.KafkaModule; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +@Singleton +public class RecordRepository extends AbstractRepository implements Jooby.Module { + @Inject + private KafkaModule kafkaModule; + + @Inject + private TopicRepository topicRepository; + + public List> consume(String clusterId, List topics, Options options) throws ExecutionException, InterruptedException { + return this.kafkaModule.debug(() -> { + KafkaConsumer consumer = this.kafkaModule.getConsumer(clusterId); + options.seek(topicRepository, consumer, topics); + + List> list = new ArrayList<>(); + HashMap currentOffsets = new HashMap<>(); + boolean isDifferent = true; + + + while (isDifferent) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(200)); + HashMap previousOffsets = new HashMap<>(currentOffsets); + for (ConsumerRecord record : records) { + list.add(record); + currentOffsets.put(record.partition(), record.offset()); + } + + isDifferent = !previousOffsets.equals(currentOffsets); + } + + return list; + }, "Consume {} with options {}", topics, options); + } + + @ToString + @EqualsAndHashCode + public static class Options { + public enum Sort { + OLDEST, + NEWEST, + } + + private int size = 20; + + + public int getSize() { + return size; + } + + public Options setSize(int size) { + this.size = size; + + return this; + } + + private int start = 0; + + public int getStart() { + return start; + } + + public Options setStart(int start) { + this.start = start; + + return this; + } + + private Sort sort = Sort.OLDEST; + + public Sort getSort() { + return sort; + } + + public Options setSort(Sort sort) { + this.sort = sort; + + return this; + } + + private Pattern filter; + + public Pattern getFilter() { + return filter; + } + + public void setFilter(Pattern filter) { + this.filter = filter; + } + + public Integer partition; + + public Integer getPartition() { + return partition; + } + + public Options setPartition(int partition) { + this.partition = partition; + + return this; + } + + private void seek(TopicRepository topicRepository, KafkaConsumer consumer, List topics) throws ExecutionException, InterruptedException { + List topicsDetails = topicRepository.findByName(topics); + + // list partitons + List input = topicsDetails + .stream() + .flatMap(topic -> topic.getPartitions().stream().map(partition -> + new TopicPartition(topic.getName(), partition.getId()) + )) + .collect(Collectors.toList()); + + // filter partitions + if (this.partition != null) { + input = input.stream() + .filter(topicPartition -> topicPartition.partition() == this.partition) + .collect(Collectors.toList()); + } + + consumer.assign(input); + + // offset + if (this.start == 0 && this.sort == Options.Sort.OLDEST) { + consumer.seekToBeginning(input); + } else { + this.findOffset(topicsDetails) + .forEach(consumer::seek); + } + } + + private Map findOffset(List topicsDetails) { + return new HashMap<>(); + } + } + + @SuppressWarnings("NullableProblems") + @Override + public void configure(Env env, Config conf, Binder binder) { + binder.bind(RecordRepository.class).toInstance(new RecordRepository()); + } +} diff --git a/src/main/java/org/kafkahq/repositories/TopicRepository.java b/src/main/java/org/kafkahq/repositories/TopicRepository.java new file mode 100644 index 000000000..dfcff1d27 --- /dev/null +++ b/src/main/java/org/kafkahq/repositories/TopicRepository.java @@ -0,0 +1,84 @@ +package org.kafkahq.repositories; + +import com.google.inject.Binder; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import com.typesafe.config.Config; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.admin.TopicListing; +import org.jooby.Env; +import org.jooby.Jooby; +import org.kafkahq.models.Partition; +import org.kafkahq.models.Topic; +import org.kafkahq.modules.KafkaModule; + +import java.util.*; +import java.util.concurrent.ExecutionException; + +@Singleton +public class TopicRepository extends AbstractRepository implements Jooby.Module { + @Inject + private KafkaModule kafkaModule; + + @Inject + private ConsumerGroupRepository consumerGroupRepository; + + @Inject + private LogDirRepository logDirRepository; + + public List list() throws ExecutionException, InterruptedException { + ArrayList list = new ArrayList<>(); + + Collection listTopics = kafkaWrapper.listTopics(); + + for (TopicListing item : listTopics) { + list.add(item.name()); + } + + List topics = this.findByName(list); + topics.sort(Comparator.comparing(Topic::getName)); + + return topics; + } + + public Topic findByName(String name) throws ExecutionException, InterruptedException { + Optional topics = this.findByName(new ArrayList() {{ + add(name); + }}).stream().findFirst(); + + return topics.orElseThrow(() -> new NoSuchElementException("Topic '" + name + "' doesn't exist")); + } + + public List findByName(List topics) throws ExecutionException, InterruptedException { + ArrayList list = new ArrayList<>(); + + Set> topicDescriptions = kafkaWrapper.describeTopics(topics).entrySet(); + Map> topicOffsets = kafkaWrapper.describeTopicsOffsets(topics); + + for (Map.Entry description : topicDescriptions) { + list.add( + new Topic( + description.getValue(), + consumerGroupRepository.findByTopic(description.getValue().name()), + logDirRepository.findByTopic(description.getValue().name()), + topicOffsets.get(description.getValue().name()) + ) + ); + } + + return list; + } + + public void delete(String clusterId, String name) throws ExecutionException, InterruptedException { + kafkaModule.getAdminClient(clusterId).deleteTopics(new ArrayList() {{ + add(name); + }}).all().get(); + } + + + @SuppressWarnings("NullableProblems") + @Override + public void configure(Env env, Config conf, Binder binder) { + binder.bind(TopicRepository.class).toInstance(new TopicRepository()); + } +} diff --git a/src/main/java/org/kafkahq/response/ResultStatusResponse.java b/src/main/java/org/kafkahq/response/ResultStatusResponse.java new file mode 100644 index 000000000..3ba0bd94d --- /dev/null +++ b/src/main/java/org/kafkahq/response/ResultStatusResponse.java @@ -0,0 +1,6 @@ +package org.kafkahq.response; + +public class ResultStatusResponse { + public Boolean result; + public String message; +} diff --git a/src/main/java/org/kafkahq/utils/Debug.java b/src/main/java/org/kafkahq/utils/Debug.java new file mode 100644 index 000000000..1fc2cc974 --- /dev/null +++ b/src/main/java/org/kafkahq/utils/Debug.java @@ -0,0 +1,58 @@ +package org.kafkahq.utils; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Debug { + private static final Logger logger = LoggerFactory.getLogger(Thread.currentThread().getStackTrace()[2].getClassName()); + private static final String name = Thread.currentThread().getStackTrace()[2].getClassName(); + + private static String caller() { + return Thread.currentThread().getStackTrace()[3].getClassName() + " -> " + + Thread.currentThread().getStackTrace()[3].getMethodName() + " # " + + Thread.currentThread().getStackTrace()[3].getLineNumber(); + } + + public static String toJson(T arg) { + String output; + + if (arg instanceof String) { + output = (String) arg; + } else if (arg instanceof byte[]) { + output = new String((byte[]) arg); + } else { + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + output = gson.toJson(arg); + } + + return output; + } + + public static void time(String message, Runnable runnable, Object... arguments) { + long start = System.currentTimeMillis(); + + runnable.run(); + + logger.trace("[" + (System.currentTimeMillis() - start ) + " ms] " + message, arguments); + } + + @SafeVarargs + public static void print(T... args) { + System.out.println("\033[44;30m " + caller() + " \033[0m"); + + for (Object arg : args) { + System.out.println("\033[46;30m " + arg.getClass().getName() + " \033[0m \n" + toJson(arg)); + } + } + + @SafeVarargs + public static void log(T... args) { + logger.trace("\033[44;30m " + caller() + " \033[0m"); + + for (Object arg : args) { + logger.trace("\033[46;30m " + arg.getClass().getName() + " \033[0m " + toJson(arg)); + } + } +} diff --git a/yarn.lock b/yarn.lock new file mode 100644 index 000000000..964be89ad --- /dev/null +++ b/yarn.lock @@ -0,0 +1,27 @@ +# THIS IS AN AUTOGENERATED FILE. DO NOT EDIT THIS FILE DIRECTLY. +# yarn lockfile v1 + + +bootstrap@^4.1.3: + version "4.1.3" + resolved "https://registry.yarnpkg.com/bootstrap/-/bootstrap-4.1.3.tgz#0eb371af2c8448e8c210411d0cb824a6409a12be" + +font-awesome@^4.7.0: + version "4.7.0" + resolved "https://registry.yarnpkg.com/font-awesome/-/font-awesome-4.7.0.tgz#8fa8cf0411a1a31afd07b06d2902bb9fc815a133" + +jquery@^3.3.1: + version "3.3.1" + resolved "https://registry.yarnpkg.com/jquery/-/jquery-3.3.1.tgz#958ce29e81c9790f31be7792df5d4d95fc57fbca" + +popper.js@^1.14.4: + version "1.14.4" + resolved "https://registry.yarnpkg.com/popper.js/-/popper.js-1.14.4.tgz#8eec1d8ff02a5a3a152dd43414a15c7b79fd69b6" + +sweetalert2@^7.28.0: + version "7.28.0" + resolved "https://registry.yarnpkg.com/sweetalert2/-/sweetalert2-7.28.0.tgz#1833695b7ac2a80e0468ddf2b610c4eeeb4d8ee3" + +turbolinks@^5.2.0: + version "5.2.0" + resolved "https://registry.yarnpkg.com/turbolinks/-/turbolinks-5.2.0.tgz#e6877a55ea5c1cb3bb225f0a4ae303d6d32ff77c"