Skip to content

Commit

Permalink
error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
TzachiSh committed Sep 17, 2024
1 parent cb8ea6e commit 487d5b2
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 15 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "qryn-client",
"version": "1.0.1",
"version": "1.0.2",
"description": "A client library for interacting with qryn, a high-performance observability backend.",
"main": "src/index.js",
"scripts": {
Expand Down
18 changes: 18 additions & 0 deletions src/types/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,25 @@
const QrynError = require("./qrynError");
const QrynResponse = require("./qrynResponse");

class NetworkError extends QrynError {
constructor(message, options = {}) {
super(message, options);
this.name = 'NetworkError';
this.statusCode = options.statusCode;
}
}

class ValidationError extends QrynError {
constructor(message, options = {}) {
super(message, options);
this.name = 'ValidationError';
this.field = options.field;
}
}

module.exports = {
NetworkError,
ValidationError,
QrynError,
QrynResponse
}
69 changes: 55 additions & 14 deletions src/utils/collector.js
Original file line number Diff line number Diff line change
@@ -1,28 +1,40 @@
const { QrynError } = require('../types');
const { QrynError, NetworkError, ValidationError } = require('../types');
const { Stream, Metric } = require('../models');
const EventEmitter = require('events');

/**
* Collector class for collecting and pushing streams and metrics to Qryn.
* @extends EventEmitter
*/
class Collector {
class Collector extends EventEmitter {
/**
* Create a Collector instance.
* @param {Object} qrynClient - The Qryn client instance.
* @param {Object} [options={}] - The collector options.
* @param {number} [options.maxBulkSize=1000] - The maximum bulk size for pushing data.
* @param {number} [options.maxTimeout=5000] - The maximum timeout for pushing data.
* @param {string} options.orgId - The organization ID.
* @param {number} [options.retryAttempts=3] - The number of retry attempts for failed pushes.
* @param {number} [options.retryDelay=1000] - The delay between retry attempts in milliseconds.
*/
constructor(qrynClient, options = {}) {
super();
this.qrynClient = qrynClient;
this.maxBulkSize = options.maxBulkSize || 1000;
this.maxTimeout = options.maxTimeout || 5000;
this.orgId = options.orgId;
this.retryAttempts = options.retryAttempts || 3;
this.retryDelay = options.retryDelay || 1000;
this.streams = [];
this.metrics = [];
this.timeoutId = null;
}
get options(){

/**
* Get the current options for the collector.
* @returns {Object} The current options.
*/
get options() {
return {
orgId: this.orgId
}
Expand All @@ -31,11 +43,13 @@ class Collector {
/**
* Add a stream to the collector.
* @param {Stream} stream - The stream instance to add.
* @throws {QrynError} If the stream is not a valid Stream instance.
* @throws {ValidationError} If the stream is not a valid Stream instance.
*/
addStream(stream) {
if (!(stream instanceof Stream)) {
throw new QrynError('Invalid stream instance');
const error = new ValidationError('Invalid stream instance');
this.emit('error', error);
throw error;
}
this.streams.push(stream);
this.checkBulkSize();
Expand All @@ -44,11 +58,13 @@ class Collector {
/**
* Add a metric to the collector.
* @param {Metric} metric - The metric instance to add.
* @throws {QrynError} If the metric is not a valid Metric instance.
* @throws {ValidationError} If the metric is not a valid Metric instance.
*/
addMetric(metric) {
if (!(metric instanceof Metric)) {
throw new QrynError('Invalid metric instance');
const error = new ValidationError('Invalid metric instance');
this.emit('error', error);
throw error;
}
this.metrics.push(metric);
this.checkBulkSize();
Expand Down Expand Up @@ -84,13 +100,38 @@ class Collector {
*/
async pushBulk() {
clearTimeout(this.timeoutId);
if (this.streams.length > 0) {
await this.qrynClient.loki.push(this.streams, this.options);
this.streams = [];
}
if (this.metrics.length > 0) {
await this.qrynClient.prom.push(this.metrics, this.options);
this.metrics = [];
await this.retryOperation(async () => {
if (this.streams.length > 0) {
await this.qrynClient.loki.push(this.streams, this.options);
this.streams = [];
}
if (this.metrics.length > 0) {
await this.qrynClient.prom.push(this.metrics, this.options);
this.metrics = [];
}
});
}

/**
* Retry an operation with exponential backoff.
* @private
* @async
* @param {Function} operation - The operation to retry.
* @throws {QrynError} If all retry attempts fail.
*/
async retryOperation(operation) {
for (let attempt = 1; attempt <= this.retryAttempts; attempt++) {
try {
await operation();
return;
} catch (error) {
if (attempt === this.retryAttempts) {
const qrynError = new QrynError('Failed to push data after multiple attempts', { cause: error });
this.emit('error', qrynError);
throw qrynError;
}
await new Promise(resolve => setTimeout(resolve, this.retryDelay * Math.pow(2, attempt - 1)));
}
}
}
}
Expand Down

0 comments on commit 487d5b2

Please sign in to comment.