Skip to content

Commit

Permalink
added failover client
Browse files Browse the repository at this point in the history
  • Loading branch information
TzachiSh committed Jun 30, 2024
1 parent 9fc2e4d commit 3e996a9
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 57 deletions.
28 changes: 21 additions & 7 deletions example/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
const {QrynClient,Metric, Stream} = require('../src');

async function main() {
try {
const client = new QrynClient({
baseUrl: process.env['QYRN_URL'],
auth: {
Expand All @@ -10,6 +9,14 @@ async function main() {
},
timeout: 5000
})
const client2 = new QrynClient({
baseUrl: process.env['QYRN_URL_BACKUP'],
auth: {
username: process.env['QYRN_LOGIN_BACKUP'],
password: process.env['QRYN_PASSWORD_BACKUP']
},
timeout: 5000
})
// Create and push Loki streams
const stream1 = client.createStream({ job: 'job1', env: 'prod' });
stream1.addEntry(Date.now(), 'Log message 1');
Expand All @@ -18,7 +25,10 @@ async function main() {
const stream2 = new Stream({ job: 'job2', env: 'dev' });
stream2.addEntry(Date.now(), 'Log message 3');

const lokiResponse = await client.loki.push([stream1, stream2]);
const lokiResponse = await client.loki.push([stream1, stream2]).catch(error => {
console.log(error)
return client2.loki.push([stream1, stream2]);
});
console.log('Loki push successful:', lokiResponse);

// Create and push Prometheus metrics
Expand All @@ -33,14 +43,18 @@ async function main() {
const cpuUsed = new Metric('cpu_test_1234', { server: 'web-1' });

cpuUsed.addSample(1024 * 1024 * 100);
client.prom.push([memoryUsed, cpuUsed]).catch(error => {
console.error(error);
return client2.prom.push([memoryUsed, cpuUsed]);
});
cpuUsed.addSample(105, Date.now() + 60000);
memoryUsed.addSample(1024 * 1024 * 100);

const promResponse = await client.prom.push([memoryUsed, cpuUsed]);
const promResponse = await client.prom.push([memoryUsed, cpuUsed]).catch(error => {
console.error(error);
return client2.prom.push([memoryUsed, cpuUsed]);
});
console.log('Prometheus push successful:', promResponse);

} catch (error) {
console.error('An error occurred:', error);
}
}

main();
3 changes: 1 addition & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
"protobufjs": "^7.3.2",
"snappy": "^7.2.2"
},
"devDependencies": {},
"repository": {
"type": "git",
"url": "https://github.com/username/qryn-client.git"
Expand All @@ -31,4 +30,4 @@
"engines": {
"node": ">=18.0.0"
}
}
}
24 changes: 11 additions & 13 deletions src/clients/loki.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,21 @@ class Loki {
}

const payload = {
streams: streams.map(s => ({
labels: s.labels,
entries: s.entries
}))
streams: streams.map(s => s.collect())
};

try {
return await this.service.request('/loki/api/v1/push', {
return this.service.request('/loki/api/v1/push', {
method: 'POST',
body: JSON.stringify(payload)
}).then( res => {
streams.map(s => s.reset());
return res
}).catch(error => {
streams.map(s => s.undo());
if (error instanceof QrynError) {
throw error;
}
throw new QrynError(`Loki push failed: ${error.message}`, error.statusCode);
});
} catch (error) {
if (error instanceof QrynError) {
throw error;
}
throw new QrynError(`Loki push failed: ${error.message}`, error.statusCode);
}
}
}

Expand Down
39 changes: 19 additions & 20 deletions src/clients/prometheus.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,30 @@ class Prometheus {
throw new QrynError('Metrics must be an array of Metric instances');
}


const timeseries = metrics.map(metric => metric.toTimeSeries());
const timeseries = metrics.map(metric => metric.collect());
const writeRequest = { timeseries };

const buffer = this.protobufHandler.encodeWriteRequest(writeRequest);
const compressedBuffer = await this.protobufHandler.compressBuffer(buffer);



try {
return await this.service.request('/api/v1/prom/remote/write', {
method: 'POST',
headers: {
'Content-Type': 'application/x-protobuf',
'Content-Encoding': 'snappy',
'X-Prometheus-Remote-Write-Version': '0.1.0'
},
body: compressedBuffer
});
} catch (error) {
if (error instanceof QrynError) {
throw error;
}
throw new QrynError(`Prometheus Remote Write push failed: ${error.message}`, error.statusCode);
}
return this.service.request('/api/v1/prom/remote/write', {
method: 'POST',
headers: {
'Content-Type': 'application/x-protobuf',
'Content-Encoding': 'snappy',
'X-Prometheus-Remote-Write-Version': '0.1.0'
},
body: compressedBuffer
}).then(res => {
metrics.forEach(metric => metric.reset());
return res;
}).catch(error => {
metrics.forEach(metric => metric.undo());
if (error instanceof QrynError) {
throw error;
}
throw new QrynError(`Prometheus Remote Write push failed: ${error.message}`, error.statusCode);
})
}
}

Expand Down
41 changes: 29 additions & 12 deletions src/models/metric.js
Original file line number Diff line number Diff line change
@@ -1,29 +1,46 @@
class Metric {
#cachedLabels = {}
constructor(name, labels = {}) {
this.name = name;
this.labels = labels;
this.samples = [];
this.collectedSamples = [];
this.#cachedLabels = this.generateLabels();
}

generateLabels() {
return [
{ name: '__name__', value: this.name },
...Object.entries(this.labels).map(([name, value]) => ({ name, value }))
];
}

addSample(value, timestamp = Date.now()) {
if (typeof value !== 'number') {
throw new Error('Value must be a number');
}
if (typeof timestamp !== 'number') {
throw new Error('Timestamp must be a number');
if (typeof value !== 'number' || typeof timestamp !== 'number') {
throw new Error('Value and timestamp must be numbers');
}

this.samples.push({ value, timestamp });
}

toTimeSeries() {
return {
labels: [
{ name: '__name__', value: this.name },
...Object.entries(this.labels).map(([name, value]) => ({ name, value }))
],
collect() {
const collectedData = {
labels: this.#cachedLabels,
samples: this.samples
};
this.collectedSamples = this.samples;
this.samples = [];
return collectedData;
}

undo() {
this.samples = this.collectedSamples.concat(this.samples);
this.collectedSamples = [];
}

reset() {
this.samples = [];
this.collectedSamples = [];
}
}

module.exports = Metric;
26 changes: 23 additions & 3 deletions src/models/stream.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@

class Stream {
constructor(labels) {
#collectEntries = [];
constructor(labels, options = {}) {
if (typeof labels !== 'object' || labels === null) {
throw new Error('Labels must be a non-null object');
}
this.labels = this.formatLabels(labels);
this.entries = [];

}

formatLabels(labels) {
Expand All @@ -14,10 +17,27 @@ class Stream {
}

addEntry(timestamp, line) {
timestamp = new Date(timestamp).toISOString()
timestamp = new Date(timestamp).toISOString();
this.entries.push({ ts: timestamp, line: line });
}

collect() {
const collectedData = this.toJSON();
this.#collectEntries = this.entries;
this.entries = [];
return collectedData;
}

undo() {
this.entries = this.#collectEntries.concat(this.entries);
this.#collectEntries = [];
}

reset() {
this.entries = [];
this.#collectEntries = [];
}

toJSON() {
return {
labels: this.labels,
Expand All @@ -26,4 +46,4 @@ class Stream {
}
}

module.exports = Stream
module.exports = Stream;

0 comments on commit 3e996a9

Please sign in to comment.