Skip to content

Commit

Permalink
Add heartbeats, timeouts and Kafka commits to alerta bridge
Browse files Browse the repository at this point in the history
The Alerta bridge shows problems when processing a massive number of alerts. After some time,
Alerta.io blocks the socket, which leads to Kafka timeouts.
Therefore, a timeout is defined. When the error occurs, all previously successful
Alerta.io requests are committed and an exception is thrown. Kafka then retries
the current offset but does not repeat the offsets that were already committed.
The heartbeats are needed to force the Alerta filter window to trigger when
there are no other Kafka updates.

Signed-off-by: marcel <[email protected]>
  • Loading branch information
wagmarcel authored and abhijith-hr committed Nov 25, 2024
1 parent a9199f5 commit 84985f1
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 18 deletions.
93 changes: 82 additions & 11 deletions KafkaBridge/alerta/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,28 +32,66 @@ const kafka = new Kafka({
});

const consumer = kafka.consumer({ groupId: GROUPID, allowAutoTopicCreation: false });
const producer = kafka.producer();
console.log(JSON.stringify(config));

/**
 * Commit the collected offsets through the consumer and hand back a fresh,
 * empty buffer.
 *
 * The commit is started but deliberately not awaited, so callers can reset
 * their buffer synchronously:
 * `buffer = commitAppliedMessages(consumer, buffer)`.
 * A rejected commit is logged instead of being left as an unhandled promise
 * rejection.
 *
 * NOTE(review): Kafka treats a committed offset as the next offset to read;
 * callers in this file push `message.offset` unchanged - confirm whether
 * `offset + 1` is intended.
 *
 * @param {object} consumer - Object exposing `commitOffsets(offsets)`.
 * @param {Array<object>} commitArray - Entries of the form `{ topic, partition, offset }`.
 * @returns {Array} A new empty array to be used as the next offset buffer.
 */
const commitAppliedMessages = function (consumer, commitArray) {
  logger.debug(`Commit ${commitArray.length} messages `);
  // Promise.resolve() also tolerates a commitOffsets() that returns a plain value.
  Promise.resolve(consumer.commitOffsets(commitArray))
    .catch((err) => logger.error('Could not commit offsets: ' + err));
  return [];
};

/**
 * Pause consumption of `topic` and schedule it to resume after
 * `config.alerta.kafkaResumeTimeout` (passed to setTimeout as the delay).
 *
 * @param {object} consumer - Object exposing `pause(topics)` and `resume(topics)`.
 * @param {string} topic - Name of the topic to pause and later resume.
 */
const pauseresume = function (consumer, topic) {
  // Build a fresh selector for each call, exactly as two separate literals would.
  const topicSelector = () => [{ topic }];

  consumer.pause(topicSelector());

  const resumeConsumption = () => consumer.resume(topicSelector());
  setTimeout(resumeConsumption, config.alerta.kafkaResumeTimeout);

  logger.debug('Set timeout. Waiting to resume');
};

const startListener = async function () {
await consumer.connect();
await consumer.subscribe({ topic: config.alerta.topic, fromBeginning: false });

let committedOffsets = [];
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
autoCommit: false,
eachMessage: async ({ topic, partition, message, pause }) => {
let body = null;
try {
const body = JSON.parse(message.value);
if (body !== null) {
const result = await alerta.sendAlert(body).catch((err) => { logger.error('Could not send Alert: ' + err); console.error(err); });

if (result.statusCode !== 201) {
logger.error(`submission to Alerta failed with statuscode ${result.statusCode} and ${JSON.stringify(result.body)}`);
}
}
body = JSON.parse(message.value);
} catch (e) {
logger.error('Could not process message: ' + e);
logger.error(`Could not deserialize message ${message.value}`);
}
if (body !== null && body.resource !== null && body.resource !== '' && body.event !== null && body.event !== '') {
const result = await alerta.sendAlert(body, config.alerta.requestTimeout)
.catch((err) => {
logger.error('Could not send Alert: ' + err);
committedOffsets = commitAppliedMessages(consumer, committedOffsets);
pauseresume(consumer, topic);
throw new Error('Could not send Alert ' + err);
});
logger.debug(`Alerta Result ${result.statusCode}}`);
if (result.statusCode !== 201) {
logger.error(`submission to Alerta failed with statuscode ${result.statusCode} and ${JSON.stringify(result.body)}`);
committedOffsets = commitAppliedMessages(consumer, committedOffsets);
pauseresume(consumer, topic);
throw new Error('Retry submission of Alert.');
} else {
committedOffsets.push({ topic, partition, offset: message.offset });
}
} else {
logger.debug('Ignoring ' + JSON.stringify(body));
committedOffsets.push({ topic, partition, offset: message.offset });
}
if (committedOffsets.length > config.alerta.kafkaCommitThreshold) {
committedOffsets = commitAppliedMessages(consumer, committedOffsets);
}
}
}).catch(e => console.error(`[example/consumer] ${e.message}`, e));
}).catch(
e => {
console.error(`[example/consumer] ${e.message}`, e);
throw e;
}
);

