Skip to content

Commit

Permalink
Fix describe configs for multiple brokers (#1772) (#1280)
Browse files Browse the repository at this point in the history
This fixes describe configs when requesting broker configs for clusters with more than 1 broker.
Instead of sending all requests to all brokers, requests for broker configs are sent to the specific broker. Topic config requests can go to any broker (follows the logic of the Java SDK).

Also reworked a bit to fit better into api support functionality.

Fixes #1172.
  • Loading branch information
jlandersen authored and hyperlink committed Jun 13, 2019
1 parent 707348d commit 51b6292
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 32 deletions.
64 changes: 39 additions & 25 deletions lib/kafkaClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -1379,46 +1379,60 @@ KafkaClient.prototype.describeConfigs = function (payload, callback) {
return callback(new Error('Client is not ready (describeConfigs)'));
}
let err;

// Broker resource requests must go to the specific node
// other requests can go to any node
const brokerResourceRequests = [];
const nonBrokerResourceRequests = [];

_.forEach(payload.resources, function (resource) {
if (resourceTypeMap[resource.resourceType] === undefined) {
err = new Error(`Unexpected resource type ${resource.resourceType} for resource ${resource.resourceName}`);
return false;
} else {
resource.resourceType = resourceTypeMap[resource.resourceType];
}

if (resource.resourceType === resourceTypeMap['broker']) {
brokerResourceRequests.push(resource);
} else {
nonBrokerResourceRequests.push(resource);
}
});

if (err) {
return callback(err);
}
const brokers = this.brokerMetadata;
async.mapValuesLimit(
brokers,
this.options.maxAsyncRequests,
(brokerMetadata, brokerId, cb) => {
const broker = this.brokerForLeader(brokerId);
if (!broker || !broker.isConnected()) {
return cb(new errors.BrokerNotAvailableError('Broker not available (describeConfigs)'));
}

const correlationId = this.nextId();

let apiVersion = 0;
if (broker.apiSupport && broker.apiSupport.describeConfigs) {
apiVersion = broker.apiSupport.describeConfigs.max;
async.parallelLimit([
(cb) => {
if (nonBrokerResourceRequests.length > 0) {
this.sendRequestToAnyBroker('describeConfigs', [{ resources: nonBrokerResourceRequests, includeSynonyms: payload.includeSynonyms }], cb);
} else {
cb(null, []);
}
apiVersion = Math.min(apiVersion, 2);
const request = protocol.encodeDescribeConfigsRequest(this.clientId, correlationId, payload, apiVersion);
this.sendWhenReady(broker, correlationId, request, protocol.decodeDescribeConfigsResponse(apiVersion), cb);
},
(err, results) => {
if (err) {
callback(err);
return;
}
results = _.values(results);
callback(null, _.merge.apply({}, results));
...brokerResourceRequests.map(r => {
return (cb) => {
this.sendRequestToBroker(r.resourceName, 'describeConfigs', [{ resources: [r], includeSynonyms: payload.includeSynonyms }], cb);
};
})
], this.options.maxAsyncRequests, (err, result) => {
if (err) {
return callback(err);
}
);

callback(null, _.flatten(result));
});
};

/**
* Sends a request to any broker in the cluster
*/
KafkaClient.prototype.sendRequestToAnyBroker = function (requestType, args, callback) {
// For now just select the first broker
const brokerId = Object.keys(this.brokerMetadata)[0];
this.sendRequestToBroker(brokerId, requestType, args, callback);
};

module.exports = KafkaClient;
32 changes: 27 additions & 5 deletions lib/protocol/protocol.js
Original file line number Diff line number Diff line change
Expand Up @@ -1668,7 +1668,19 @@ function decodeVersionsResponse (resp) {
return error || versions;
}

function encodeDescribeConfigsRequest (clientId, correlationId, payload, apiVersion) {
function encodeDescribeConfigsRequest (clientId, correlationId, payload) {
return _encodeDescribeConfigsRequest(clientId, correlationId, payload, 0);
}

function encodeDescribeConfigsRequestV1 (clientId, correlationId, payload) {
return _encodeDescribeConfigsRequest(clientId, correlationId, payload, 1);
}

function encodeDescribeConfigsRequestV2 (clientId, correlationId, payload) {
return _encodeDescribeConfigsRequest(clientId, correlationId, payload, 2);
}

function _encodeDescribeConfigsRequest (clientId, correlationId, payload, apiVersion) {
let request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.describeConfigs, apiVersion);
const resources = payload.resources;
request.Int32BE(resources.length);
Expand All @@ -1692,10 +1704,16 @@ function encodeDescribeConfigsRequest (clientId, correlationId, payload, apiVers
return encodeRequestWithLength(request.make());
}

function decodeDescribeConfigsResponse (apiVersion) {
return function (resp) {
return _decodeDescribeConfigsResponse(resp, apiVersion);
};
function decodeDescribeConfigsResponse (resp) {
return _decodeDescribeConfigsResponse(resp, 0);
}

function decodeDescribeConfigsResponseV1 (resp) {
return _decodeDescribeConfigsResponse(resp, 1);
}

function decodeDescribeConfigsResponseV2 (resp) {
return _decodeDescribeConfigsResponse(resp, 2);
}

function _decodeDescribeConfigsResponse (resp, apiVersion) {
Expand Down Expand Up @@ -1858,4 +1876,8 @@ exports.decodeListGroups = decodeListGroups;
exports.encodeVersionsRequest = encodeVersionsRequest;
exports.decodeVersionsResponse = decodeVersionsResponse;
exports.encodeDescribeConfigsRequest = encodeDescribeConfigsRequest;
exports.encodeDescribeConfigsRequestV1 = encodeDescribeConfigsRequestV1;
exports.encodeDescribeConfigsRequestV2 = encodeDescribeConfigsRequestV2;
exports.decodeDescribeConfigsResponse = decodeDescribeConfigsResponse;
exports.decodeDescribeConfigsResponseV1 = decodeDescribeConfigsResponseV1;
exports.decodeDescribeConfigsResponseV2 = decodeDescribeConfigsResponseV2;
6 changes: 5 additions & 1 deletion lib/protocol/protocolVersions.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ const API_MAP = {
[p.encodeCreateTopicRequestV1, p.decodeCreateTopicResponseV1]
],
deleteTopics: null,
describeConfigs: [[p.encodeDescribeConfigsRequest, p.decodeDescribeConfigsResponse]],
describeConfigs: [
[p.encodeDescribeConfigsRequest, p.decodeDescribeConfigsResponse],
[p.encodeDescribeConfigsRequestV1, p.decodeDescribeConfigsResponseV1],
[p.encodeDescribeConfigsRequestV2, p.decodeDescribeConfigsResponseV2]
],
saslAuthenticate: [[p.encodeSaslAuthenticationRequest, p.decodeSaslAuthenticationResponse]]
};

Expand Down
2 changes: 1 addition & 1 deletion test/test.admin.js
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ describe('Admin', function () {
};
admin.describeConfigs(payload, function (error, res) {
should.not.exist(res);
error.should.have.property('message').and.containEql('Unexpected broker id');
error.should.have.property('message').and.containEql('No broker with id ' + brokerId);
done();
});
});
Expand Down

0 comments on commit 51b6292

Please sign in to comment.