diff --git a/lib/db/clickhouse.js b/lib/db/clickhouse.js index 005b41b0..369212f4 100644 --- a/lib/db/clickhouse.js +++ b/lib/db/clickhouse.js @@ -89,25 +89,27 @@ if (isMainThread && !bun()) { }) } else if (isMainThread && !first) { first = true - const _throttler = require('./throttler') - throttler = { - on: _throttler.on, - postMessage: _throttler.postMessage, - removeAllListeners: _throttler.removeAllListeners, - terminate: _throttler.terminate - } - _throttler.init() - throttler.on('message', (msg) => { - switch (msg.status) { - case 'ok': - resolvers[msg.id]() - break - case 'err': - rejectors[msg.id](new Error('Database push error')) - break + setTimeout(() => { + const _throttler = require('./throttler') + throttler = { + on: _throttler.on, + postMessage: _throttler.postMessage, + removeAllListeners: _throttler.removeAllListeners, + terminate: _throttler.terminate } - delete resolvers[msg.id] - delete rejectors[msg.id] + _throttler.init() + throttler.on('message', (msg) => { + switch (msg.status) { + case 'ok': + resolvers[msg.id]() + break + case 'err': + rejectors[msg.id](new Error('Database push error')) + break + } + delete resolvers[msg.id] + delete rejectors[msg.id] + }) }) } // timeSeriesv2Throttler.start(); diff --git a/lib/db/throttler.js b/lib/db/throttler.js index 1316b480..6e50db85 100644 --- a/lib/db/throttler.js +++ b/lib/db/throttler.js @@ -1,6 +1,4 @@ const { isMainThread, parentPort } = require('worker_threads') -const axios = require('axios') -const { getClickhouseUrl, samplesTableName, rawRequest } = require('./clickhouse') const clickhouseOptions = require('./clickhouse_options').databaseOptions const logger = require('../logger') const { DATABASE_NAME } = require('../utils') @@ -8,7 +6,15 @@ const clusterName = require('../../common').clusterName const dist = clusterName ? '_dist' : '' const { EventEmitter } = require('events') +// variables to be initialized in the init() function due to the './clickhouse.js' cross-dependency & bun +let samplesThrottler +let timeSeriesThrottler +let tracesThottler +let samplesTableName +let rawRequest + const axiosError = async (err) => { + console.log('axiosError', err) try { const resp = err.response if (resp) { @@ -24,6 +30,7 @@ const axiosError = async (err) => { (err.responseData ? ' Response data: ' + err.responseData : '')) } } catch (e) { + console.log(e) return err } } @@ -63,14 +70,6 @@ class TimeoutThrottler { } } -const samplesThrottler = new TimeoutThrottler( - `INSERT INTO ${clickhouseOptions.queryOptions.database}.${samplesTableName}${dist}(fingerprint, timestamp_ns, value, string, type) FORMAT JSONEachRow`) -const timeSeriesThrottler = new TimeoutThrottler( - `INSERT INTO ${clickhouseOptions.queryOptions.database}.time_series${dist}(date, fingerprint, labels, name, type) FORMAT JSONEachRow`) -const tracesThottler = new TimeoutThrottler( - `INSERT INTO ${clickhouseOptions.queryOptions.database}.traces_input - (trace_id, span_id, parent_id, name, timestamp_ns, duration_ns, service_name, payload_type, payload, tags) - FORMAT JSONEachRow`) const emitter = new EventEmitter() let on = true @@ -111,6 +110,20 @@ const postMessage = message => { } const init = () => { + [samplesTableName, rawRequest] = [ + require('./clickhouse').samplesTableName, + require('./clickhouse').rawRequest + ] + + samplesThrottler = new TimeoutThrottler( + `INSERT INTO ${clickhouseOptions.queryOptions.database}.${samplesTableName}${dist}(fingerprint, timestamp_ns, value, string, type) FORMAT JSONEachRow`) + timeSeriesThrottler = new TimeoutThrottler( + `INSERT INTO ${clickhouseOptions.queryOptions.database}.time_series${dist}(date, fingerprint, labels, name, type) FORMAT JSONEachRow`) + tracesThottler = new TimeoutThrottler( + `INSERT INTO ${clickhouseOptions.queryOptions.database}.traces_input + (trace_id, span_id, parent_id, name, timestamp_ns, duration_ns, service_name, payload_type, payload, tags) + FORMAT JSONEachRow`) + setTimeout(async () => { // eslint-disable-next-line no-unmodified-loop-condition while (on) {