Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add reconnect to kafka code after kafka broker down #1274

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion lib/kafkaClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -887,9 +887,20 @@ KafkaClient.prototype.loadMetadata = function (callback) {
KafkaClient.prototype.loadMetadataForTopics = function (topics, callback) {
const broker = this.brokerForLeader();

if (!broker || !broker.isConnected()) {
if (!broker) {
return callback(new errors.BrokerNotAvailableError('Broker not available (loadMetadataForTopics)'));
}
// when kafka server restart,program can't recover as expected ,add retry code
if (!broker.isConnected()) {
let reconnectTimer = setInterval(() => {
try {
this.reconnectBroker(broker.socket);
clearInterval(reconnectTimer);
} catch (e) {
logger.debug('try reconnect to broker failed,will retry after 3000ms');
}
}, 3000)
}

const ensureBrokerReady = (broker, cb) => {
if (!broker.isReady()) {
Expand All @@ -907,6 +918,15 @@ KafkaClient.prototype.loadMetadataForTopics = function (topics, callback) {
},
cb => {
const broker = this.brokerForLeader();
// sometime program would encounter problem such as requestTimeOut after 30000ms, but change requestTimeOut to 300000 this also happend,it is because socket isn't in working state
try {
if (broker && !broker.socket.connecting) {
broker.socket.resume();
}
} catch (e) {
logger.debug('resume socket error');
}

const correlationId = this.nextId();
const supportedCoders = getSupportedForRequestType(broker, 'metadata');
const request = supportedCoders.encoder(this.clientId, correlationId, topics);
Expand Down