const errorTypes = ['unhandledRejection', 'uncaughtException'];
const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2'];
Expand Down Expand Up @@ -86,5 +124,38 @@ const startListener = async function () {
}
};

/**
 * Start publishing a periodic heartbeat record to `config.alerta.heartbeatTopic`.
 *
 * The heartbeat is disabled unless `config.alerta.heartbeatInterval` is a
 * positive, finite number of milliseconds. Each heartbeat carries a fixed JSON
 * key, a null value, and a timestamp of "now minus
 * `config.alerta.heartbeatDelay`" milliseconds (a missing or non-numeric delay
 * is treated as 0). Send failures are logged and do not stop the timer.
 *
 * @returns {Promise<void>} Resolves once the producer is connected and the
 *   interval timer is installed, or immediately when the heartbeat is disabled.
 */
const startPeriodicProducer = async function () {
  const intervalMs = Number(config.alerta.heartbeatInterval);
  // 0 and undefined disable the heartbeat. null, NaN, negative and infinite
  // values are rejected too: setInterval would coerce them to a 1 ms period.
  if (!Number.isFinite(intervalMs) || intervalMs <= 0) {
    return;
  }
  // Without this fallback a missing delay would produce a 'NaN' timestamp.
  const configuredDelay = Number(config.alerta.heartbeatDelay);
  const delayMs = Number.isFinite(configuredDelay) ? configuredDelay : 0;

  logger.info(`Starting heartbeat to topic ${config.alerta.heartbeatTopic} with interval ${intervalMs} and delay ${delayMs}`);
  await producer.connect();
  const heartbeat = {
    key: '{"resource":"heartbeat-owner","event":"heartbeat"}',
    value: null,
    topic: config.alerta.heartbeatTopic
  };

  setInterval(async () => {
    try {
      // Back-date the record by the configured delay.
      const timestamp = Date.now() - delayMs;
      await producer.send({
        topic: heartbeat.topic,
        messages: [
          {
            key: heartbeat.key,
            value: heartbeat.value,
            timestamp: timestamp.toString()
          }
        ]
      });
      logger.info('Alert heartbeat sent successfully');
    } catch (err) {
      logger.error('Could not send heartbeat: ' + err);
    }
  }, intervalMs); // Period is config.alerta.heartbeatInterval, in milliseconds.
};
// Start-up sequence: launch the alert consumer, then the heartbeat producer.
// Both calls return promises that are not awaited here.
// NOTE(review): rejections are presumably picked up by the process-level
// 'unhandledRejection' trap listed in errorTypes - confirm.
logger.info('Now starting Kafka listener');
startListener();
logger.info('Now starting Kafka periodic producer');
startPeriodicProducer();
8 changes: 7 additions & 1 deletion KafkaBridge/config/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,15 @@
"alerta": {
"topic": "iff.alerts",
"hostname": "alerta.iff",
"heartbeatTopic": "iff.alerts.bulk",
"port": "8080",
"protocol": "http:",
"accessKeyVariable": "ALERTA_API_KEY"
"accessKeyVariable": "ALERTA_API_KEY",
"kafkaResumeTimeout": 2000,
"heartbeatInterval": 1000,
"heartbeatDelay": 10000,
"requestTimeout": 2000,
"kafkaCommitThreshold": 200
},
"logger": {
"loglevel": "debug"
Expand Down
5 changes: 4 additions & 1 deletion KafkaBridge/lib/alerta.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ module.exports = function Alerta (conf) {

let headers;

this.sendAlert = async function (body) {
this.sendAlert = async function (body, timeout) {
headers = {};
headers.Authorization = 'Key ' + token;

Expand All @@ -41,6 +41,9 @@ module.exports = function Alerta (conf) {
method: 'POST',
headers: headers
};
if (timeout !== undefined) {
options.timeout = timeout;
}
return await rest.postBody({ options, body, disableChunks: true });
};
};
1 change: 1 addition & 0 deletions KafkaBridge/lib/rest.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ module.exports = function Rest (config) {
req.on('error', error => {
reject(error);
});
req.on('timeout', () => req.destroy());
logger.debug('Now sending: ' + body);
req.write(body);
req.end();
Expand Down
6 changes: 3 additions & 3 deletions KafkaBridge/test/testLibRest.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ describe('Test postBody', function () {
const evmap = {};
const req = {
on: function (ev, cb) {
ev.should.equal('error');
ev.should.oneOf(['error', 'timeout']);
},
write: function (bo) {
bo.should.equal(JSON.stringify(body));
Expand Down Expand Up @@ -111,7 +111,7 @@ describe('Test postBody', function () {
const evmap = {};
const req = {
on: function (ev, cb) {
ev.should.equal('error');
ev.should.oneOf(['error', 'timeout']);
},
write: function (bo) {
bo.should.equal(body);
Expand Down Expand Up @@ -177,7 +177,7 @@ describe('Test postBody', function () {
const evmap = {};
const req = {
on: function (ev, cb) {
ev.should.equal('error');
ev.should.oneOf(['error', 'timeout']);
},
write: function (bo) {
bo.should.equal(JSON.stringify(body));
Expand Down
8 changes: 7 additions & 1 deletion helm/charts/kafka-bridges/templates/bridge-configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,15 @@ data:
"alerta": {
"topic": {{ .Values.kafkaBridge.alerta.listenTopic | quote }},
"hostname": {{ .Values.alerta.internalService | quote }},
"heartbeatTopic": {{ .Values.kafkaBridge.alerta.bulkTopic | quote }},
"port": {{ .Values.alerta.internalPort | quote }},
"protocol": {{ .Values.alerta.internalProtocol | quote }},
"accessKeyVariable": "ALERTA_API_KEY"
"accessKeyVariable": "ALERTA_API_KEY",
"kafkaResumeTimeout": {{ .Values.alerta.kafkaResumeTimeout }},
"heartbeatInterval": {{ .Values.alerta.heartbeatInterval }},
"heartbeatDelay": {{ .Values.alerta.heartbeatDelay }},
"requestTimeout": {{ .Values.alerta.requestTimeout }},
"kafkaCommitThreshold": {{ .Values.alerta.kafkaCommitThreshold }}
},
"logger": {
"loglevel": "info"
Expand Down
2 changes: 1 addition & 1 deletion helm/charts/sql-core/templates/core-statementsets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ spec:
)
|| '}' as indexagg
FROM TABLE( TUMBLE(TABLE attributes, DESCRIPTOR(ts), INTERVAL {{.Values.flink.ngsildUpdateWindow|squote}} second)) as A
WHERE A.entityId IS NOT NULL
WHERE A.`type` IS NOT NULL
GROUP BY A.entityId, A.index, A.`https://uri.etsi.org/ngsi-ld/datasetId`, A.name, window_start, window_end) as B
GROUP BY window_start, window_end, entityId, name
) GROUP BY window_start, window_end, id;
Expand Down
5 changes: 5 additions & 0 deletions helm/values.yaml.gotmpl
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ alerta:
# apiKey: {{ .StateValues.alertaApiKey }}
adminPassword: {{ .StateValues.alertaAdminPassword }}
adminKey: {{ .StateValues.alertaAdminKey }}
kafkaResumeTimeout: 2000
heartbeatInterval: 1000
heartbeatDelay: 2000
requestTimeout: 2000
kafkaCommitThreshold: 200

kafka:
name: my-cluster
Expand Down

0 comments on commit 84985f1

Please sign in to comment.