Add incremental assign and unassign support
thynson committed Aug 11, 2021
1 parent 4885043 commit ce559e0
Showing 4 changed files with 204 additions and 0 deletions.
4 changes: 4 additions & 0 deletions index.d.ts
@@ -220,6 +220,10 @@ export class KafkaConsumer extends Client<KafkaConsumerEvents> {
consume(cb: (err: LibrdKafkaError, messages: Message[]) => void): void;
consume(): void;

incrementalAssign(assignments: Assignment[]): this;

incrementalUnassign(assignments: Assignment[]): this;

getWatermarkOffsets(topic: string, partition: number): WatermarkOffsets;

offsetsStore(topicPartitions: TopicPartitionOffset[]): any;
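For context, a sketch of how these two new methods are meant to be used together with librdkafka's cooperative (incremental) rebalancing. The group id and broker address are placeholders, and wiring the methods into a rebalance_cb under the cooperative-sticky assignor is an assumption about intended usage, not part of this commit:

const Kafka = require('node-rdkafka');

const consumer = new Kafka.KafkaConsumer({
  'group.id': 'example-group',  // placeholder
  'metadata.broker.list': 'localhost:9092',  // placeholder
  'partition.assignment.strategy': 'cooperative-sticky',
  'rebalance_cb': function(err, assignment) {
    if (err.code === Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
      // With a cooperative assignor, `assignment` holds only the
      // partitions being added, so merge instead of replacing.
      this.incrementalAssign(assignment);
    } else if (err.code === Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
      // Likewise, only the revoked partitions are passed in here.
      this.incrementalUnassign(assignment);
    }
  }
}, {});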
25 changes: 25 additions & 0 deletions lib/kafka-consumer.js
@@ -264,6 +264,19 @@ KafkaConsumer.prototype.assign = function(assignments) {
return this;
};

/**
 * Incrementally assign specific partitions and topics to the consumer.
 *
 * @param {array} assignments - Assignments array. Should contain
 * objects with topic and partition set.
 * @return {Client} - Returns itself
 */
KafkaConsumer.prototype.incrementalAssign = function(assignments) {
this._client.incrementalAssign(TopicPartition.map(assignments));
return this;
};

/**
* Unassign the consumer from its assigned partitions and topics.
*
@@ -275,6 +288,18 @@ KafkaConsumer.prototype.unassign = function() {
return this;
};

/**
 * Incrementally unassign the consumer from specific partitions and topics.
 *
 * @param {array} assignments - Assignments array. Should contain
 * objects with topic and partition set.
 * @return {Client} - Returns itself
 */
KafkaConsumer.prototype.incrementalUnassign = function(assignments) {
this._client.incrementalUnassign(TopicPartition.map(assignments));
return this;
};

/**
* Get the assignments for the consumer
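A short usage sketch for the two wrappers above (topic name and partition number are placeholders): unlike assign/unassign, the incremental variants adjust the current assignment rather than replacing it wholesale.

// Add one partition on top of the current assignment:
consumer.incrementalAssign([{ topic: 'events', partition: 3 }]);

// Later, drop just that partition; the rest of the assignment is untouched:
consumer.incrementalUnassign([{ topic: 'events', partition: 3 }]);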
171 changes: 171 additions & 0 deletions src/kafka-consumer.cc
@@ -177,6 +177,31 @@ Baton KafkaConsumer::Assign(std::vector<RdKafka::TopicPartition*> partitions) {
return Baton(errcode);
}

Baton KafkaConsumer::IncrementalAssign(std::vector<RdKafka::TopicPartition*> partitions) {
if (!IsConnected()) {
return Baton(RdKafka::ERR__STATE, "KafkaConsumer is disconnected");
}

RdKafka::KafkaConsumer* consumer =
dynamic_cast<RdKafka::KafkaConsumer*>(m_client);

RdKafka::Error *e = consumer->incremental_assign(partitions);

if (e) {
RdKafka::ErrorCode errcode = e->code();
delete e;
return Baton(errcode);
}

// Take ownership of the added partitions and merge them into the stored assignment
m_partition_cnt += partitions.size();
m_partitions.insert(m_partitions.end(), partitions.begin(), partitions.end());
partitions.clear();

return Baton(RdKafka::ERR_NO_ERROR);
}

Baton KafkaConsumer::Unassign() {
if (!IsClosing() && !IsConnected()) {
return Baton(RdKafka::ERR__STATE);
@@ -193,12 +218,41 @@ Baton KafkaConsumer::Unassign() {

// Destroy the old list of partitions since we are no longer using it
RdKafka::TopicPartition::destroy(m_partitions);
m_partitions.clear();

m_partition_cnt = 0;

return Baton(RdKafka::ERR_NO_ERROR);
}

Baton KafkaConsumer::IncrementalUnassign(std::vector<RdKafka::TopicPartition*> partitions) {
if (!IsClosing() && !IsConnected()) {
return Baton(RdKafka::ERR__STATE);
}

RdKafka::KafkaConsumer* consumer =
dynamic_cast<RdKafka::KafkaConsumer*>(m_client);

RdKafka::Error *e = consumer->incremental_unassign(partitions);
if (e) {
RdKafka::ErrorCode errcode = e->code();
delete e;
return Baton(errcode);
}

// Remove the revoked partitions from the stored assignment, matching by
// topic and partition rather than pointer identity: the passed-in objects
// were freshly allocated and never alias the ones saved in m_partitions.
m_partitions.erase(
std::remove_if(m_partitions.begin(), m_partitions.end(), [&partitions](RdKafka::TopicPartition *x) -> bool {
for (RdKafka::TopicPartition *p : partitions) {
if (p->topic() == x->topic() && p->partition() == x->partition()) {
delete x;  // free the stored copy we are dropping
return true;
}
}
return false;
}),
m_partitions.end()
);
m_partition_cnt -= partitions.size();

// Destroy the passed-in list now that the comparison no longer needs it
RdKafka::TopicPartition::destroy(partitions);
return Baton(RdKafka::ERR_NO_ERROR);
}

Baton KafkaConsumer::Commit(std::vector<RdKafka::TopicPartition*> toppars) {
if (!IsConnected()) {
return Baton(RdKafka::ERR__STATE);
@@ -525,7 +579,9 @@ void KafkaConsumer::Init(v8::Local<v8::Object> exports) {
Nan::SetPrototypeMethod(tpl, "committed", NodeCommitted);
Nan::SetPrototypeMethod(tpl, "position", NodePosition);
Nan::SetPrototypeMethod(tpl, "assign", NodeAssign);
Nan::SetPrototypeMethod(tpl, "incrementalAssign", NodeIncrementalAssign);
Nan::SetPrototypeMethod(tpl, "unassign", NodeUnassign);
Nan::SetPrototypeMethod(tpl, "incrementalUnassign", NodeIncrementalUnassign);
Nan::SetPrototypeMethod(tpl, "assignments", NodeAssignments);

Nan::SetPrototypeMethod(tpl, "commit", NodeCommit);
@@ -757,6 +813,64 @@ NAN_METHOD(KafkaConsumer::NodeAssign) {
info.GetReturnValue().Set(Nan::True());
}

NAN_METHOD(KafkaConsumer::NodeIncrementalAssign) {
Nan::HandleScope scope;

if (info.Length() < 1 || !info[0]->IsArray()) {
// Just throw an exception
return Nan::ThrowError("Need to specify an array of partitions");
}

v8::Local<v8::Array> partitions = info[0].As<v8::Array>();
std::vector<RdKafka::TopicPartition*> topic_partitions;

for (unsigned int i = 0; i < partitions->Length(); ++i) {
v8::Local<v8::Value> partition_obj_value;
if (!(
Nan::Get(partitions, i).ToLocal(&partition_obj_value) &&
partition_obj_value->IsObject())) {
// Return so the loop does not keep running after the throw
return Nan::ThrowError("Must pass topic-partition objects");
}

v8::Local<v8::Object> partition_obj = partition_obj_value.As<v8::Object>();

// Got the object
int64_t partition = GetParameter<int64_t>(partition_obj, "partition", -1);
std::string topic = GetParameter<std::string>(partition_obj, "topic", "");

if (!topic.empty()) {
RdKafka::TopicPartition* part;

if (partition < 0) {
part = Connection::GetPartition(topic);
} else {
part = Connection::GetPartition(topic, partition);
}

// Set the default value to offset invalid. If provided, we will not set
// the offset.
int64_t offset = GetParameter<int64_t>(
partition_obj, "offset", RdKafka::Topic::OFFSET_INVALID);
if (offset != RdKafka::Topic::OFFSET_INVALID) {
part->set_offset(offset);
}

topic_partitions.push_back(part);
}
}

KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());

// Hand over the partitions to the consumer.
Baton b = consumer->IncrementalAssign(topic_partitions);

if (b.err() != RdKafka::ERR_NO_ERROR) {
return Nan::ThrowError(RdKafka::err2str(b.err()).c_str());
}

info.GetReturnValue().Set(Nan::True());
}

NAN_METHOD(KafkaConsumer::NodeUnassign) {
Nan::HandleScope scope;

@@ -777,6 +891,63 @@ NAN_METHOD(KafkaConsumer::NodeUnassign) {
info.GetReturnValue().Set(Nan::True());
}

NAN_METHOD(KafkaConsumer::NodeIncrementalUnassign) {
Nan::HandleScope scope;

if (info.Length() < 1 || !info[0]->IsArray()) {
// Just throw an exception
return Nan::ThrowError("Need to specify an array of partitions");
}

v8::Local<v8::Array> partitions = info[0].As<v8::Array>();
std::vector<RdKafka::TopicPartition*> topic_partitions;

for (unsigned int i = 0; i < partitions->Length(); ++i) {
v8::Local<v8::Value> partition_obj_value;
if (!(
Nan::Get(partitions, i).ToLocal(&partition_obj_value) &&
partition_obj_value->IsObject())) {
// Return so the loop does not keep running after the throw
return Nan::ThrowError("Must pass topic-partition objects");
}

v8::Local<v8::Object> partition_obj = partition_obj_value.As<v8::Object>();

// Got the object
int64_t partition = GetParameter<int64_t>(partition_obj, "partition", -1);
std::string topic = GetParameter<std::string>(partition_obj, "topic", "");

if (!topic.empty()) {
RdKafka::TopicPartition* part;

if (partition < 0) {
part = Connection::GetPartition(topic);
} else {
part = Connection::GetPartition(topic, partition);
}

// Set the default value to offset invalid. If provided, we will not set
// the offset.
int64_t offset = GetParameter<int64_t>(
partition_obj, "offset", RdKafka::Topic::OFFSET_INVALID);
if (offset != RdKafka::Topic::OFFSET_INVALID) {
part->set_offset(offset);
}

topic_partitions.push_back(part);
}
}

KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
// Hand over the partitions to the consumer.
Baton b = consumer->IncrementalUnassign(topic_partitions);

if (b.err() != RdKafka::ERR_NO_ERROR) {
return Nan::ThrowError(RdKafka::err2str(b.err()).c_str());
}

info.GetReturnValue().Set(Nan::True());
}

NAN_METHOD(KafkaConsumer::NodeUnsubscribe) {
Nan::HandleScope scope;

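One detail of the binding above worth calling out: each topic-partition object may also carry an optional offset, which the native code applies only when it is something other than OFFSET_INVALID. A sketch with placeholder topic, partition, and offset values:

// Assign an additional partition and start consuming it at offset 1234:
consumer.incrementalAssign([{ topic: 'events', partition: 0, offset: 1234 }]);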
4 changes: 4 additions & 0 deletions src/kafka-consumer.h
@@ -71,7 +71,9 @@ class KafkaConsumer : public Connection {
int AssignedPartitionCount();

Baton Assign(std::vector<RdKafka::TopicPartition*>);
Baton IncrementalAssign(std::vector<RdKafka::TopicPartition*>);
Baton Unassign();
Baton IncrementalUnassign(std::vector<RdKafka::TopicPartition*>);

Baton Seek(const RdKafka::TopicPartition &partition, int timeout_ms);

@@ -102,7 +104,9 @@ class KafkaConsumer : public Connection {
static NAN_METHOD(NodeSubscribe);
static NAN_METHOD(NodeDisconnect);
static NAN_METHOD(NodeAssign);
static NAN_METHOD(NodeIncrementalAssign);
static NAN_METHOD(NodeUnassign);
static NAN_METHOD(NodeIncrementalUnassign);
static NAN_METHOD(NodeAssignments);
static NAN_METHOD(NodeUnsubscribe);
static NAN_METHOD(NodeCommit);