Moved tests to typescript
rob3000 authored and wtrocki committed Aug 20, 2020
1 parent 8f7be0d commit a9a329a
Showing 41 changed files with 827 additions and 860 deletions.
1 change: 1 addition & 0 deletions package.json
@@ -73,6 +73,7 @@
"uuid": "~3.4.0"
},
"devDependencies": {
"@types/mocha": "^8.0.3",
"@types/node": "^14.0.27",
"@typescript-eslint/eslint-plugin": "^3.9.0",
"@typescript-eslint/parser": "^3.9.0",
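The new @types/mocha entry provides type definitions for the describe/it/before/after globals used by the migrated tests below. How the TypeScript tests are executed is not part of this commit; a minimal sketch, assuming mocha is run through ts-node (the script name and flags are hypothetical, not taken from this repository):

  "scripts": {
    "test:int": "mocha --require ts-node/register --timeout 30000 'test/int/**/*.test.ts'"
  }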
15 changes: 2 additions & 13 deletions src/index.ts
@@ -1,18 +1,7 @@
"use strict";

-export { KStream } from "./lib/dsl/KStream";
-export { KTable } from "./lib/dsl/KTable";
+export { KStream, KTable } from "./lib/dsl";
export { KafkaFactory } from "./lib/KafkaFactory";
export { KafkaStreams } from "./lib/KafkaStreams";
-export { KStorage } from "./lib/KStorage.js";
+export { KStorage } from "./lib/KStorage";
export { KafkaClient } from "./lib/client/KafkaClient";

-// module.exports = {
-//   default: KafkaStreams,
-//   KStream,
-//   KTable,
-//   KafkaFactory,
-//   KafkaStreams,
-//   KStorage,
-//   KafkaClient
-// };
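Collapsing the two DSL exports into a single path implies a barrel module at src/lib/dsl that re-exports both classes. That file is not shown in this commit; a plausible sketch of its contents:

  // Hypothetical src/lib/dsl/index.ts barrel (not part of this diff):
  export { KStream } from "./KStream";
  export { KTable } from "./KTable";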
2 changes: 1 addition & 1 deletion src/lib/KStorage.ts
@@ -1,4 +1,4 @@
-import Promise from "bluebird";
+import { Promise } from "bluebird";

export class KStorage {
  public options: any;
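The same one-line change repeats across the action and DSL files below: bluebird's Promise becomes a named import instead of a default import. A CommonJS module such as bluebird has no default export unless the esModuleInterop or allowSyntheticDefaultImports compiler options are enabled, so the named form is the one that type-checks here; the later switch to import * as for the lodash modules follows the same interop rule. A minimal sketch of the pattern, assuming those options are off:

  import { Promise } from "bluebird";

  // bluebird attaches itself to a `Promise` property on its CommonJS export,
  // so the named import resolves at runtime as well as in the typings:
  Promise.delay(100).then(() => console.log("100ms later"));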
3 changes: 2 additions & 1 deletion src/lib/KafkaFactory.ts
@@ -1,7 +1,8 @@
import debugFactory from "debug";
const debug = debugFactory("kafka-streams:kafkafactory");
import { JSKafkaClient, NativeKafkaClient } from "./client";

+const debug = debugFactory("kafka-streams:kafkafactory");
+
export class KafkaFactory {
  public config: any;
  public batchOptions: any;
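This change, like the matching one in src/lib/messageProduceHandle.ts below, only moves the debug binding beneath the import block (the usual import/first lint convention); since ES import declarations are hoisted, runtime behavior is unchanged.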
2 changes: 0 additions & 2 deletions src/lib/KafkaStreams.ts
@@ -157,5 +157,3 @@ export class KafkaStreams extends EventEmitter {
    });
  }
}
-
-export default KafkaStreams;
2 changes: 1 addition & 1 deletion src/lib/actions/KeyCount.ts
@@ -1,4 +1,4 @@
-import Promise from "bluebird";
+import { Promise } from "bluebird";

/**
 * used to count keys in a stream
2 changes: 1 addition & 1 deletion src/lib/actions/LastState.ts
@@ -1,4 +1,4 @@
-import Promise from "bluebird";
+import { Promise } from "bluebird";

/**
 * used to hold the last state of key values
2 changes: 1 addition & 1 deletion src/lib/actions/Max.ts
@@ -1,4 +1,4 @@
-import Promise from "bluebird";
+import { Promise } from "bluebird";

/**
 * used to grab the highest value of key values
2 changes: 1 addition & 1 deletion src/lib/actions/Min.ts
@@ -1,4 +1,4 @@
-import Promise from "bluebird";
+import { Promise } from "bluebird";

/**
 * used to grab the lowest value of
2 changes: 1 addition & 1 deletion src/lib/actions/Sum.ts
@@ -1,4 +1,4 @@
-import Promise from "bluebird";
+import { Promise } from "bluebird";

/**
 * used to sum up key values in a stream
4 changes: 2 additions & 2 deletions src/lib/client/JSKafkaClient.ts
@@ -120,7 +120,7 @@ export class JSKafkaClient extends KafkaClient {
    if (withBackPressure) {
      this.consumer = new PartitionDrainer(this.kafkaConsumerClient, workerPerPartition || 1, false, false);
    } else {
-     this.consumer = new Drainer(this.kafkaConsumerClient, workerPerPartition, false, true);
+     this.consumer = new Drainer(this.kafkaConsumerClient, workerPerPartition, true, false, false);
    }

//consumer has to wait for producer
@@ -196,7 +196,7 @@ export class JSKafkaClient extends KafkaClient {
}

    this.kafkaProducerClient.becomeProducer([this.produceTopic], clientName, options);
-   this.producer = new Publisher(this.kafkaProducerClient, partitions || 1);
+   this.producer = new Publisher(this.kafkaProducerClient, partitions || 1, 0, 100);
}

/**
6 changes: 3 additions & 3 deletions src/lib/dsl/KStream.ts
@@ -1,7 +1,7 @@
-import Promise from "bluebird";
+import { Promise } from "bluebird";
import { async as createSubject } from "most-subject";
-import lodashClone from "lodash.clone";
-import lodashCloneDeep from "lodash.clonedeep";
+import * as lodashClone from "lodash.clone";
+import * as lodashCloneDeep from "lodash.clonedeep";
import { StreamDSL } from "./StreamDSL";
import { messageProduceHandle } from "../messageProduceHandle";
import { Window } from "../actions";
4 changes: 1 addition & 3 deletions src/lib/dsl/KTable.ts
@@ -1,8 +1,6 @@
"use strict";

import { EventEmitter } from "events";
import * as most from "most";
import Promise from "bluebird";
import { Promise } from "bluebird";
import { StreamDSL } from "./StreamDSL";
import { LastState } from "../actions";
import { StorageMerger } from "../StorageMerger";
6 changes: 3 additions & 3 deletions src/lib/dsl/StreamDSL.ts
@@ -1,7 +1,7 @@
import { EventEmitter } from "events";
import * as most from "most";
-import Promise from "bluebird";
-import uuid from "uuid";
+import { Promise } from "bluebird";
+import { v4 as uuidv4 } from "uuid";
import debugFactory from "debug";
const debug = debugFactory("kafka-streams:streamdsl");
import KStorage from "../KStorage";
@@ -658,7 +658,7 @@ export class StreamDSL {

    this.map(message => {

-     const id = getId ? getId(message) : uuid.v4();
+     const id = getId ? getId(message) : uuidv4();

      return {
        payload: message,
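uuid 3.x exposes its generators as properties of the CommonJS export, so the aliased named import resolves at runtime and type-checks against the uuid type definitions while keeping call sites terse. For illustration:

  import { v4 as uuidv4 } from "uuid";

  const id: string = uuidv4(); // e.g. "110ec58a-a0f2-4ac4-8393-c866d813b8d1"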
3 changes: 2 additions & 1 deletion src/lib/messageProduceHandle.ts
@@ -1,7 +1,8 @@
import debugFactory from "debug";
const debug = debugFactory("kafka-streams:mph");
import PRODUCE_TYPES from "./produceTypes";

const debug = debugFactory("kafka-streams:mph");

/**
 * returns true if the message is an object
 * with key and value fields
122 changes: 0 additions & 122 deletions test/int/ConsumerProducerE2E.test.js

This file was deleted.

120 changes: 120 additions & 0 deletions test/int/ConsumerProducerE2E.test.ts
@@ -0,0 +1,120 @@
import { KafkaStreams } from "../../src/index";
import { nativeConfig as config } from "../test-config";

const keyValueMapperEtl = (message) => {
  console.log(message);
  const elements = message.toLowerCase().split(" ");
  return {
    key: elements[0],
    value: elements[1]
  };
};

/*
  E2E or integration tests against a Kafka broker are always a bit flaky;
  with the right configuration and enough patience (generous mocha timeouts)
  they can be made reasonably stable.
*/

describe("E2E INT", () => {

let kafkaStreams = null;

const topic = "my-input-topic";
const outputTopic = "my-output-topic";

const messages = [
"bla",
"blup",
"bluuu",
"bla",
"bla",
"blup",
"xd",
"12x3"
];

before(() => {
kafkaStreams = new KafkaStreams(config);
});

after(async () => {
await kafkaStreams.closeAll();
});

it("should be able to produce to a topic via stream", done => {

const stream = kafkaStreams.getKStream();
stream.to(topic);

let count = 0;
stream.createAndSetProduceHandler().on("delivered", message => {
console.log(message.value);
count++;
if (count === messages.length) {
setTimeout(done, 250);
}
});

stream.start().then(() => {
console.log("started");
stream.writeToStream(messages);
}).catch((error) => {
done(error);
});
});

it("should give kafka some time", done => {
setTimeout(done, 2500);
});

it("should run complexer wordcount sample", done => {

const stream = kafkaStreams.getKStream();

stream
.from(topic)
.mapJSONConvenience() //buffer -> json
.mapWrapKafkaValue() //message.value -> value
.map(keyValueMapperEtl)
.countByKey("key", "count")
.filter(kv => kv.count >= 2)
.map(kv => kv.key + " " + kv.count)
.tap(_ => { })
.wrapAsKafkaValue()
.to(outputTopic);

let count = 0;
stream.createAndSetProduceHandler().on("delivered", () => {
count++;
if (count === 2) {
setTimeout(done, 250);
}
});

stream.start();
});

it("should give kafka some time again", done => {
setTimeout(done, 2500);
});

it("should be able to consume produced wordcount results", done => {

const stream = kafkaStreams.getKStream();

let count = 0;
stream
.from(outputTopic)
.mapJSONConvenience() //buffer -> json
.tap(_ => {
count++;
if (count === 2) {
setTimeout(done, 100);
}
})
.forEach(console.log);

stream.start();
});
});