From 3e996a9826fc6b283a6a316c1f0e7b7dfebc6172 Mon Sep 17 00:00:00 2001 From: TzachiSh Date: Sun, 30 Jun 2024 12:45:19 +0300 Subject: [PATCH] added failover client --- example/index.js | 28 +++++++++++++++++++------- package.json | 3 +-- src/clients/loki.js | 24 +++++++++++------------ src/clients/prometheus.js | 39 ++++++++++++++++++------------------- src/models/metric.js | 41 +++++++++++++++++++++++++++------------ src/models/stream.js | 26 ++++++++++++++++++++++--- 6 files changed, 104 insertions(+), 57 deletions(-) diff --git a/example/index.js b/example/index.js index 2078f91..ee611c4 100644 --- a/example/index.js +++ b/example/index.js @@ -1,7 +1,6 @@ const {QrynClient,Metric, Stream} = require('../src'); async function main() { - try { const client = new QrynClient({ baseUrl: process.env['QYRN_URL'], auth: { @@ -10,6 +9,14 @@ async function main() { }, timeout: 5000 }) + const client2 = new QrynClient({ + baseUrl: process.env['QYRN_URL_BACKUP'], + auth: { + username: process.env['QYRN_LOGIN_BACKUP'], + password: process.env['QRYN_PASSWORD_BACKUP'] + }, + timeout: 5000 + }) // Create and push Loki streams const stream1 = client.createStream({ job: 'job1', env: 'prod' }); stream1.addEntry(Date.now(), 'Log message 1'); @@ -18,7 +25,10 @@ async function main() { const stream2 = new Stream({ job: 'job2', env: 'dev' }); stream2.addEntry(Date.now(), 'Log message 3'); - const lokiResponse = await client.loki.push([stream1, stream2]); + const lokiResponse = await client.loki.push([stream1, stream2]).catch(error => { + console.log(error) + return client2.loki.push([stream1, stream2]); + }); console.log('Loki push successful:', lokiResponse); // Create and push Prometheus metrics @@ -33,14 +43,18 @@ async function main() { const cpuUsed = new Metric('cpu_test_1234', { server: 'web-1' }); cpuUsed.addSample(1024 * 1024 * 100); + client.prom.push([memoryUsed, cpuUsed]).catch(error => { + console.error(error); + return client2.prom.push([memoryUsed, cpuUsed]); + }); cpuUsed.addSample(105, Date.now() + 60000); + memoryUsed.addSample(1024 * 1024 * 100); - const promResponse = await client.prom.push([memoryUsed, cpuUsed]); + const promResponse = await client.prom.push([memoryUsed, cpuUsed]).catch(error => { + console.error(error); + return client2.prom.push([memoryUsed, cpuUsed]); + }); console.log('Prometheus push successful:', promResponse); - - } catch (error) { - console.error('An error occurred:', error); - } } main(); \ No newline at end of file diff --git a/package.json b/package.json index ce01ec9..3e32e12 100644 --- a/package.json +++ b/package.json @@ -12,7 +12,6 @@ "protobufjs": "^7.3.2", "snappy": "^7.2.2" }, - "devDependencies": {}, "repository": { "type": "git", "url": "https://github.com/username/qryn-client.git" @@ -31,4 +30,4 @@ "engines": { "node": ">=18.0.0" } -} \ No newline at end of file +} diff --git a/src/clients/loki.js b/src/clients/loki.js index 21b262a..a002f98 100644 --- a/src/clients/loki.js +++ b/src/clients/loki.js @@ -23,23 +23,21 @@ class Loki { } const payload = { - streams: streams.map(s => ({ - labels: s.labels, - entries: s.entries - })) + streams: streams.map(s => s.collect()) }; - - try { - return await this.service.request('/loki/api/v1/push', { + return this.service.request('/loki/api/v1/push', { method: 'POST', body: JSON.stringify(payload) + }).then( res => { + streams.map(s => s.reset()); + return res + }).catch(error => { + streams.map(s => s.undo()); + if (error instanceof QrynError) { + throw error; + } + throw new QrynError(`Loki push failed: ${error.message}`, error.statusCode); }); - } catch (error) { - if (error instanceof QrynError) { - throw error; - } - throw new QrynError(`Loki push failed: ${error.message}`, error.statusCode); - } } } diff --git a/src/clients/prometheus.js b/src/clients/prometheus.js index 5937105..23b342c 100644 --- a/src/clients/prometheus.js +++ b/src/clients/prometheus.js @@ -19,31 +19,30 @@ class Prometheus { throw new QrynError('Metrics must be an array of Metric instances'); } - - const timeseries = metrics.map(metric => metric.toTimeSeries()); + const timeseries = metrics.map(metric => metric.collect()); const writeRequest = { timeseries }; const buffer = this.protobufHandler.encodeWriteRequest(writeRequest); const compressedBuffer = await this.protobufHandler.compressBuffer(buffer); - - - try { - return await this.service.request('/api/v1/prom/remote/write', { - method: 'POST', - headers: { - 'Content-Type': 'application/x-protobuf', - 'Content-Encoding': 'snappy', - 'X-Prometheus-Remote-Write-Version': '0.1.0' - }, - body: compressedBuffer - }); - } catch (error) { - if (error instanceof QrynError) { - throw error; - } - throw new QrynError(`Prometheus Remote Write push failed: ${error.message}`, error.statusCode); - } + return this.service.request('/api/v1/prom/remote/write', { + method: 'POST', + headers: { + 'Content-Type': 'application/x-protobuf', + 'Content-Encoding': 'snappy', + 'X-Prometheus-Remote-Write-Version': '0.1.0' + }, + body: compressedBuffer + }).then(res => { + metrics.forEach(metric => metric.reset()); + return res; + }).catch(error => { + metrics.forEach(metric => metric.undo()); + if (error instanceof QrynError) { + throw error; + } + throw new QrynError(`Prometheus Remote Write push failed: ${error.message}`, error.statusCode); + }) } } diff --git a/src/models/metric.js b/src/models/metric.js index a4faf96..959745c 100644 --- a/src/models/metric.js +++ b/src/models/metric.js @@ -1,29 +1,46 @@ class Metric { + #cachedLabels = {} constructor(name, labels = {}) { this.name = name; this.labels = labels; this.samples = []; + this.collectedSamples = []; + this.#cachedLabels = this.generateLabels(); + } + + generateLabels() { + return [ + { name: '__name__', value: this.name }, + ...Object.entries(this.labels).map(([name, value]) => ({ name, value })) + ]; } addSample(value, timestamp = Date.now()) { - if (typeof value !== 'number') { - throw new Error('Value must be a number'); - } - if (typeof timestamp !== 'number') { - throw new Error('Timestamp must be a number'); + if (typeof value !== 'number' || typeof timestamp !== 'number') { + throw new Error('Value and timestamp must be numbers'); } - this.samples.push({ value, timestamp }); } - toTimeSeries() { - return { - labels: [ - { name: '__name__', value: this.name }, - ...Object.entries(this.labels).map(([name, value]) => ({ name, value })) - ], + collect() { + const collectedData = { + labels: this.#cachedLabels, samples: this.samples }; + this.collectedSamples = this.samples; + this.samples = []; + return collectedData; + } + + undo() { + this.samples = this.collectedSamples.concat(this.samples); + this.collectedSamples = []; + } + + reset() { + this.samples = []; + this.collectedSamples = []; } } + module.exports = Metric; \ No newline at end of file diff --git a/src/models/stream.js b/src/models/stream.js index b86acef..3395cc7 100644 --- a/src/models/stream.js +++ b/src/models/stream.js @@ -1,10 +1,13 @@ + class Stream { - constructor(labels) { + #collectEntries = []; + constructor(labels, options = {}) { if (typeof labels !== 'object' || labels === null) { throw new Error('Labels must be a non-null object'); } this.labels = this.formatLabels(labels); this.entries = []; + } formatLabels(labels) { @@ -14,10 +17,27 @@ class Stream { } addEntry(timestamp, line) { - timestamp = new Date(timestamp).toISOString() + timestamp = new Date(timestamp).toISOString(); this.entries.push({ ts: timestamp, line: line }); } + collect() { + const collectedData = this.toJSON(); + this.#collectEntries = this.entries; + this.entries = []; + return collectedData; + } + + undo() { + this.entries = this.#collectEntries.concat(this.entries); + this.#collectEntries = []; + } + + reset() { + this.entries = []; + this.#collectEntries = []; + } + toJSON() { return { labels: this.labels, @@ -26,4 +46,4 @@ class Stream { } } -module.exports = Stream \ No newline at end of file +module.exports = Stream; \ No newline at end of file