From f82c3dc970a61cf198442ecbfc8c8ee1a4562438 Mon Sep 17 00:00:00 2001
From: Lorenzo Mangani
Date: Sun, 22 Oct 2023 17:21:28 +0200
Subject: [PATCH 1/7] Always use db name explicitly

Signed-off-by: Lorenzo Mangani
---
 lib/db/clickhouse.js | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/lib/db/clickhouse.js b/lib/db/clickhouse.js
index 6e7ab8bf..ea0d4d29 100644
--- a/lib/db/clickhouse.js
+++ b/lib/db/clickhouse.js
@@ -1299,7 +1299,7 @@ const samplesReadTable = {
     }
     logger.info('checking last timestamp')
     const v1EndTime = await axios.post(`${getClickhouseUrl()}/?database=${UTILS.DATABASE_NAME()}`,
-      `SELECT max(timestamp_ns) as ts FROM ${tableName} format JSON`)
+      `SELECT max(timestamp_ns) as ts FROM ${UTILS.DATABASE_NAME()}.${tableName} format JSON`)
     if (!v1EndTime.data.rows) {
       samplesReadTable.v1 = false
       return
@@ -1319,7 +1319,7 @@ const samplesReadTable = {
   settingsVersions: async function () {
     const versions = await axios.post(`${getClickhouseUrl()}/?database=${UTILS.DATABASE_NAME()}`,
       `SELECT argMax(name, inserted_at) as _name, argMax(value, inserted_at) as _value
-      FROM settings${dist} WHERE type == 'update' GROUP BY fingerprint HAVING _name != '' FORMAT JSON`)
+      FROM ${UTILS.DATABASE_NAME()}.settings${dist} WHERE type == 'update' GROUP BY fingerprint HAVING _name != '' FORMAT JSON`)
     for (const version of versions.data.data) {
       this.versions[version._name] = parseInt(version._value) * 1000
     }
@@ -1355,7 +1355,7 @@ const getSettings = async (names, database) => {
     'short-hash'))
   const settings = await rawRequest(`SELECT argMax(name, inserted_at) as _name,
     argMax(value, inserted_at) as _value
-    FROM settings${dist} WHERE fingerprint IN (${fps.join(',')}) GROUP BY fingerprint HAVING _name != '' FORMAT JSON`,
+    FROM ${UTILS.DATABASE_NAME()}.settings${dist} WHERE fingerprint IN (${fps.join(',')}) GROUP BY fingerprint HAVING _name != '' FORMAT JSON`,
   null, database)
   return settings.data.data.reduce((sum, cur) => {
     sum[cur._name] = cur._value
@@ -1373,7 +1373,7 @@ const getSettings = async (names, database) => {
  */
 const addSetting = async (type, name, value, database) => {
   const fp = UTILS.fingerPrint(JSON.stringify({ type: type, name: name }), false, 'short-hash')
-  return rawRequest('INSERT INTO settings (fingerprint, type, name, value, inserted_at) FORMAT JSONEachRow',
+  return rawRequest('INSERT INTO ${UTILS.DATABASE_NAME()}.settings (fingerprint, type, name, value, inserted_at) FORMAT JSONEachRow',
     JSON.stringify({
       fingerprint: fp,
      type: type,
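A note on the quoting in the final hunk above: JavaScript only interpolates ${...} inside backtick template literals, never inside single-quoted strings, so the single-quoted INSERT here reaches ClickHouse with a literal ${UTILS.DATABASE_NAME()} in it. A minimal illustration in plain Node.js (the db value is made up):

    const db = 'qryn'
    const single = 'INSERT INTO ${db}.settings'    // stays literal: INSERT INTO ${db}.settings
    const template = `INSERT INTO ${db}.settings`  // interpolated: INSERT INTO qryn.settings
    console.log(single === template)               // false

PATCH 4/7 below switches this call to backticks for exactly this reason.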
From a855ba5d53a5f8a3817119d8c50681211f2749eb Mon Sep 17 00:00:00 2001
From: Lorenzo Mangani
Date: Sun, 22 Oct 2023 17:30:08 +0200
Subject: [PATCH 2/7] Use {{DB}} in all queries

Signed-off-by: Lorenzo Mangani
---
 lib/db/maintain/scripts.js | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/lib/db/maintain/scripts.js b/lib/db/maintain/scripts.js
index 08ad00a2..a595ffac 100644
--- a/lib/db/maintain/scripts.js
+++ b/lib/db/maintain/scripts.js
@@ -40,7 +40,7 @@ module.exports.overall = [
   AS SELECT date, pairs.1 as key, pairs.2 as val, fingerprint
   FROM time_series ARRAY JOIN JSONExtractKeysAndValues(time_series.labels, 'String') as pairs`,
-  `INSERT INTO settings (fingerprint, type, name, value, inserted_at) VALUES (cityHash64('update_v3_5'), 'update',
+  `INSERT INTO {{DB}}.settings (fingerprint, type, name, value, inserted_at) VALUES (cityHash64('update_v3_5'), 'update',
    'v3_1', toString(toUnixTimestamp(NOW())), NOW())`,
   `CREATE TABLE IF NOT EXISTS {{DB}}.metrics_15s {{{OnCluster}}} (
@@ -65,10 +65,10 @@
 SELECT fingerprint,
   countState() as count,
   sumSimpleState(value) as sum,
   sumSimpleState(length(string)) as bytes
-FROM samples_v3 as samples
+FROM {{DB}}.samples_v3 as samples
 GROUP BY fingerprint, timestamp_ns;`,
-  `INSERT INTO settings (fingerprint, type, name, value, inserted_at) VALUES (cityHash64('update_v3_2'), 'update',
+  `INSERT INTO {{DB}}.settings (fingerprint, type, name, value, inserted_at) VALUES (cityHash64('update_v3_2'), 'update',
    'v3_2', toString(toUnixTimestamp(NOW())), NOW())`
 ]
@@ -151,7 +151,7 @@ module.exports.traces = [
        duration_ns as duration
   FROM traces_input ARRAY JOIN tags`,
-  `INSERT INTO settings (fingerprint, type, name, value, inserted_at) VALUES (cityHash64('tempo_traces_v1'), 'update',
+  `INSERT INTO {{DB}}.settings (fingerprint, type, name, value, inserted_at) VALUES (cityHash64('tempo_traces_v1'), 'update',
    'tempo_traces_v2', toString(toUnixTimestamp(NOW())), NOW())`
 ]
@@ -198,7 +198,7 @@ module.exports.overall_dist = [
 ]
 module.exports.traces_dist = [
-  `CREATE TABLE IF NOT EXISTS tempo_traces_kv_dist {{{OnCluster}}} (
+  `CREATE TABLE IF NOT EXISTS {{DB}}.tempo_traces_kv_dist {{{OnCluster}}} (
    oid String,
    date Date,
    key String,
@@ -206,7 +206,7 @@ module.exports.traces_dist = [
    val String
  ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'tempo_traces_kv', sipHash64(oid, key));`,
-  `CREATE TABLE IF NOT EXISTS tempo_traces_dist {{{OnCluster}}} (
+  `CREATE TABLE IF NOT EXISTS {{DB}}.tempo_traces_dist {{{OnCluster}}} (
    oid String,
    trace_id FixedString(16),
    span_id FixedString(8),
@@ -219,7 +219,7 @@ module.exports.traces_dist = [
    payload String,
  ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'tempo_traces', sipHash64(oid, trace_id));`,
-  `CREATE TABLE IF NOT EXISTS tempo_traces_attrs_gin_dist {{{OnCluster}}} (
+  `CREATE TABLE IF NOT EXISTS {{DB}}.tempo_traces_attrs_gin_dist {{{OnCluster}}} (
    oid String,
    date Date,
    key String,
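The {{DB}} and {{{OnCluster}}} placeholders are Handlebars expressions; the maintenance code compiles each statement with hb.compile before sending it (visible in PATCH 4/7 below). A sketch of the rendering step with made-up context values; the triple braces matter because double-stache output is HTML-escaped, which would mangle the backticks and quotes in an ON CLUSTER clause:

    const hb = require('handlebars')

    const tpl = hb.compile(
      "CREATE TABLE IF NOT EXISTS {{DB}}.tempo_traces_kv_dist {{{OnCluster}}} (oid String) " +
      "ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'tempo_traces_kv', sipHash64(oid, key));")
    console.log(tpl({ DB: 'qryn', CLUSTER: 'c1', OnCluster: 'ON CLUSTER `c1`' }))
    // CREATE TABLE IF NOT EXISTS qryn.tempo_traces_kv_dist ON CLUSTER `c1` (oid String)
    //   ENGINE = Distributed('c1','qryn', 'tempo_traces_kv', sipHash64(oid, key));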
From 366a36d55b4ee72bc7e71483370717f075223325 Mon Sep 17 00:00:00 2001
From: Lorenzo Mangani
Date: Sun, 22 Oct 2023 17:32:22 +0200
Subject: [PATCH 3/7] Use {{DB}} in all queries

Signed-off-by: Lorenzo Mangani
---
 lib/db/maintain/index.js | 42 ++++++++++++++++++++--------------------
 1 file changed, 21 insertions(+), 21 deletions(-)

diff --git a/lib/db/maintain/index.js b/lib/db/maintain/index.js
index f1689154..5fb58d4a 100644
--- a/lib/db/maintain/index.js
+++ b/lib/db/maintain/index.js
@@ -51,7 +51,7 @@ const upgradeSingle = async (db, key, scripts) => {
         'ENGINE={{ReplacingMergeTree}}(ver) ORDER BY k', true)
     }
   }
-  let ver = await client.rawRequest(`SELECT max(ver) as ver FROM ver WHERE k = ${key} FORMAT JSON`,
+  let ver = await client.rawRequest(`SELECT max(ver) as ver FROM {{DB}}.ver WHERE k = ${key} FORMAT JSON`,
     null, db)
   ver = ver.data.data && ver.data.data[0] && ver.data.data[0].ver ? ver.data.data[0].ver : 0
   for (let i = parseInt(ver); i < scripts.length; ++i) {
@@ -83,8 +83,8 @@ const upgradeRequest = async (db, request, useDefaultDB, updateVer) => {
   console.log(request)
   await client.rawRequest(request, null, useDefaultDB ? db : undefined)
   if (updateVer) {
-    console.log(`INSERT INTO ver (k, ver) VALUES (${updateVer.key}, ${updateVer.to})`)
-    await client.rawRequest(`INSERT INTO ver (k, ver) VALUES (${updateVer.key}, ${updateVer.to})`, null, db)
+    console.log(`INSERT INTO {{DB}}.ver (k, ver) VALUES (${updateVer.key}, ${updateVer.to})`)
+    await client.rawRequest(`INSERT INTO {{DB}}.ver (k, ver) VALUES (${updateVer.key}, ${updateVer.to})`, null, db)
   }
 }
@@ -105,53 +105,53 @@ module.exports.rotate = async (opts) => {
     return upgradeRequest(db.db, req, true)
   }
   if (db.samples_days + '' !== settings.v3_samples_days) {
-    const alterTable = 'ALTER TABLE samples_v3 {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
-    const rotateTable = `ALTER TABLE samples_v3 {{{OnCluster}}} MODIFY TTL toDateTime(timestamp_ns / 1000000000) + INTERVAL ${db.samples_days} DAY`
+    const alterTable = 'ALTER TABLE {{DB}}.samples_v3 {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
+    const rotateTable = `ALTER TABLE {{DB}}.samples_v3 {{{OnCluster}}} MODIFY TTL toDateTime(timestamp_ns / 1000000000) + INTERVAL ${db.samples_days} DAY`
     await _update(alterTable, null, db.db)
     await _update(rotateTable, null, db.db)
     await client.addSetting('rotate', 'v3_samples_days', db.samples_days + '', db.db)
   }
   if (db.time_series_days + '' !== settings.v3_time_series_days) {
-    const alterTable = 'ALTER TABLE time_series {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
-    const rotateTable = `ALTER TABLE time_series {{{OnCluster}}} MODIFY TTL "date" + INTERVAL ${db.time_series_days} DAY`
+    const alterTable = 'ALTER TABLE {{DB}}.time_series {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
+    const rotateTable = `ALTER TABLE {{DB}}.time_series {{{OnCluster}}} MODIFY TTL "date" + INTERVAL ${db.time_series_days} DAY`
     await _update(alterTable, null, db.db)
     await _update(rotateTable, null, db.db)
-    const alterView = 'ALTER TABLE time_series_gin {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
-    const rotateView = `ALTER TABLE time_series_gin {{{OnCluster}}} MODIFY TTL "date" + INTERVAL ${db.time_series_days} DAY`
+    const alterView = 'ALTER TABLE {{DB}}.time_series_gin {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
+    const rotateView = `ALTER TABLE {{DB}}.time_series_gin {{{OnCluster}}} MODIFY TTL "date" + INTERVAL ${db.time_series_days} DAY`
     await _update(alterView, null, db.db)
     await _update(rotateView, null, db.db)
     await client.addSetting('rotate', 'v3_time_series_days', db.time_series_days + '', db.db)
   }
   if (db.storage_policy && db.storage_policy !== settings.v3_storage_policy) {
     logger.debug(`Altering storage policy: ${db.storage_policy}`)
-    const alterTs = `ALTER TABLE time_series {{{OnCluster}}} MODIFY SETTING storage_policy='${db.storage_policy}'`
-    const alterTsVw = `ALTER TABLE time_series_gin {{{OnCluster}}} MODIFY SETTING storage_policy='${db.storage_policy}'`
-    const alterSm = `ALTER TABLE samples_v3 {{{OnCluster}}} MODIFY SETTING storage_policy='${db.storage_policy}'`
+    const alterTs = `ALTER TABLE {{DB}}.time_series {{{OnCluster}}} MODIFY SETTING storage_policy='${db.storage_policy}'`
+    const alterTsVw = `ALTER TABLE {{DB}}.time_series_gin {{{OnCluster}}} MODIFY SETTING storage_policy='${db.storage_policy}'`
+    const alterSm = `ALTER TABLE {{DB}}.samples_v3 {{{OnCluster}}} MODIFY SETTING storage_policy='${db.storage_policy}'`
     await _update(alterTs, null, db.db)
     await _update(alterTsVw, null, db.db)
     await _update(alterSm, null, db.db)
     await client.addSetting('rotate', 'v3_storage_policy', db.storage_policy, db.db)
   }
   if (db.samples_days + '' !== settings.v1_traces_days) {
-    let alterTable = 'ALTER TABLE tempo_traces {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
-    let rotateTable = `ALTER TABLE tempo_traces {{{OnCluster}}} MODIFY TTL toDateTime(timestamp_ns / 1000000000) + INTERVAL ${db.samples_days} DAY`
+    let alterTable = 'ALTER TABLE {{DB}}.tempo_traces {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
+    let rotateTable = `ALTER TABLE {{DB}}.tempo_traces {{{OnCluster}}} MODIFY TTL toDateTime(timestamp_ns / 1000000000) + INTERVAL ${db.samples_days} DAY`
     await _update(alterTable, null, db.db)
     await _update(rotateTable, null, db.db)
-    alterTable = 'ALTER TABLE tempo_traces_attrs_gin {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
-    rotateTable = `ALTER TABLE tempo_traces_attrs_gin {{{OnCluster}}} MODIFY TTL date + INTERVAL ${db.samples_days} DAY`
+    alterTable = 'ALTER TABLE {{DB}}.tempo_traces_attrs_gin {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
+    rotateTable = `ALTER TABLE {{DB}}.tempo_traces_attrs_gin {{{OnCluster}}} MODIFY TTL date + INTERVAL ${db.samples_days} DAY`
     await _update(alterTable, null, db.db)
     await _update(rotateTable, null, db.db)
-    alterTable = 'ALTER TABLE tempo_traces_kv {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
-    rotateTable = `ALTER TABLE tempo_traces_kv {{{OnCluster}}} MODIFY TTL date + INTERVAL ${db.samples_days} DAY`
+    alterTable = 'ALTER TABLE {{DB}}.tempo_traces_kv {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
+    rotateTable = `ALTER TABLE {{DB}}.tempo_traces_kv {{{OnCluster}}} MODIFY TTL date + INTERVAL ${db.samples_days} DAY`
     await _update(alterTable, null, db.db)
     await _update(rotateTable, null, db.db)
     await client.addSetting('rotate', 'v1_traces_days', db.samples_days + '', db.db)
   }
   if (db.storage_policy && db.storage_policy !== settings.v1_traces_storage_policy) {
     logger.debug(`Altering storage policy: ${db.storage_policy}`)
-    const alterTs = `ALTER TABLE tempo_traces MODIFY SETTING storage_policy='${db.storage_policy}'`
-    const alterTsVw = `ALTER TABLE tempo_traces_attrs_gin MODIFY SETTING storage_policy='${db.storage_policy}'`
-    const alterSm = `ALTER TABLE tempo_traces_kv MODIFY SETTING storage_policy='${db.storage_policy}'`
+    const alterTs = `ALTER TABLE {{DB}}.tempo_traces MODIFY SETTING storage_policy='${db.storage_policy}'`
+    const alterTsVw = `ALTER TABLE {{DB}}.tempo_traces_attrs_gin MODIFY SETTING storage_policy='${db.storage_policy}'`
+    const alterSm = `ALTER TABLE {{DB}}.tempo_traces_kv MODIFY SETTING storage_policy='${db.storage_policy}'`
     await _update(alterTs, null, db.db)
     await _update(alterTsVw, null, db.db)
     await _update(alterSm, null, db.db)
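upgradeSingle and upgradeRequest above together form a small idempotent migration runner: the ver table records the highest applied script index per key, and each start-up only replays the scripts past that index before bumping the recorded version. A condensed sketch of the pattern; the run helper and its JSON response shape are assumptions standing in for client.rawRequest:

    // run(sql) is assumed to POST the statement to ClickHouse and return the
    // parsed FORMAT JSON response, the way client.rawRequest does above.
    async function upgrade (run, key, scripts) {
      const res = await run(`SELECT max(ver) as ver FROM ver WHERE k = ${key} FORMAT JSON`)
      const applied = parseInt((res.data.data[0] && res.data.data[0].ver) || 0)
      for (let i = applied; i < scripts.length; ++i) {
        if (!scripts[i]) { continue }          // gaps in the script list are skipped
        await run(scripts[i])                   // apply one migration step
        await run(`INSERT INTO ver (k, ver) VALUES (${key}, ${i + 1})`) // record progress
      }
    }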
From a140ba68abefd11d5cbce77ed7d2e673e4706da4 Mon Sep 17 00:00:00 2001
From: akvlad
Date: Tue, 24 Oct 2023 13:09:02 +0300
Subject: [PATCH 4/7] #fix: db name substitution

---
 lib/db/clickhouse.js     | 31 ++++++++++++++++++++-----------
 lib/db/maintain/index.js | 12 ++++++------
 2 files changed, 26 insertions(+), 17 deletions(-)

diff --git a/lib/db/clickhouse.js b/lib/db/clickhouse.js
index ea0d4d29..bc7fd20e 100644
--- a/lib/db/clickhouse.js
+++ b/lib/db/clickhouse.js
@@ -1335,13 +1335,21 @@ const samplesReadTable = {
  * @param database {string}
  * @returns {Promise>}
  */
-const rawRequest = (query, data, database) => {
-  const getParams = [
-    (database ? `database=${encodeURIComponent(database)}` : null),
-    (data ? `query=${encodeURIComponent(query)}` : null)
-  ].filter(p => p)
-  const url = `${getClickhouseUrl()}/${getParams.length ? `?${getParams.join('&')}` : ''}`
-  return axios.post(url, data || query)
+const rawRequest = async (query, data, database) => {
+  try {
+    const getParams = [
+      (database ? `database=${encodeURIComponent(database)}` : null),
+      (data ? `query=${encodeURIComponent(query)}` : null)
+    ].filter(p => p)
+    const url = `${getClickhouseUrl()}/${getParams.length ? `?${getParams.join('&')}` : ''}`
+    return await axios.post(url, data || query)
+  } catch (e) {
+    logger.error(new Error(e.message))
+    if (e.response) {
+      logger.error(`${e.response.status} ${e.response.data}`)
+    }
+    throw e
+  }
 }
@@ -1353,10 +1361,11 @@
 const getSettings = async (names, database) => {
   const fps = names.map(n => UTILS.fingerPrint(JSON.stringify({ type: n.type, name: n.name }), false,
     'short-hash'))
-  const settings = await rawRequest(`SELECT argMax(name, inserted_at) as _name,
+  const r1 = await rawRequest(`SHOW TABLES FROM ${database} FORMAT JSON`, null, database)
+  const req = `SELECT argMax(name, inserted_at) as _name,
     argMax(value, inserted_at) as _value
-    FROM ${UTILS.DATABASE_NAME()}.settings${dist} WHERE fingerprint IN (${fps.join(',')}) GROUP BY fingerprint HAVING _name != '' FORMAT JSON`,
-  null, database)
+    FROM ${database}.settings${dist} WHERE fingerprint IN (${fps.join(',')}) GROUP BY fingerprint HAVING _name != '' FORMAT JSON`
+  const settings = await rawRequest(req, null, database)
   return settings.data.data.reduce((sum, cur) => {
     sum[cur._name] = cur._value
     return sum
@@ -1373,7 +1382,7 @@ const getSettings = async (names, database) => {
  */
 const addSetting = async (type, name, value, database) => {
   const fp = UTILS.fingerPrint(JSON.stringify({ type: type, name: name }), false, 'short-hash')
-  return rawRequest('INSERT INTO ${UTILS.DATABASE_NAME()}.settings (fingerprint, type, name, value, inserted_at) FORMAT JSONEachRow',
+  return rawRequest(`INSERT INTO ${UTILS.DATABASE_NAME()}.settings (fingerprint, type, name, value, inserted_at) FORMAT JSONEachRow`,
     JSON.stringify({
       fingerprint: fp,
       type: type,
diff --git a/lib/db/maintain/index.js b/lib/db/maintain/index.js
index 5fb58d4a..1a756b46 100644
--- a/lib/db/maintain/index.js
+++ b/lib/db/maintain/index.js
@@ -51,8 +51,8 @@ const upgradeSingle = async (db, key, scripts) => {
         'ENGINE={{ReplacingMergeTree}}(ver) ORDER BY k', true)
     }
   }
-  let ver = await client.rawRequest(`SELECT max(ver) as ver FROM {{DB}}.ver WHERE k = ${key} FORMAT JSON`,
-    null, db)
+  let ver = await _upgradeRequest(`SELECT max(ver) as ver FROM {{DB}}.ver WHERE k = ${key} FORMAT JSON`,
+    true)
   ver = ver.data.data && ver.data.data[0] && ver.data.data[0].ver ? ver.data.data[0].ver : 0
   for (let i = parseInt(ver); i < scripts.length; ++i) {
     if (!scripts[i]) { continue }
@@ -66,7 +66,7 @@
  * @param request {string} request tpl
  * @param useDefaultDB {boolean} use db as default db
  * @param updateVer {{key: number, to: number}} update ver table
- * @returns {Promise}
+ * @returns {Promise>}
  */
 const upgradeRequest = async (db, request, useDefaultDB, updateVer) => {
   const tpl = hb.compile(request)
@@ -81,11 +81,11 @@ const upgradeRequest = async (db, request, useDefaultDB, updateVer) => {
     AggregatingMergeTree: clusterName ? 'ReplicatedAggregatingMergeTree' : 'AggregatingMergeTree'
   })
   console.log(request)
-  await client.rawRequest(request, null, useDefaultDB ? db : undefined)
+  const res = await client.rawRequest(request, null, useDefaultDB ? db : undefined)
   if (updateVer) {
-    console.log(`INSERT INTO {{DB}}.ver (k, ver) VALUES (${updateVer.key}, ${updateVer.to})`)
-    await client.rawRequest(`INSERT INTO {{DB}}.ver (k, ver) VALUES (${updateVer.key}, ${updateVer.to})`, null, db)
+    await client.rawRequest(`INSERT INTO ${db}.ver (k, ver) VALUES (${updateVer.key}, ${updateVer.to})`, null, db)
  }
+  return res
 }

 /**
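rawRequest leans on a property of ClickHouse's HTTP interface: the SQL statement may travel in the ?query= URL parameter, leaving the POST body free to carry only the data payload (JSONEachRow rows here); with no data argument, the statement itself becomes the body. A sketch of the request shape it produces; the endpoint, database and row values are illustrative, not qryn's defaults:

    // Inside an async function. ClickHouse's HTTP port defaults to 8123.
    const axios = require('axios')
    const stmt = 'INSERT INTO qryn.settings (fingerprint, type, name, value, inserted_at) FORMAT JSONEachRow'
    const row = { fingerprint: 123, type: 'update', name: 'v3_1', value: '1', inserted_at: Date.now() * 1000000 }
    await axios.post(
      `http://localhost:8123/?database=qryn&query=${encodeURIComponent(stmt)}`,
      JSON.stringify(row))  // body: one JSON object per line
    // For a SELECT (no data payload), the statement is sent as the body instead.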
From 17a76edbc00edc6f78f6edc272202e771006039d Mon Sep 17 00:00:00 2001
From: akvlad
Date: Wed, 25 Oct 2023 12:48:35 +0300
Subject: [PATCH 5/7] #fix: refactor to use `rawRequest` instead of axios.post; not finished

---
 lib/db/clickhouse.js          | 33 +++++++++++++++++++++++++--------
 lib/db/clickhouse_alerting.js | 24 +++++++++++++-----------
 lib/db/throttler.js           | 10 +++-------
 3 files changed, 41 insertions(+), 26 deletions(-)

diff --git a/lib/db/clickhouse.js b/lib/db/clickhouse.js
index bc7fd20e..78971af0 100644
--- a/lib/db/clickhouse.js
+++ b/lib/db/clickhouse.js
@@ -181,8 +181,8 @@ const initialize = async function (dbName) {
 const checkCapabilities = async () => {
   logger.info('Checking clickhouse capabilities')
   try {
-    await axios.post(getClickhouseUrl() + '/?allow_experimental_live_view=1',
-      `CREATE LIVE VIEW ${clickhouseOptions.queryOptions.database}.lvcheck WITH TIMEOUT 1 AS SELECT 1`)
+    await rawRequest(`CREATE LIVE VIEW ${clickhouseOptions.queryOptions.database}.lvcheck WITH TIMEOUT 1 AS SELECT 1`,
+      null, clickhouseOptions.queryOptions.database, { allow_experimental_live_view: 1 })
     capabilities.liveView = !isCustomSamplesOrderingRule()
     logger.info('LIVE VIEW: supported')
   } catch (e) {
@@ -1335,14 +1335,31 @@ const samplesReadTable = {
  * @param database {string}
  * @returns {Promise>}
  */
-const rawRequest = async (query, data, database) => {
+const rawRequest = async (query, data, database, config) => {
+  config = config || {}
+  const params = config.params || {}
   try {
-    const getParams = [
-      (database ? `database=${encodeURIComponent(database)}` : null),
-      (data ? `query=${encodeURIComponent(query)}` : null)
-    ].filter(p => p)
+    let getParams = {}
+    if (database) {
+      getParams.database = database
+    }
+    if (data) {
+      getParams.query = query
+    }
+    getParams = {
+      ...params,
+      ...getParams
+    }
+    getParams = Object.entries(getParams).map(([k, v]) => `${k}=${encodeURIComponent(v)}`)
     const url = `${getClickhouseUrl()}/${getParams.length ? `?${getParams.join('&')}` : ''}`
-    return await axios.post(url, data || query)
+    const res = await axios.post(url, data || query, {
+      ...config,
+      headers: {
+        'Content-Type': 'text/plain',
+        ...(config.headers || {})
+      }
+    })
+    return res
   } catch (e) {
     logger.error(new Error(e.message))
     if (e.response) {
diff --git a/lib/db/clickhouse_alerting.js b/lib/db/clickhouse_alerting.js
index 5c99d642..25749b65 100644
--- a/lib/db/clickhouse_alerting.js
+++ b/lib/db/clickhouse_alerting.js
@@ -5,7 +5,7 @@
 const axios = require('axios')
 const { DATABASE_NAME } = require('../utils')
 const UTILS = require('../utils')
-const { getClickhouseUrl } = require('./clickhouse')
+const { getClickhouseUrl, rawRequest } = require('./clickhouse')
 const Sql = require('@cloki/clickhouse-sql')
 const { clusterName } = require('../../common')
 const onCluster = clusterName ? `ON CLUSTER ${clusterName}` : ''
@@ -19,14 +19,13 @@ const dist = clusterName ? '_dist' : ''
 module.exports.getAlertRule = async (ns, group, name) => {
   const fp = getRuleFP(ns, group, name)
   const mark = Math.random()
-  const res = await axios.post(getClickhouseUrl(),
+  const res = await rawRequest(
     'SELECT fingerprint, argMax(name, inserted_at) as name, argMax(value, inserted_at) as value ' +
     `FROM ${DATABASE_NAME()}.settings${dist} ` +
     `WHERE fingerprint = ${fp} AND ${mark} == ${mark} ` +
     'GROUP BY fingerprint ' +
     'HAVING name != \'\' ' +
-    'FORMAT JSON'
-  )
+    'FORMAT JSON', null, DATABASE_NAME())
   if (!res.data.data.length) {
     return undefined
   }
@@ -49,10 +48,11 @@ module.exports.putAlertRule = async (namespace, group, rule) => {
   const groupName = JSON.stringify({ type: 'alert_group', ns: namespace, group: group.name })
   const groupFp = getGroupFp(namespace, group.name)
   const groupVal = JSON.stringify({ name: group.name, interval: group.interval })
-  await axios.post(getClickhouseUrl(),
-    `INSERT INTO ${DATABASE_NAME()}.settings${dist} (fingerprint, type, name, value, inserted_at) FORMAT JSONEachRow \n` +
+  await rawRequest(
+    `INSERT INTO ${DATABASE_NAME()}.settings${dist} (fingerprint, type, name, value, inserted_at) FORMAT JSONEachRow`,
     JSON.stringify({ fingerprint: ruleFp, type: 'alert_rule', name: ruleName, value: JSON.stringify(ruleVal), inserted_at: Date.now() * 1000000 }) + '\n' +
-    JSON.stringify({ fingerprint: groupFp, type: 'alert_group', name: groupName, value: groupVal, inserted_at: Date.now() * 1000000 })
+    JSON.stringify({ fingerprint: groupFp, type: 'alert_group', name: groupName, value: groupVal, inserted_at: Date.now() * 1000000 }),
+    DATABASE_NAME()
   )
 }
@@ -66,8 +66,9 @@
 module.exports.getLastCheck = async (ns, group, rule, id) => {
   const fp = getRuleFP(ns, group, rule)
   id = id || 0
-  const resp = await axios.post(getClickhouseUrl(),
-    `SELECT max(mark) as maxmark FROM ${DATABASE_NAME()}._alert_view_${fp}_mark WHERE id = ${id} FORMAT JSON`
+  const resp = await rawRequest(
+    `SELECT max(mark) as maxmark FROM ${DATABASE_NAME()}._alert_view_${fp}_mark WHERE id = ${id} FORMAT JSON`,
+    null, DATABASE_NAME()
   )
   if (!resp.data.data[0]) {
     return 0
@@ -100,11 +101,12 @@
 module.exports.getAlertRules = async (limit, offset) => {
   const _limit = limit ? `LIMIT ${limit}` : ''
   const _offset = offset ? `OFFSET ${offset}` : ''
   const mark = Math.random()
-  const res = await axios.post(getClickhouseUrl(),
+  const res = rawRequest(
     'SELECT fingerprint, argMax(name, inserted_at) as name, argMax(value, inserted_at) as value ' +
     `FROM ${DATABASE_NAME()}.settings${dist} ` +
     `WHERE type == 'alert_rule' AND ${mark} == ${mark} ` +
-    `GROUP BY fingerprint HAVING name != '' ORDER BY name ${_limit} ${_offset} FORMAT JSON`)
+    `GROUP BY fingerprint HAVING name != '' ORDER BY name ${_limit} ${_offset} FORMAT JSON`,
+    null, DATABASE_NAME())
   return res.data.data.map(e => {
     return { rule: JSON.parse(e.value), name: JSON.parse(e.name) }
   })
diff --git a/lib/db/throttler.js b/lib/db/throttler.js
index 21a3250c..f9bce7f4 100644
--- a/lib/db/throttler.js
+++ b/lib/db/throttler.js
@@ -1,8 +1,9 @@
 const { isMainThread, parentPort } = require('worker_threads')
 const axios = require('axios')
-const { getClickhouseUrl, samplesTableName } = require('./clickhouse')
+const { getClickhouseUrl, samplesTableName, rawRequest } = require('./clickhouse')
 const clickhouseOptions = require('./clickhouse').databaseOptions
 const logger = require('../logger')
+const { DATABASE_NAME } = require('../utils')
 const clusterName = require('../../common').clusterName
 const dist = clusterName ? '_dist' : ''
@@ -49,12 +50,7 @@ class TimeoutThrottler {
     }
     const _queue = this.queue
     this.queue = []
-    await axios.post(`${getClickhouseUrl()}/?query=${this.statement}`,
-      _queue.join('\n'),
-      {
-        maxBodyLength: Infinity
-      }
-    )
+    await rawRequest(this.statement, _queue.join('\n'), DATABASE_NAME(), { maxBodyLength: Infinity })
   }

   stop () {
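The extended rawRequest signature is what the throttler now relies on: the fourth config argument passes axios options such as maxBodyLength straight through, merges any extra URL parameters in via config.params, and defaults the Content-Type header to text/plain while letting config.headers override it. A hypothetical bulk insert in the new shape (the target table and row values are illustrative):

    // Inside an async function; `rows` is an array of column-matching objects.
    const rows = [{ fingerprint: 1, timestamp_ns: 1e18, value: 0.5, string: 'hello' }]
    await rawRequest(
      `INSERT INTO ${DATABASE_NAME()}.samples_v3 FORMAT JSONEachRow`, // statement goes into ?query=
      rows.map(r => JSON.stringify(r)).join('\n'),                    // body: one JSON row per line
      DATABASE_NAME(),
      { maxBodyLength: Infinity })                                    // axios option passed through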
From c407653d0a7f1b6ae9354ced72fc5dc6ad81f321 Mon Sep 17 00:00:00 2001
From: akvlad
Date: Tue, 19 Mar 2024 13:47:30 +0200
Subject: [PATCH 6/7] fix: await

---
 lib/db/clickhouse_alerting.js | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/db/clickhouse_alerting.js b/lib/db/clickhouse_alerting.js
index 25749b65..f2b0765a 100644
--- a/lib/db/clickhouse_alerting.js
+++ b/lib/db/clickhouse_alerting.js
@@ -101,7 +101,7 @@ module.exports.getAlertRules = async (limit, offset) => {
   const _limit = limit ? `LIMIT ${limit}` : ''
   const _offset = offset ? `OFFSET ${offset}` : ''
   const mark = Math.random()
-  const res = rawRequest(
+  const res = await rawRequest(
     'SELECT fingerprint, argMax(name, inserted_at) as name, argMax(value, inserted_at) as value ' +
     `FROM ${DATABASE_NAME()}.settings${dist} ` +
     `WHERE type == 'alert_rule' AND ${mark} == ${mark} ` +
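This one-line fix matters because rawRequest is async: without await, res is a pending Promise, res.data is undefined, and a failed request surfaces as an unhandled rejection instead of propagating to the caller. A minimal sketch (the query and database name are made up):

    // Inside an async function:
    const pending = rawRequest('SELECT 1 FORMAT JSON', null, 'qryn')
    console.log(pending.data)  // undefined: `pending` is a Promise, not a response

    const res = await rawRequest('SELECT 1 FORMAT JSON', null, 'qryn')
    console.log(res.data)      // the parsed FORMAT JSON response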
From 6cd25c2b6af23d5d88ded5ed9c6a202c3b2dc675 Mon Sep 17 00:00:00 2001
From: akvlad
Date: Tue, 19 Mar 2024 13:50:46 +0200
Subject: [PATCH 7/7] fix: DB.table

---
 lib/db/maintain/index.js | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/lib/db/maintain/index.js b/lib/db/maintain/index.js
index 045c7b11..b66aaabd 100644
--- a/lib/db/maintain/index.js
+++ b/lib/db/maintain/index.js
@@ -163,20 +163,20 @@ module.exports.rotate = async (opts) => {
     await client.addSetting('rotate', 'v1_traces_storage_policy', db.storage_policy, db.db)
   }
   if (db.samples_days + '' !== settings.v1_profiles_days) {
-    let alterTable = 'ALTER TABLE profiles {{DB}}.{{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
-    let rotateTable = `ALTER TABLE profiles {{DB}}.{{{OnCluster}}} MODIFY TTL toDateTime(timestamp_ns / 1000000000) + INTERVAL ${db.samples_days} DAY`
+    let alterTable = 'ALTER TABLE {{DB}}.profiles {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
+    let rotateTable = `ALTER TABLE {{DB}}.profiles {{{OnCluster}}} MODIFY TTL toDateTime(timestamp_ns / 1000000000) + INTERVAL ${db.samples_days} DAY`
     await _update(alterTable, null, db.db)
     await _update(rotateTable, null, db.db)
-    alterTable = 'ALTER TABLE profiles_series {{DB}}.{{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
-    rotateTable = `ALTER TABLE profiles_series {{DB}}.{{{OnCluster}}} MODIFY TTL date + INTERVAL ${db.samples_days} DAY`
+    alterTable = 'ALTER TABLE {{DB}}.profiles_series {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
+    rotateTable = `ALTER TABLE {{DB}}.profiles_series {{{OnCluster}}} MODIFY TTL date + INTERVAL ${db.samples_days} DAY`
     await _update(alterTable, null, db.db)
     await _update(rotateTable, null, db.db)
-    alterTable = 'ALTER TABLE profiles_series_gin {{DB}}.{{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
-    rotateTable = `ALTER TABLE profiles_series_gin {{DB}}.{{{OnCluster}}} MODIFY TTL date + INTERVAL ${db.samples_days} DAY`
+    alterTable = 'ALTER TABLE {{DB}}.profiles_series_gin {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
+    rotateTable = `ALTER TABLE {{DB}}.profiles_series_gin {{{OnCluster}}} MODIFY TTL date + INTERVAL ${db.samples_days} DAY`
     await _update(alterTable, null, db.db)
     await _update(rotateTable, null, db.db)
-    alterTable = 'ALTER TABLE profiles_series_keys {{DB}}.{{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
-    rotateTable = `ALTER TABLE profiles_series_keys {{DB}}.{{{OnCluster}}} MODIFY TTL date + INTERVAL ${db.samples_days} DAY`
+    alterTable = 'ALTER TABLE {{DB}}.profiles_series_keys {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
+    rotateTable = `ALTER TABLE {{DB}}.profiles_series_keys {{{OnCluster}}} MODIFY TTL date + INTERVAL ${db.samples_days} DAY`
     await _update(alterTable, null, db.db)
     await _update(rotateTable, null, db.db)
     await client.addSetting('rotate', 'v1_profiles_days', db.samples_days + '', db.db)
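Rendering the templates shows what this final patch corrects: with the {{DB}} placeholder written after the table name, Handlebars produced syntactically invalid SQL. A sketch with made-up context values:

    const hb = require('handlebars')
    const ctx = { DB: 'qryn', OnCluster: 'ON CLUSTER `c1`' }

    // Before: the database qualifier was glued onto the ON CLUSTER clause.
    hb.compile('ALTER TABLE profiles {{DB}}.{{{OnCluster}}} MODIFY TTL ...')(ctx)
    // => 'ALTER TABLE profiles qryn.ON CLUSTER `c1` MODIFY TTL ...'  (invalid SQL)

    // After: the qualifier sits where ClickHouse expects it.
    hb.compile('ALTER TABLE {{DB}}.profiles {{{OnCluster}}} MODIFY TTL ...')(ctx)
    // => 'ALTER TABLE qryn.profiles ON CLUSTER `c1` MODIFY TTL ...'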