diff --git a/lib/kafkaClient.js b/lib/kafkaClient.js index 4da3c704..14ddb6ec 100644 --- a/lib/kafkaClient.js +++ b/lib/kafkaClient.js @@ -887,9 +887,20 @@ KafkaClient.prototype.loadMetadata = function (callback) { KafkaClient.prototype.loadMetadataForTopics = function (topics, callback) { const broker = this.brokerForLeader(); - if (!broker || !broker.isConnected()) { + if (!broker) { return callback(new errors.BrokerNotAvailableError('Broker not available (loadMetadataForTopics)')); } + // when kafka server restart,program can't recover as expected ,add retry code + if (!broker.isConnected()) { + let reconnectTimer = setInterval(() => { + try { + this.reconnectBroker(broker.socket); + clearInterval(reconnectTimer); + } catch (e) { + logger.debug('try reconnect to broker failed,will retry after 3000ms'); + } + }, 3000) + } const ensureBrokerReady = (broker, cb) => { if (!broker.isReady()) { @@ -907,6 +918,15 @@ KafkaClient.prototype.loadMetadataForTopics = function (topics, callback) { }, cb => { const broker = this.brokerForLeader(); + // sometime program would encounter problem such as requestTimeOut after 30000ms, but change requestTimeOut to 300000 this also happend,it is because socket isn't in working state + try { + if (broker && !broker.socket.connecting) { + broker.socket.resume(); + } + } catch (e) { + logger.debug('resume socket error'); + } + const correlationId = this.nextId(); const supportedCoders = getSupportedForRequestType(broker, 'metadata'); const request = supportedCoders.encoder(this.clientId, correlationId, topics);