diff --git a/build.gradle b/build.gradle
index 1a8fad7bc..52e9e798f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1,6 +1,6 @@
buildscript {
ext {
- joobyVersion = "1.5.1"
+ joobyVersion = "1.5.+"
}
repositories {
@@ -73,6 +73,8 @@ dependencies {
// test
testCompile "junit:junit:4.12"
+ testCompile 'com.salesforce.kafka.test:kafka-junit4:3.0.1'
+ testCompile 'org.apache.kafka:kafka_2.11:2.0.0'
testCompile "io.rest-assured:rest-assured:3.1.0"
}
diff --git a/conf/application.test.conf b/conf/application.test.conf
new file mode 100644
index 000000000..be6ad312d
--- /dev/null
+++ b/conf/application.test.conf
@@ -0,0 +1,7 @@
+application {
+ port: 28186
+}
+
+server {
+ join: false
+}
\ No newline at end of file
diff --git a/conf/logback.xml b/conf/logback.xml
index 21ec88f64..aa1e0f2f7 100644
--- a/conf/logback.xml
+++ b/conf/logback.xml
@@ -33,7 +33,16 @@
diff --git a/public/includes/topic.ftl b/public/includes/topic.ftl
new file mode 100644
index 000000000..e60b4ba31
--- /dev/null
+++ b/public/includes/topic.ftl
@@ -0,0 +1,111 @@
+<#macro data datas>
+
+
<#if tab == "data">
-
-
-
-
- Key |
- Date |
- Partition |
- Offset |
-
-
-
- <#if datas?size == 0>
-
-
-
- No data available
-
- |
-
- </#if>
- <#list datas as data>
-
- ${data.key()!'null'} |
- ${data.timestamp()?number_to_datetime?string.medium_short} |
- ${data.partition()} |
- ${data.offset()} |
-
-
-
- ${data.value()!'null'}
- |
-
- </#list>
-
-
-
+ <@topicTemplate.data datas />
</#if>
diff --git a/src/main/java/org/kafkahq/models/Partition.java b/src/main/java/org/kafkahq/models/Partition.java
index 12119a49d..588bb2aa0 100644
--- a/src/main/java/org/kafkahq/models/Partition.java
+++ b/src/main/java/org/kafkahq/models/Partition.java
@@ -9,8 +9,9 @@
@ToString
@EqualsAndHashCode
public class Partition {
- public Partition(TopicPartitionInfo partitionInfo, LogDir logDir, Offsets offsets) {
+ public Partition(String topic, TopicPartitionInfo partitionInfo, LogDir logDir, Offsets offsets) {
this.id = partitionInfo.partition();
+ this.topic = topic;
this.logDir = logDir;
this.firstOffset = offsets.getFirstOffset();
this.lastOffset = offsets.getLastOffset();
@@ -31,6 +32,12 @@ public int getId() {
return id;
}
+ private final String topic;
+
+ public String getTopic() {
+ return topic;
+ }
+
private final List<Node> nodes;

public List<Node> getNodes() {
diff --git a/src/main/java/org/kafkahq/models/Topic.java b/src/main/java/org/kafkahq/models/Topic.java
index 6ffa43c83..d12a7e3cc 100644
--- a/src/main/java/org/kafkahq/models/Topic.java
+++ b/src/main/java/org/kafkahq/models/Topic.java
@@ -25,6 +25,7 @@ public Topic(
for (TopicPartitionInfo partition : description.partitions()) {
this.partitions.add(new Partition(
+ description.name(),
partition,
logDirs.stream()
.filter(logDir -> logDir.getPartition() == partition.partition())
diff --git a/src/main/java/org/kafkahq/repositories/RecordRepository.java b/src/main/java/org/kafkahq/repositories/RecordRepository.java
index 7f1043d72..313ff891a 100644
--- a/src/main/java/org/kafkahq/repositories/RecordRepository.java
+++ b/src/main/java/org/kafkahq/repositories/RecordRepository.java
@@ -1,5 +1,6 @@
package org.kafkahq.repositories;
+import com.google.common.collect.ImmutableMap;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -12,46 +13,68 @@
import org.apache.kafka.common.TopicPartition;
import org.jooby.Env;
import org.jooby.Jooby;
+import org.kafkahq.models.Partition;
import org.kafkahq.models.Topic;
import org.kafkahq.modules.KafkaModule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.time.Duration;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
-import java.util.stream.Collectors;
@Singleton
public class RecordRepository extends AbstractRepository implements Jooby.Module {
- @Inject
private KafkaModule kafkaModule;
- @Inject
private TopicRepository topicRepository;
+ private static Logger logger = LoggerFactory.getLogger(RecordRepository.class);
+
+ @Inject
+ public RecordRepository(KafkaModule kafkaModule, TopicRepository topicRepository) {
+ this.kafkaModule = kafkaModule;
+ this.topicRepository = topicRepository;
+ }
+
public List<ConsumerRecord<String, String>> consume(String clusterId, List<String> topics, Options options) throws ExecutionException, InterruptedException {
return this.kafkaModule.debug(() -> {
+ List<Topic> topicsDetail = topicRepository.findByName(topics);
KafkaConsumer<String, String> consumer = this.kafkaModule.getConsumer(clusterId);
- options.seek(topicRepository, consumer, topics);
+ Map<String, Map<Integer, Long>> assigments = new TreeMap<>();
List<ConsumerRecord<String, String>> list = new ArrayList<>();
- HashMap<Integer, Long> currentOffsets = new HashMap<>();
- boolean isDifferent = true;
-
- while (isDifferent) {
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
- HashMap<Integer, Long> previousOffsets = new HashMap<>(currentOffsets);
- for (ConsumerRecord<String, String> record : records) {
- list.add(record);
- currentOffsets.put(record.partition(), record.offset());
- }
-
- isDifferent = !previousOffsets.equals(currentOffsets);
- }
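+ // Each partition is now consumed on its own: seek() assigns the single partition, positions the
+ // consumer and returns the end offset to read up to, so polling is bounded per partition instead
+ // of looping until the polled offsets stop changing.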
+ topicsDetail.forEach(topic -> topic
+ .getPartitions()
+ .forEach(partition -> {
+ assigments.putAll(options.seek(consumer, partition));
+
+ Long endOffset = assigments.get(partition.getTopic()).get(partition.getId());
+ long currentOffset = 0L;
+
+ while (currentOffset < endOffset) {
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
+ for (ConsumerRecord<String, String> record : records) {
+ logger.trace(
+ "Record topic {} partion {} offset {}",
+ partition.getTopic(),
+ partition.getId(),
+ record.offset()
+ );
+
+ currentOffset = record.offset();
+
+ if (currentOffset >= endOffset) {
+ break;
+ } else {
+ list.add(record);
+ }
+ }
+ }
+ })
+ );
return list;
}, "Consume {} with options {}", topics, options);
@@ -65,7 +88,7 @@ public enum Sort {
NEWEST,
}
- private int size = 20;
+ private int size = 1;
public int getSize() {
@@ -124,43 +147,48 @@ public Options setPartition(int partition) {
return this;
}
- private void seek(TopicRepository topicRepository, KafkaConsumer<String, String> consumer, List<String> topics) throws ExecutionException, InterruptedException {
- List<Topic> topicsDetails = topicRepository.findByName(topics);
-
- // list partitons
- List<TopicPartition> input = topicsDetails
- .stream()
- .flatMap(topic -> topic.getPartitions().stream().map(partition ->
- new TopicPartition(topic.getName(), partition.getId())
- ))
- .collect(Collectors.toList());
-
- // filter partitions
- if (this.partition != null) {
- input = input.stream()
- .filter(topicPartition -> topicPartition.partition() == this.partition)
- .collect(Collectors.toList());
- }
+ public Map<String, Map<Integer, Long>> seek(KafkaConsumer<String, String> consumer, Partition partition) {
+ TopicPartition topicPartition = new TopicPartition(
+ partition.getTopic(),
+ partition.getId()
+ );
- consumer.assign(input);
+ consumer.assign(Collections.singleton(topicPartition));
+ long begin;
+ long last;
- // offset
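+ // Offset window for this partition: with OLDEST sort, read up to `size` records starting at the
+ // first offset; otherwise read the last `size` records before the end of the partition, clamped so
+ // the window never starts before the first offset. The returned map is keyed by topic name then
+ // partition id and holds the exclusive end offset that consume() reads up to.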
if (this.start == 0 && this.sort == Options.Sort.OLDEST) {
- consumer.seekToBeginning(input);
+ begin = partition.getFirstOffset();
+ last = partition.getFirstOffset() + this.size > partition.getLastOffset() ?
+ partition.getLastOffset() :
+ partition.getFirstOffset() + this.size;
} else {
- this.findOffset(topicsDetails)
- .forEach(consumer::seek);
+ if (partition.getLastOffset() - this.size < partition.getFirstOffset()) {
+ begin = partition.getFirstOffset();
+ last = partition.getLastOffset();
+ } else {
+ begin = partition.getLastOffset() - this.size;
+ last = partition.getLastOffset();
+ }
}
- }
- private Map<TopicPartition, Long> findOffset(List<Topic> topicsDetails) {
- return new HashMap<>();
+ consumer.seek(topicPartition, begin);
+
+ logger.trace(
+ "Consume topic {} partion {} from {} to {}",
+ partition.getTopic(),
+ partition.getId(),
+ begin,
+ last - 1
+ );
+
+ return ImmutableMap.of(partition.getTopic(), ImmutableMap.of(partition.getId(), last));
}
}
@SuppressWarnings("NullableProblems")
@Override
public void configure(Env env, Config conf, Binder binder) {
- binder.bind(RecordRepository.class).toInstance(new RecordRepository());
+ binder.bind(RecordRepository.class).to(RecordRepository.class);
}
}
diff --git a/src/main/java/org/kafkahq/repositories/TopicRepository.java b/src/main/java/org/kafkahq/repositories/TopicRepository.java
index c0ab08d51..e5a2ef6c5 100644
--- a/src/main/java/org/kafkahq/repositories/TopicRepository.java
+++ b/src/main/java/org/kafkahq/repositories/TopicRepository.java
@@ -17,15 +17,19 @@
@Singleton
public class TopicRepository extends AbstractRepository implements Jooby.Module {
- @Inject
private KafkaModule kafkaModule;
- @Inject
private ConsumerGroupRepository consumerGroupRepository;
- @Inject
private LogDirRepository logDirRepository;
+ @Inject
+ public TopicRepository(KafkaModule kafkaModule, ConsumerGroupRepository consumerGroupRepository, LogDirRepository logDirRepository) {
+ this.kafkaModule = kafkaModule;
+ this.consumerGroupRepository = consumerGroupRepository;
+ this.logDirRepository = logDirRepository;
+ }
+
public List<Topic> list() throws ExecutionException, InterruptedException {
ArrayList<Topic> list = new ArrayList<>();
@@ -78,6 +82,6 @@ public void delete(String clusterId, String name) throws ExecutionException, InterruptedException {
@SuppressWarnings("NullableProblems")
@Override
public void configure(Env env, Config conf, Binder binder) {
- binder.bind(TopicRepository.class).toInstance(new TopicRepository());
+ binder.bind(TopicRepository.class).asEagerSingleton();
}
}
diff --git a/src/test/java/org/kafkahq/BaseTest.java b/src/test/java/org/kafkahq/BaseTest.java
new file mode 100644
index 000000000..01918f545
--- /dev/null
+++ b/src/test/java/org/kafkahq/BaseTest.java
@@ -0,0 +1,58 @@
+package org.kafkahq;
+
+import com.salesforce.kafka.test.KafkaTestUtils;
+import com.salesforce.kafka.test.junit4.SharedKafkaTestResource;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.kafkahq.modules.KafkaModule;
+import org.kafkahq.modules.KafkaWrapper;
+import org.kafkahq.repositories.AbstractRepository;
+import org.kafkahq.repositories.RecordRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BaseTest {
+ private static Logger logger = LoggerFactory.getLogger(RecordRepository.class);
+
+ public static App app = new App();
+
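+ // Shared embedded cluster with 3 brokers for every test in the class; the topics created in
+ // setup() use a replication factor of 3 to match.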
+ @ClassRule
+ public static final SharedKafkaTestResource kafkaTestResource = new SharedKafkaTestResource()
+ .withBrokers(3);
+
+ @Before
+ public void setup() {
+ // kafka test connections
+ app.use(ConfigFactory
+ .empty()
+ .withValue(
+ "kafka.connections.test.bootstrap.servers",
+ ConfigValueFactory.fromAnyRef(kafkaTestResource.getKafkaConnectString())
+ )
+ .withFallback(ConfigFactory.load("application"))
+ );
+
+ // bootstrap app
+ app.start("test");
+ AbstractRepository.setWrapper(new KafkaWrapper(app.require(KafkaModule.class), "test"));
+
+ // test data
+ KafkaTestUtils kafkaTestUtils = kafkaTestResource.getKafkaTestUtils();
+
+ kafkaTestUtils.createTopic("empty", 12, (short) 3);
+
+ kafkaTestUtils.createTopic("random", 3, (short) 3);
+ for (int i = 0; i < 3; i++) {
+ kafkaTestUtils.produceRecords(100, "random", i);
+ }
+ logger.info("Kafka server started with test data on {}", kafkaTestResource.getKafkaConnectString());
+ }
+
+ @After
+ public void tearDown() {
+ app.stop();
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/org/kafkahq/repositories/TopicRepositoryTest.java b/src/test/java/org/kafkahq/repositories/TopicRepositoryTest.java
new file mode 100644
index 000000000..feea0ea62
--- /dev/null
+++ b/src/test/java/org/kafkahq/repositories/TopicRepositoryTest.java
@@ -0,0 +1,15 @@
+package org.kafkahq.repositories;
+
+import org.junit.Test;
+import org.kafkahq.BaseTest;
+
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertEquals;
+
+public class TopicRepositoryTest extends BaseTest {
+ @Test
+ public void list() throws ExecutionException, InterruptedException {
+ assertEquals(2, app.require(TopicRepository.class).list().size());
+ }
+}
\ No newline at end of file