Skip to content

Commit

Permalink
Adds Progress reporter
Browse files Browse the repository at this point in the history
  • Loading branch information
hey-johnnypark committed Feb 24, 2018
1 parent 86297ee commit fc90fff
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 4 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@

> Measures the latency of a Kafka topic


## Getting started



<b>Optional</b>: Start Kafka in Docker
```bash
docker run --name broker --rm -p 2181:2181 -p 9092:9092 -e ADVERTISED_HOST=127.0.0.1 johnnypark/kafka-zookeeper
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.9.RELEASE</version>
<version>1.5.10.RELEASE</version>

</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
public class RateLimitedProducerService {

private static final Logger LOG = LoggerFactory.getLogger(App.class);
public static final int NUM_STEPS = 20;

@Value("${kafka.topic}")
private String topic;
Expand All @@ -42,10 +43,21 @@ public class RateLimitedProducerService {
@Autowired
private KafkaTemplate<String, byte[]> template;

private ProgressReporter reporter;

@PostConstruct
public void postConstruct() {
initRateLimiter();
initMockMessage();
initProgressLogger();
}

private void initProgressLogger() {
reporter = new ProgressReporter(numMessages);
}

private void initRateLimiter() {
rateLimiter = RateLimiter.create(ratePerSecond);
generateMessage();
}

@Async
Expand All @@ -59,13 +71,14 @@ public CompletableFuture<Long> produceMessages() {
.mapToObj(this::createProducerRecord)
.peek(this::registerRecord)
.peek(this::sendRecord)
.peek(reporter::progress)
.peek(this::rateLimit)
.count();
LOG.info("Produced {} messages", count);
LOG.info("Done producing {} messages", count);
return CompletableFuture.completedFuture(count);
}

private void generateMessage() {
private void initMockMessage() {
message = new byte[messageSize];
new Random().nextBytes(message);
LOG.info("Generated message with {} bytes", message.length);
Expand All @@ -90,4 +103,28 @@ private ProducerRecord registerRecord(ProducerRecord<String, byte[]> record) {
return record;
}

private class ProgressReporter {

private int currentProgress = 0;

private final int end;

private final int step;

private ProgressReporter(int end) {
this.end = end;
step = end / NUM_STEPS;
}

private void progress(ProducerRecord record) {
if (++currentProgress % step == 0) {
LOG.info("Produced {}/{} messages ({}%)",
currentProgress,
end,
(int)((float) currentProgress / (float) end * 100));
}
}

}

}

0 comments on commit fc90fff

Please sign in to comment.