Skip to content

Commit

Permalink
Implement assignment lost
Browse files Browse the repository at this point in the history
  • Loading branch information
LAN Xingcan committed May 12, 2023
1 parent ee7bf64 commit 85193d8
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 2 deletions.
2 changes: 1 addition & 1 deletion deps/librdkafka
11 changes: 11 additions & 0 deletions lib/kafka-consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,17 @@ KafkaConsumer.prototype.position = function(toppars) {
return this._errorWrap(this._client.position(toppars), true);
};

/**
* Check whether the consumer considers the current assignment to have been
* lost invountarily.
*
* @throws Throws from native land if
* @returns {boolean} Whether the assignment have been lost or not
*/
KafkaConsumer.prototype.assimentLost = function() {
return this._client.assignmentLost();
}

/**
* Unsubscribe from all currently subscribed topics
*
Expand Down
29 changes: 29 additions & 0 deletions src/kafka-consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,20 @@ Baton KafkaConsumer::Position(std::vector<RdKafka::TopicPartition*> &toppars) {
return Baton(err);
}


Baton KafkaConsumer::AssigmentLost() {
if (!IsConnected()) {
return Baton(RdKafka::ERR__STATE, "KafkaConsumer is not connected");
}

RdKafka::KafkaConsumer* consumer =
dynamic_cast<RdKafka::KafkaConsumer*>(m_client);

// XXX: Returning a bool by casting it to a pointer,
return Baton(reinterpret_cast<void*>(
static_cast<uintptr_t>(consumer->assignment_lost() ? true : false)));
}

Baton KafkaConsumer::Subscription() {
if (!IsConnected()) {
return Baton(RdKafka::ERR__STATE, "Consumer is not connected");
Expand Down Expand Up @@ -585,6 +599,7 @@ void KafkaConsumer::Init(v8::Local<v8::Object> exports) {

Nan::SetPrototypeMethod(tpl, "committed", NodeCommitted);
Nan::SetPrototypeMethod(tpl, "position", NodePosition);
Nan::SetPrototypeMethod(tpl, "assignemntLost", NodeAssignmentLost);
Nan::SetPrototypeMethod(tpl, "assign", NodeAssign);
Nan::SetPrototypeMethod(tpl, "incrementalAssign", NodeIncrementalAssign);
Nan::SetPrototypeMethod(tpl, "unassign", NodeUnassign);
Expand Down Expand Up @@ -745,6 +760,20 @@ NAN_METHOD(KafkaConsumer::NodePosition) {
RdKafka::TopicPartition::destroy(toppars);
}

NAN_METHOD(KafkaConsumer::NodeAssignmentLost) {

Nan::HandleScope scope;

KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());

Baton b = consumer->AssigmentLost();
if (b.err() != RdKafka::ERR_NO_ERROR) {
Nan::ThrowError(RdKafka::err2str(b.err()).c_str());
}
bool result = static_cast<bool>(reinterpret_cast<uintptr_t>(b.data<void*>()));
info.GetReturnValue().Set(Nan::New<v8::Boolean>(result));
}

NAN_METHOD(KafkaConsumer::NodeAssignments) {
Nan::HandleScope scope;

Expand Down
4 changes: 3 additions & 1 deletion src/kafka-consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class KafkaConsumer : public Connection {

Baton Committed(std::vector<RdKafka::TopicPartition*> &, int timeout_ms);
Baton Position(std::vector<RdKafka::TopicPartition*> &);
Baton AssigmentLost();

Baton RefreshAssignments();

Expand Down Expand Up @@ -101,7 +102,7 @@ class KafkaConsumer : public Connection {
bool m_is_subscribed = false;

void* m_consume_loop;

// Node methods
static NAN_METHOD(NodeConnect);
static NAN_METHOD(NodeSubscribe);
Expand All @@ -117,6 +118,7 @@ class KafkaConsumer : public Connection {
static NAN_METHOD(NodeOffsetsStore);
static NAN_METHOD(NodeCommitted);
static NAN_METHOD(NodePosition);
static NAN_METHOD(NodeAssignmentLost);
static NAN_METHOD(NodeSubscription);
static NAN_METHOD(NodeSeek);
static NAN_METHOD(NodeGetWatermarkOffsets);
Expand Down

0 comments on commit 85193d8

Please sign in to comment.