From 17a76edbc00edc6f78f6edc272202e771006039d Mon Sep 17 00:00:00 2001 From: akvlad Date: Wed, 25 Oct 2023 12:48:35 +0300 Subject: [PATCH] #fix: refactor to use `rawRequest` instead of axios.post; not finished --- lib/db/clickhouse.js | 33 +++++++++++++++++++++++++-------- lib/db/clickhouse_alerting.js | 24 +++++++++++++----------- lib/db/throttler.js | 10 +++------- 3 files changed, 41 insertions(+), 26 deletions(-) diff --git a/lib/db/clickhouse.js b/lib/db/clickhouse.js index bc7fd20e..78971af0 100644 --- a/lib/db/clickhouse.js +++ b/lib/db/clickhouse.js @@ -181,8 +181,8 @@ const initialize = async function (dbName) { const checkCapabilities = async () => { logger.info('Checking clickhouse capabilities') try { - await axios.post(getClickhouseUrl() + '/?allow_experimental_live_view=1', - `CREATE LIVE VIEW ${clickhouseOptions.queryOptions.database}.lvcheck WITH TIMEOUT 1 AS SELECT 1`) + await rawRequest(`CREATE LIVE VIEW ${clickhouseOptions.queryOptions.database}.lvcheck WITH TIMEOUT 1 AS SELECT 1`, + null, clickhouseOptions.queryOptions.database, { allow_experimental_live_view: 1 }) capabilities.liveView = !isCustomSamplesOrderingRule() logger.info('LIVE VIEW: supported') } catch (e) { @@ -1335,14 +1335,31 @@ const samplesReadTable = { * @param database {string} * @returns {Promise>} */ -const rawRequest = async (query, data, database) => { +const rawRequest = async (query, data, database, config) => { + config = config || {} + const params = config.params || {} try { - const getParams = [ - (database ? `database=${encodeURIComponent(database)}` : null), - (data ? `query=${encodeURIComponent(query)}` : null) - ].filter(p => p) + let getParams = {} + if (database) { + getParams.database = database + } + if (data) { + getParams.query = query + } + getParams = { + ...params, + ...getParams + } + getParams = Object.entries(getParams).map(([k, v]) => `${k}=${encodeURIComponent(v)}`) const url = `${getClickhouseUrl()}/${getParams.length ? `?${getParams.join('&')}` : ''}` - return await axios.post(url, data || query) + const res = await axios.post(url, data || query, { + ...config, + headers: { + 'Content-Type': 'text/plain', + ...(config.headers || {}) + } + }) + return res } catch (e) { logger.error(new Error(e.message)) if (e.response) { diff --git a/lib/db/clickhouse_alerting.js b/lib/db/clickhouse_alerting.js index 5c99d642..25749b65 100644 --- a/lib/db/clickhouse_alerting.js +++ b/lib/db/clickhouse_alerting.js @@ -5,7 +5,7 @@ const axios = require('axios') const { DATABASE_NAME } = require('../utils') const UTILS = require('../utils') -const { getClickhouseUrl } = require('./clickhouse') +const { getClickhouseUrl, rawRequest } = require('./clickhouse') const Sql = require('@cloki/clickhouse-sql') const { clusterName } = require('../../common') const onCluster = clusterName ? `ON CLUSTER ${clusterName}` : '' @@ -19,14 +19,13 @@ const dist = clusterName ? '_dist' : '' module.exports.getAlertRule = async (ns, group, name) => { const fp = getRuleFP(ns, group, name) const mark = Math.random() - const res = await axios.post(getClickhouseUrl(), + const res = await rawRequest( 'SELECT fingerprint, argMax(name, inserted_at) as name, argMax(value, inserted_at) as value ' + `FROM ${DATABASE_NAME()}.settings${dist} ` + `WHERE fingerprint = ${fp} AND ${mark} == ${mark} ` + 'GROUP BY fingerprint ' + 'HAVING name != \'\' ' + - 'FORMAT JSON' - ) + 'FORMAT JSON', null, DATABASE_NAME()) if (!res.data.data.length) { return undefined } @@ -49,10 +48,11 @@ module.exports.putAlertRule = async (namespace, group, rule) => { const groupName = JSON.stringify({ type: 'alert_group', ns: namespace, group: group.name }) const groupFp = getGroupFp(namespace, group.name) const groupVal = JSON.stringify({ name: group.name, interval: group.interval }) - await axios.post(getClickhouseUrl(), - `INSERT INTO ${DATABASE_NAME()}.settings${dist} (fingerprint, type, name, value, inserted_at) FORMAT JSONEachRow \n` + + await rawRequest( + `INSERT INTO ${DATABASE_NAME()}.settings${dist} (fingerprint, type, name, value, inserted_at) FORMAT JSONEachRow`, JSON.stringify({ fingerprint: ruleFp, type: 'alert_rule', name: ruleName, value: JSON.stringify(ruleVal), inserted_at: Date.now() * 1000000 }) + '\n' + - JSON.stringify({ fingerprint: groupFp, type: 'alert_group', name: groupName, value: groupVal, inserted_at: Date.now() * 1000000 }) + JSON.stringify({ fingerprint: groupFp, type: 'alert_group', name: groupName, value: groupVal, inserted_at: Date.now() * 1000000 }), + DATABASE_NAME() ) } @@ -66,8 +66,9 @@ module.exports.putAlertRule = async (namespace, group, rule) => { module.exports.getLastCheck = async (ns, group, rule, id) => { const fp = getRuleFP(ns, group, rule) id = id || 0 - const resp = await axios.post(getClickhouseUrl(), - `SELECT max(mark) as maxmark FROM ${DATABASE_NAME()}._alert_view_${fp}_mark WHERE id = ${id} FORMAT JSON` + const resp = await rawRequest( + `SELECT max(mark) as maxmark FROM ${DATABASE_NAME()}._alert_view_${fp}_mark WHERE id = ${id} FORMAT JSON`, + null, DATABASE_NAME() ) if (!resp.data.data[0]) { return 0 @@ -100,11 +101,12 @@ module.exports.getAlertRules = async (limit, offset) => { const _limit = limit ? `LIMIT ${limit}` : '' const _offset = offset ? `OFFSET ${offset}` : '' const mark = Math.random() - const res = await axios.post(getClickhouseUrl(), + const res = rawRequest( 'SELECT fingerprint, argMax(name, inserted_at) as name, argMax(value, inserted_at) as value ' + `FROM ${DATABASE_NAME()}.settings${dist} ` + `WHERE type == 'alert_rule' AND ${mark} == ${mark} ` + - `GROUP BY fingerprint HAVING name != '' ORDER BY name ${_limit} ${_offset} FORMAT JSON`) + `GROUP BY fingerprint HAVING name != '' ORDER BY name ${_limit} ${_offset} FORMAT JSON`, + null, DATABASE_NAME()) return res.data.data.map(e => { return { rule: JSON.parse(e.value), name: JSON.parse(e.name) } }) diff --git a/lib/db/throttler.js b/lib/db/throttler.js index 21a3250c..f9bce7f4 100644 --- a/lib/db/throttler.js +++ b/lib/db/throttler.js @@ -1,8 +1,9 @@ const { isMainThread, parentPort } = require('worker_threads') const axios = require('axios') -const { getClickhouseUrl, samplesTableName } = require('./clickhouse') +const { getClickhouseUrl, samplesTableName, rawRequest } = require('./clickhouse') const clickhouseOptions = require('./clickhouse').databaseOptions const logger = require('../logger') +const { DATABASE_NAME } = require('../utils') const clusterName = require('../../common').clusterName const dist = clusterName ? '_dist' : '' @@ -49,12 +50,7 @@ class TimeoutThrottler { } const _queue = this.queue this.queue = [] - await axios.post(`${getClickhouseUrl()}/?query=${this.statement}`, - _queue.join('\n'), - { - maxBodyLength: Infinity - } - ) + await rawRequest(this.statement, _queue.join('\n'), DATABASE_NAME(), { maxBodyLength: Infinity }) } stop () {