From 4daba2d726fff0b2301a10aedb8c488d7dc55db4 Mon Sep 17 00:00:00 2001 From: tchiotludo Date: Sun, 30 Sep 2018 14:04:51 +0200 Subject: [PATCH] Init Unit Test --- build.gradle | 4 +- conf/application.test.conf | 7 + conf/logback.xml | 9 ++ public/includes/topic.ftl | 111 ++++++++++++++++ public/static/template.scss | 75 +++++++---- public/topic.ftl | 38 +----- .../java/org/kafkahq/models/Partition.java | 9 +- src/main/java/org/kafkahq/models/Topic.java | 1 + .../repositories/RecordRepository.java | 124 +++++++++++------- .../kafkahq/repositories/TopicRepository.java | 12 +- src/test/java/org/kafkahq/BaseTest.java | 58 ++++++++ .../repositories/TopicRepositoryTest.java | 15 +++ 12 files changed, 344 insertions(+), 119 deletions(-) create mode 100644 conf/application.test.conf create mode 100644 public/includes/topic.ftl create mode 100644 src/test/java/org/kafkahq/BaseTest.java create mode 100644 src/test/java/org/kafkahq/repositories/TopicRepositoryTest.java diff --git a/build.gradle b/build.gradle index 1a8fad7bc..52e9e798f 100644 --- a/build.gradle +++ b/build.gradle @@ -1,6 +1,6 @@ buildscript { ext { - joobyVersion = "1.5.1" + joobyVersion = "1.5.+" } repositories { @@ -73,6 +73,8 @@ dependencies { // test testCompile "junit:junit:4.12" + testCompile 'com.salesforce.kafka.test:kafka-junit4:3.0.1' + testCompile 'org.apache.kafka:kafka_2.11:2.0.0' testCompile "io.rest-assured:rest-assured:3.1.0" } diff --git a/conf/application.test.conf b/conf/application.test.conf new file mode 100644 index 000000000..be6ad312d --- /dev/null +++ b/conf/application.test.conf @@ -0,0 +1,7 @@ +application { + port: 28186 +} + +server { + join: false +} \ No newline at end of file diff --git a/conf/logback.xml b/conf/logback.xml index 21ec88f64..aa1e0f2f7 100644 --- a/conf/logback.xml +++ b/conf/logback.xml @@ -33,7 +33,16 @@ + + + + + + + + + diff --git a/public/includes/topic.ftl b/public/includes/topic.ftl new file mode 100644 index 000000000..e60b4ba31 --- /dev/null +++ b/public/includes/topic.ftl @@ -0,0 +1,111 @@ +<#macro data datas> + +
+ + + + + + + + + + + <#if datas?size == 0> + + + + + <#list datas as data> + + + + + + + + + + + +
KeyDatePartitionOffset
+ +
${data.key()!'null'}${data.timestamp()?number_to_datetime?string.medium_short}${data.partition()}${data.offset()}
+
${data.value()!'null'}
+
+
+ diff --git a/public/static/template.scss b/public/static/template.scss index 0031726d5..b72b13670 100644 --- a/public/static/template.scss +++ b/public/static/template.scss @@ -45,6 +45,11 @@ a, button, input[type="submit"] { transition-duration: 200ms; } +body { + min-width: 320px; + +} + .title { display: flex; position: relative; @@ -149,45 +154,47 @@ h1 { } } } -} -a { - &[aria-expanded="true"] { - color: #fff; - background: $tertiary; - } - &[data-toggle="collapse"] { - position: relative; + + a { + &[aria-expanded="true"] { + color: #fff; + background: $tertiary; + } + &[data-toggle="collapse"] { + position: relative; + } } -} -.dropdown-toggle::after { - display: block; - position: absolute; - top: 50%; - right: 20px; - transform: translateY(-50%); -} + .dropdown-toggle::after { + display: block; + position: absolute; + top: 50%; + right: 20px; + transform: translateY(-50%); + } -ul ul a { - font-size: 0.9em !important; - padding-left: 30px !important; - background: $tertiary; -} + ul ul a { + font-size: 0.9em !important; + padding-left: 30px !important; + background: $tertiary; + } -@media (max-width: 768px) { - #sidebar { - margin-left: -250px; - &.active { - margin-left: 0; + @media (max-width: 768px) { + & { + margin-left: -250px; + &.active { + margin-left: 0; + } + } + #sidebar-collapse span { + display: none; } - } - #sidebar-collapse span { - display: none; } } + /* ----------------------------------------------------------------------------------------------------------------- *\ Turbolinks \* ----------------------------------------------------------------------------------------------------------------- */ @@ -220,4 +227,14 @@ table { word-break: break-all; } } +} + +/* ----------------------------------------------------------------------------------------------------------------- *\ + Data +\* ----------------------------------------------------------------------------------------------------------------- */ +.data-filter { + > nav { + display: flex; + order: 2; + } } \ No newline at end of file diff --git a/public/topic.ftl b/public/topic.ftl index ee600ae99..99e181e18 100644 --- a/public/topic.ftl +++ b/public/topic.ftl @@ -7,6 +7,7 @@ <#import "/includes/node.ftl" as nodeTemplate> <#import "/includes/group.ftl" as groupTemplate> <#import "/includes/functions.ftl" as functions> +<#import "/includes/topic.ftl" as topicTemplate> <@template.header "Topic: " + topic.getName(), "topic" /> @@ -32,42 +33,7 @@
<#if tab == "data">
-
- - - - - - - - - - - <#if datas?size == 0> - - - - - <#list datas as data> - - - - - - - - - - - -
KeyDatePartitionOffset
- -
${data.key()!'null'}${data.timestamp()?number_to_datetime?string.medium_short}${data.partition()}${data.offset()}
-
${data.value()!'null'}
-
-
+ <@topicTemplate.data datas />
diff --git a/src/main/java/org/kafkahq/models/Partition.java b/src/main/java/org/kafkahq/models/Partition.java index 12119a49d..588bb2aa0 100644 --- a/src/main/java/org/kafkahq/models/Partition.java +++ b/src/main/java/org/kafkahq/models/Partition.java @@ -9,8 +9,9 @@ @ToString @EqualsAndHashCode public class Partition { - public Partition(TopicPartitionInfo partitionInfo, LogDir logDir, Offsets offsets) { + public Partition(String topic, TopicPartitionInfo partitionInfo, LogDir logDir, Offsets offsets) { this.id = partitionInfo.partition(); + this.topic = topic; this.logDir = logDir; this.firstOffset = offsets.getFirstOffset(); this.lastOffset = offsets.getLastOffset(); @@ -31,6 +32,12 @@ public int getId() { return id; } + private final String topic; + + public String getTopic() { + return topic; + } + private final List nodes; public List getNodes() { diff --git a/src/main/java/org/kafkahq/models/Topic.java b/src/main/java/org/kafkahq/models/Topic.java index 6ffa43c83..d12a7e3cc 100644 --- a/src/main/java/org/kafkahq/models/Topic.java +++ b/src/main/java/org/kafkahq/models/Topic.java @@ -25,6 +25,7 @@ public Topic( for (TopicPartitionInfo partition : description.partitions()) { this.partitions.add(new Partition( + description.name(), partition, logDirs.stream() .filter(logDir -> logDir.getPartition() == partition.partition()) diff --git a/src/main/java/org/kafkahq/repositories/RecordRepository.java b/src/main/java/org/kafkahq/repositories/RecordRepository.java index 7f1043d72..313ff891a 100644 --- a/src/main/java/org/kafkahq/repositories/RecordRepository.java +++ b/src/main/java/org/kafkahq/repositories/RecordRepository.java @@ -1,5 +1,6 @@ package org.kafkahq.repositories; +import com.google.common.collect.ImmutableMap; import com.google.inject.Binder; import com.google.inject.Inject; import com.google.inject.Singleton; @@ -12,46 +13,68 @@ import org.apache.kafka.common.TopicPartition; import org.jooby.Env; import org.jooby.Jooby; +import org.kafkahq.models.Partition; import org.kafkahq.models.Topic; import org.kafkahq.modules.KafkaModule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.time.Duration; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ExecutionException; import java.util.regex.Pattern; -import java.util.stream.Collectors; @Singleton public class RecordRepository extends AbstractRepository implements Jooby.Module { - @Inject private KafkaModule kafkaModule; - @Inject private TopicRepository topicRepository; + private static Logger logger = LoggerFactory.getLogger(RecordRepository.class); + + @Inject + public RecordRepository(KafkaModule kafkaModule, TopicRepository topicRepository) { + this.kafkaModule = kafkaModule; + this.topicRepository = topicRepository; + } + public List> consume(String clusterId, List topics, Options options) throws ExecutionException, InterruptedException { return this.kafkaModule.debug(() -> { + List topicsDetail = topicRepository.findByName(topics); KafkaConsumer consumer = this.kafkaModule.getConsumer(clusterId); - options.seek(topicRepository, consumer, topics); + Map> assigments = new TreeMap<>(); List> list = new ArrayList<>(); - HashMap currentOffsets = new HashMap<>(); - boolean isDifferent = true; - - while (isDifferent) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(200)); - HashMap previousOffsets = new HashMap<>(currentOffsets); - for (ConsumerRecord record : records) { - list.add(record); - currentOffsets.put(record.partition(), record.offset()); - } - - isDifferent = !previousOffsets.equals(currentOffsets); - } + topicsDetail.forEach(topic -> topic + .getPartitions() + .forEach(partition -> { + assigments.putAll(options.seek(consumer, partition)); + + Long endOffset = assigments.get(partition.getTopic()).get(partition.getId()); + long currentOffset = 0L; + + while (currentOffset < endOffset) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + logger.trace( + "Record topic {} partion {} offset {}", + partition.getTopic(), + partition.getId(), + record.offset() + ); + + currentOffset = record.offset(); + + if (currentOffset >= endOffset) { + break; + } else { + list.add(record); + } + } + } + }) + ); return list; }, "Consume {} with options {}", topics, options); @@ -65,7 +88,7 @@ public enum Sort { NEWEST, } - private int size = 20; + private int size = 1; public int getSize() { @@ -124,43 +147,48 @@ public Options setPartition(int partition) { return this; } - private void seek(TopicRepository topicRepository, KafkaConsumer consumer, List topics) throws ExecutionException, InterruptedException { - List topicsDetails = topicRepository.findByName(topics); - - // list partitons - List input = topicsDetails - .stream() - .flatMap(topic -> topic.getPartitions().stream().map(partition -> - new TopicPartition(topic.getName(), partition.getId()) - )) - .collect(Collectors.toList()); - - // filter partitions - if (this.partition != null) { - input = input.stream() - .filter(topicPartition -> topicPartition.partition() == this.partition) - .collect(Collectors.toList()); - } + public Map> seek(KafkaConsumer consumer, Partition partition) { + TopicPartition topicPartition = new TopicPartition( + partition.getTopic(), + partition.getId() + ); - consumer.assign(input); + consumer.assign(Collections.singleton(topicPartition)); + long begin; + long last; - // offset if (this.start == 0 && this.sort == Options.Sort.OLDEST) { - consumer.seekToBeginning(input); + begin = partition.getFirstOffset(); + last = partition.getFirstOffset() + this.size > partition.getLastOffset() ? + partition.getLastOffset() : + partition.getFirstOffset() + this.size; } else { - this.findOffset(topicsDetails) - .forEach(consumer::seek); + if (partition.getLastOffset() - this.size < partition.getFirstOffset()) { + begin = partition.getFirstOffset(); + last = partition.getLastOffset(); + } else { + begin = partition.getLastOffset() - this.size; + last = partition.getLastOffset(); + } } - } - private Map findOffset(List topicsDetails) { - return new HashMap<>(); + consumer.seek(topicPartition, begin); + + logger.trace( + "Consume topic {} partion {} from {} to {}", + partition.getTopic(), + partition.getId(), + begin, + last - 1 + ); + + return ImmutableMap.of(partition.getTopic(), ImmutableMap.of(partition.getId(), last)); } } @SuppressWarnings("NullableProblems") @Override public void configure(Env env, Config conf, Binder binder) { - binder.bind(RecordRepository.class).toInstance(new RecordRepository()); + binder.bind(RecordRepository.class).to(RecordRepository.class); } } diff --git a/src/main/java/org/kafkahq/repositories/TopicRepository.java b/src/main/java/org/kafkahq/repositories/TopicRepository.java index c0ab08d51..e5a2ef6c5 100644 --- a/src/main/java/org/kafkahq/repositories/TopicRepository.java +++ b/src/main/java/org/kafkahq/repositories/TopicRepository.java @@ -17,15 +17,19 @@ @Singleton public class TopicRepository extends AbstractRepository implements Jooby.Module { - @Inject private KafkaModule kafkaModule; - @Inject private ConsumerGroupRepository consumerGroupRepository; - @Inject private LogDirRepository logDirRepository; + @Inject + public TopicRepository(KafkaModule kafkaModule, ConsumerGroupRepository consumerGroupRepository, LogDirRepository logDirRepository) { + this.kafkaModule = kafkaModule; + this.consumerGroupRepository = consumerGroupRepository; + this.logDirRepository = logDirRepository; + } + public List list() throws ExecutionException, InterruptedException { ArrayList list = new ArrayList<>(); @@ -78,6 +82,6 @@ public void delete(String clusterId, String name) throws ExecutionException, Int @SuppressWarnings("NullableProblems") @Override public void configure(Env env, Config conf, Binder binder) { - binder.bind(TopicRepository.class).toInstance(new TopicRepository()); + binder.bind(TopicRepository.class).asEagerSingleton(); } } diff --git a/src/test/java/org/kafkahq/BaseTest.java b/src/test/java/org/kafkahq/BaseTest.java new file mode 100644 index 000000000..01918f545 --- /dev/null +++ b/src/test/java/org/kafkahq/BaseTest.java @@ -0,0 +1,58 @@ +package org.kafkahq; + +import com.salesforce.kafka.test.KafkaTestUtils; +import com.salesforce.kafka.test.junit4.SharedKafkaTestResource; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.kafkahq.modules.KafkaModule; +import org.kafkahq.modules.KafkaWrapper; +import org.kafkahq.repositories.AbstractRepository; +import org.kafkahq.repositories.RecordRepository; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BaseTest { + private static Logger logger = LoggerFactory.getLogger(RecordRepository.class); + + public static App app = new App(); + + @ClassRule + public static final SharedKafkaTestResource kafkaTestResource = new SharedKafkaTestResource() + .withBrokers(3); + + @Before + public void setup() { + // kafka test connections + app.use(ConfigFactory + .empty() + .withValue( + "kafka.connections.test.bootstrap.servers", + ConfigValueFactory.fromAnyRef(kafkaTestResource.getKafkaConnectString()) + ) + .withFallback(ConfigFactory.load("application")) + ); + + // bootstrap app + app.start("test"); + AbstractRepository.setWrapper(new KafkaWrapper(app.require(KafkaModule.class), "test")); + + // test data + KafkaTestUtils kafkaTestUtils = kafkaTestResource.getKafkaTestUtils(); + + kafkaTestUtils.createTopic("empty", 12, (short) 3); + + kafkaTestUtils.createTopic("random", 3, (short) 3); + for (int i = 0; i < 3; i++) { + kafkaTestUtils.produceRecords(100, "random", i); + } + logger.info("Kafka server started with test data on {}", kafkaTestResource.getKafkaConnectString()); + } + + @After + public void tearDown() { + app.stop(); + } +} \ No newline at end of file diff --git a/src/test/java/org/kafkahq/repositories/TopicRepositoryTest.java b/src/test/java/org/kafkahq/repositories/TopicRepositoryTest.java new file mode 100644 index 000000000..feea0ea62 --- /dev/null +++ b/src/test/java/org/kafkahq/repositories/TopicRepositoryTest.java @@ -0,0 +1,15 @@ +package org.kafkahq.repositories; + +import org.junit.Test; +import org.kafkahq.BaseTest; + +import java.util.concurrent.ExecutionException; + +import static org.junit.Assert.assertEquals; + +public class TopicRepositoryTest extends BaseTest { + @Test + public void list() throws ExecutionException, InterruptedException { + assertEquals(2, app.require(TopicRepository.class).list().size()); + } +} \ No newline at end of file