diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..215205f --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +node_modules +package-lock.json +.vscode \ No newline at end of file diff --git a/example/index.js b/example/index.js new file mode 100644 index 0000000..2078f91 --- /dev/null +++ b/example/index.js @@ -0,0 +1,46 @@ +const {QrynClient,Metric, Stream} = require('../src'); + +async function main() { + try { + const client = new QrynClient({ + baseUrl: process.env['QYRN_URL'], + auth: { + username: process.env['QYRN_LOGIN'], + password: process.env['QRYN_PASSWORD'] + }, + timeout: 5000 + }) + // Create and push Loki streams + const stream1 = client.createStream({ job: 'job1', env: 'prod' }); + stream1.addEntry(Date.now(), 'Log message 1'); + stream1.addEntry(Date.now(), 'Log message 2'); + + const stream2 = new Stream({ job: 'job2', env: 'dev' }); + stream2.addEntry(Date.now(), 'Log message 3'); + + const lokiResponse = await client.loki.push([stream1, stream2]); + console.log('Loki push successful:', lokiResponse); + + // Create and push Prometheus metrics + + + const memoryUsed = client.createMetric({ name: 'memory_use_test_134', labels: { + foo:"bar" + }}); + memoryUsed.addSample(1024 * 1024 * 100); + memoryUsed.addSample(105, Date.now() + 60000); + + const cpuUsed = new Metric('cpu_test_1234', { server: 'web-1' }); + + cpuUsed.addSample(1024 * 1024 * 100); + cpuUsed.addSample(105, Date.now() + 60000); + + const promResponse = await client.prom.push([memoryUsed, cpuUsed]); + console.log('Prometheus push successful:', promResponse); + + } catch (error) { + console.error('An error occurred:', error); + } +} + +main(); \ No newline at end of file diff --git a/package.json b/package.json new file mode 100644 index 0000000..ce01ec9 --- /dev/null +++ b/package.json @@ -0,0 +1,34 @@ +{ + "name": "qryn-client", + "version": "1.0.0", + "description": "A client library for interacting with qryn, a high-performance observability backend.", + "main": "src/index.js", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, + "author": "tzachiSh", + "license": "MIT", + "dependencies": { + "protobufjs": "^7.3.2", + "snappy": "^7.2.2" + }, + "devDependencies": {}, + "repository": { + "type": "git", + "url": "https://github.com/username/qryn-client.git" + }, + "keywords": [ + "qryn", + "client", + "observability", + "monitoring", + "logging" + ], + "bugs": { + "url": "https://github.com/username/qryn-client/issues" + }, + "homepage": "https://github.com/username/qryn-client#readme", + "engines": { + "node": ">=18.0.0" + } +} \ No newline at end of file diff --git a/src/clients/loki.js b/src/clients/loki.js new file mode 100644 index 0000000..21b262a --- /dev/null +++ b/src/clients/loki.js @@ -0,0 +1,46 @@ +const { QrynError } = require('../types'); +const { Stream } = require('../models'); +const Http = require('../services/http'); + +class Loki { + /** + * Create a new Loki instance. + * @param {Http} service - The HTTP service to use for requests. + */ + constructor(service) { + this.service = service; + } + + /** + * Push streams to Loki. + * @param {Stream[]} streams - An array of Stream instances to push. + * @returns {Promise} The response from the Loki API. + * @throws {QrynError} If the push fails or if the input is invalid. + */ + async push(streams) { + if (!Array.isArray(streams) || !streams.every(s => s instanceof Stream)) { + throw new QrynError('Streams must be an array of Stream instances'); + } + + const payload = { + streams: streams.map(s => ({ + labels: s.labels, + entries: s.entries + })) + }; + + try { + return await this.service.request('/loki/api/v1/push', { + method: 'POST', + body: JSON.stringify(payload) + }); + } catch (error) { + if (error instanceof QrynError) { + throw error; + } + throw new QrynError(`Loki push failed: ${error.message}`, error.statusCode); + } + } +} + +module.exports = Loki; \ No newline at end of file diff --git a/src/clients/prometheus.js b/src/clients/prometheus.js new file mode 100644 index 0000000..5937105 --- /dev/null +++ b/src/clients/prometheus.js @@ -0,0 +1,50 @@ +const Http = require('../services/http') +const Protobuff = require('../services/protobuff') +const path = require('path'); +const {Metric} = require('../models') +const {QrynError} = require('../types') + +class Prometheus { + /** + * Create a new PrometheusRemoteWrite instance. + * @param {Http} service - The HTTP service for making requests. + */ + constructor(service) { + this.service = service; + this.protobufHandler = new Protobuff(); + } + + async push(metrics) { + if (!Array.isArray(metrics) || !metrics.every(m => m instanceof Metric)) { + throw new QrynError('Metrics must be an array of Metric instances'); + } + + + const timeseries = metrics.map(metric => metric.toTimeSeries()); + const writeRequest = { timeseries }; + + const buffer = this.protobufHandler.encodeWriteRequest(writeRequest); + const compressedBuffer = await this.protobufHandler.compressBuffer(buffer); + + + + try { + return await this.service.request('/api/v1/prom/remote/write', { + method: 'POST', + headers: { + 'Content-Type': 'application/x-protobuf', + 'Content-Encoding': 'snappy', + 'X-Prometheus-Remote-Write-Version': '0.1.0' + }, + body: compressedBuffer + }); + } catch (error) { + if (error instanceof QrynError) { + throw error; + } + throw new QrynError(`Prometheus Remote Write push failed: ${error.message}`, error.statusCode); + } + } +} + +module.exports = Prometheus; \ No newline at end of file diff --git a/src/index.js b/src/index.js new file mode 100644 index 0000000..6019513 --- /dev/null +++ b/src/index.js @@ -0,0 +1,66 @@ +const {Stream, Metric} = require('./models') +const PrometheusClient = require('./clients/prometheus') +const LokiClient = require('./clients/loki') +const Http = require('./services/http') + + + +class Auth{ + constructor({username, password}){ + this.username = username; + this.password = password; + } +} + + +/** + * Main client for qryn operations. + */ +class QrynClient { + /** + * Create a QrynClient. + * @param {Object} config - The configuration object. + * @param {string} [config.baseUrl='http://localhost:3100'] - The base URL for the qryn server. + * @param {Auth} [config.auth] - The base Auth for the qryn server. + * @param {number} [config.timeout=5000] - The timeout for requests in milliseconds. + * @param {Object} [config.headers={}] - Additional headers to send with requests. + */ + constructor(config) { + if (typeof config !== 'object' || config === null) { + throw new QrynError('Config must be a non-null object'); + } + const auth = config.auth + const baseUrl = config.baseUrl || 'http://localhost:3100'; + const timeout = config.timeout || 5000; + const headers = { + 'Content-Type': 'application/json', + ...config.headers + }; + const http = new Http(baseUrl, timeout, headers, auth); + this.prom = new PrometheusClient(http); + this.loki = new LokiClient(http); + } + + /** + * Create a new Stream instance. + * @param {Object} labels - The labels for the stream. + * @returns {Stream} A new Stream instance. + */ + createStream(labels) { + return new Stream(labels); + } + +/** + * Create a new Metric instance. + * @param {string} name - The name of the metric. + * @param {Object} [labels={}] - Optional labels for the metric. + * @returns {Metric} A new Metric instance. + */ + createMetric({ name, labels = {} }) { + return new Metric(name, labels); + } + + // Add more methods for other qryn operations as needed +} + +module.exports = { QrynClient, Stream, Metric }; \ No newline at end of file diff --git a/src/models/index.js b/src/models/index.js new file mode 100644 index 0000000..dfcb66b --- /dev/null +++ b/src/models/index.js @@ -0,0 +1,7 @@ +const Metric = require("./metric"); +const Stream = require("./stream"); + +module.exports = { + Metric, + Stream +} \ No newline at end of file diff --git a/src/models/metric.js b/src/models/metric.js new file mode 100644 index 0000000..a4faf96 --- /dev/null +++ b/src/models/metric.js @@ -0,0 +1,29 @@ +class Metric { + constructor(name, labels = {}) { + this.name = name; + this.labels = labels; + this.samples = []; + } + + addSample(value, timestamp = Date.now()) { + if (typeof value !== 'number') { + throw new Error('Value must be a number'); + } + if (typeof timestamp !== 'number') { + throw new Error('Timestamp must be a number'); + } + + this.samples.push({ value, timestamp }); + } + + toTimeSeries() { + return { + labels: [ + { name: '__name__', value: this.name }, + ...Object.entries(this.labels).map(([name, value]) => ({ name, value })) + ], + samples: this.samples + }; + } +} +module.exports = Metric; \ No newline at end of file diff --git a/src/models/stream.js b/src/models/stream.js new file mode 100644 index 0000000..b86acef --- /dev/null +++ b/src/models/stream.js @@ -0,0 +1,29 @@ +class Stream { + constructor(labels) { + if (typeof labels !== 'object' || labels === null) { + throw new Error('Labels must be a non-null object'); + } + this.labels = this.formatLabels(labels); + this.entries = []; + } + + formatLabels(labels) { + return '{' + Object.entries(labels) + .map(([key, value]) => `${key}="${value}"`) + .join(',') + '}'; + } + + addEntry(timestamp, line) { + timestamp = new Date(timestamp).toISOString() + this.entries.push({ ts: timestamp, line: line }); + } + + toJSON() { + return { + labels: this.labels, + entries: this.entries + }; + } +} + +module.exports = Stream \ No newline at end of file diff --git a/src/services/http.js b/src/services/http.js new file mode 100644 index 0000000..da2d6b5 --- /dev/null +++ b/src/services/http.js @@ -0,0 +1,76 @@ +const { QrynError } = require('../types'); +const { URL } = require('url'); +const QrynResponse = require('../types/qrynResponse'); + +/** + * Handles HTTP requests for QrynClient. + */ +class Http { + baseUrl = null; + timeout = null; + headers = null; + basicAuth = null; + + /** + * Create an HttpClient. + * @param {string} baseUrl - The base URL for the qryn server. + * @param {number} timeout - The timeout for requests in milliseconds. + * @param {Object} headers - Headers to send with requests. + */ + constructor(baseUrl, timeout, headers, auth) { + this.baseUrl = new URL(baseUrl); + this.timeout = timeout; + this.headers = headers; + this.#setBasicAuth(auth) + } + + /** + * Set basic authentication credentials. + * @param {string} username - The username for basic auth. + * @param {string} password - The password for basic auth. + */ + #setBasicAuth({username, password}) { + this.basicAuth = Buffer.from(`${username}:${password}`).toString('base64'); + } + + /** + * Make an HTTP request. + * @param {string} path - The path to append to the base URL. + * @param {Object} options - The options for the fetch request. + * @returns {Promise} The parsed JSON response. + * @throws {QrynError} If the request fails or returns a non-OK status. + */ + async request(path, options = {}) { + const url = new URL(path, this.baseUrl); + const headers = { ...this.headers, ...options.headers }; + + // Add Authorization header if basic auth is set + if (this.basicAuth) { + headers['Authorization'] = `Basic ${this.basicAuth}`; + } + + const fetchOptions = { + ...options, + headers, + signal: AbortSignal.timeout(this.timeout) + }; + + try { + const response = await fetch(url.toString(), fetchOptions); + + if (!response.ok) { + throw new QrynError(`HTTP error! status: ${response.status}`, response.status); + } + + return response.json().then( async json => { + return new QrynResponse(json, response.status, await response.headers) + }).catch(async err => { + return new QrynResponse({}, response.status, await response.headers) + }) + } catch (error) { + + throw new QrynError(`Request failed: ${error.message} ${error?.cause?.message}`, 400, error.cause); } + } +} + +module.exports = Http; \ No newline at end of file diff --git a/src/services/protobuff.js b/src/services/protobuff.js new file mode 100644 index 0000000..a3c7813 --- /dev/null +++ b/src/services/protobuff.js @@ -0,0 +1,22 @@ +// ProtobufHandler.js +const protobuf = require('protobufjs'); +const path = require('path'); +const snappy = require('snappy'); + +class ProtobufHandler { + constructor() { + this.protoRoot = protobuf.loadSync(path.resolve(__dirname, './remote.proto')); + this.WriteRequest = this.protoRoot.lookupType('WriteRequest'); + } + + encodeWriteRequest(timeseries) { + const writeRequest = this.WriteRequest.create(timeseries); + return this.WriteRequest.encode(writeRequest).finish(); + } + + async compressBuffer(buffer) { + return await snappy.compress(buffer); + } +} + +module.exports = ProtobufHandler; \ No newline at end of file diff --git a/src/services/remote.proto b/src/services/remote.proto new file mode 100644 index 0000000..1e8f835 --- /dev/null +++ b/src/services/remote.proto @@ -0,0 +1,68 @@ +syntax = "proto3"; +package prometheus; + +option go_package = "prompb"; + +message WriteRequest { + repeated prometheus.TimeSeries timeseries = 1; +} + +message ReadRequest { + repeated Query queries = 1; +} + +message ReadResponse { + // In same order as the request's queries. + repeated QueryResult results = 1; +} + +message Query { + int64 start_timestamp_ms = 1; + int64 end_timestamp_ms = 2; + repeated prometheus.LabelMatcher matchers = 3; + prometheus.ReadHints hints = 4; +} + +message QueryResult { + // Samples within a time series must be ordered by time. + repeated prometheus.TimeSeries timeseries = 1; +} + +message Sample { + double value = 1; + int64 timestamp = 2; +} + +message TimeSeries { + repeated Label labels = 1; + repeated Sample samples = 2; +} + +message Label { + string name = 1; + string value = 2; +} + +message Labels { + repeated Label labels = 1; +} + +// Matcher specifies a rule, which can match or set of labels or not. +message LabelMatcher { + enum Type { + EQ = 0; + NEQ = 1; + RE = 2; + NRE = 3; + } + Type type = 1; + string name = 2; + string value = 3; +} + +message ReadHints { + int64 step_ms = 1; // Query step size in milliseconds. + string func = 2; // String representation of surrounding function or aggregation. + int64 start_ms = 3; // Start time in milliseconds. + int64 end_ms = 4; // End time in milliseconds. +} \ No newline at end of file diff --git a/src/types/index.js b/src/types/index.js new file mode 100644 index 0000000..352dfd1 --- /dev/null +++ b/src/types/index.js @@ -0,0 +1,7 @@ +const QrynError = require("./qrynError"); +const QrynResponse = require("./qrynResponse"); + +module.exports = { + QrynError, + QrynResponse +} \ No newline at end of file diff --git a/src/types/qrynError.js b/src/types/qrynError.js new file mode 100644 index 0000000..acb0f53 --- /dev/null +++ b/src/types/qrynError.js @@ -0,0 +1,12 @@ +/** + * Custom error class for QrynClient errors. + */ +class QrynError extends Error { + constructor(message, statusCode = null, cause) { + super(message); + this.name = 'QrynError'; + this.statusCode = statusCode; + this.cause = cause; + } +} +module.exports = QrynError; \ No newline at end of file diff --git a/src/types/qrynResponse.js b/src/types/qrynResponse.js new file mode 100644 index 0000000..0629c01 --- /dev/null +++ b/src/types/qrynResponse.js @@ -0,0 +1,67 @@ +/** + * Represents a standardized response from the Qryn API. + */ +class QrynResponse { + /** + * Create a QrynResponse. + * @param {Object} data - The response data from the API. + * @param {number} status - The HTTP status code of the response. + * @param {Object} headers - The headers of the response. + */ + constructor(data, status, headers) { + this.data = data; + this.status = status; + this.headers = headers; + } + + /** + * Check if the response was successful. + * @returns {boolean} True if the status code is in the 2xx range. + */ + isSuccess() { + return this.status >= 200 && this.status < 300; + } + + /** + * Get the response data. + * @returns {Object} The response data. + */ + getData() { + return this.data; + } + + /** + * Get the HTTP status code. + * @returns {number} The HTTP status code. + */ + getStatus() { + return this.status; + } + + /** + * Get the response headers. + * @returns {Object} The response headers. + */ + getHeaders() { + return this.headers; + } + + /** + * Get a specific header value. + * @param {string} name - The name of the header. + * @returns {string|null} The value of the header, or null if not found. + */ + getHeader(name) { + return this.headers[name] || null; + } + + /** + * Convert the response to a string representation. + * @returns {string} A string representation of the response. + */ + toString() { + return `QrynResponse {status: ${this.status}, data: ${JSON.stringify(this.data)}}`; + } +} + +module.exports = QrynResponse; \ No newline at end of file