Skip to content

Commit

Permalink
#fix: refactor to use rawRequest instead of axios.post; not finished
Browse files Browse the repository at this point in the history
  • Loading branch information
akvlad committed Oct 25, 2023
1 parent c60ab47 commit 17a76ed
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 26 deletions.
33 changes: 25 additions & 8 deletions lib/db/clickhouse.js
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,8 @@ const initialize = async function (dbName) {
const checkCapabilities = async () => {
logger.info('Checking clickhouse capabilities')
try {
await axios.post(getClickhouseUrl() + '/?allow_experimental_live_view=1',
`CREATE LIVE VIEW ${clickhouseOptions.queryOptions.database}.lvcheck WITH TIMEOUT 1 AS SELECT 1`)
await rawRequest(`CREATE LIVE VIEW ${clickhouseOptions.queryOptions.database}.lvcheck WITH TIMEOUT 1 AS SELECT 1`,
null, clickhouseOptions.queryOptions.database, { allow_experimental_live_view: 1 })
capabilities.liveView = !isCustomSamplesOrderingRule()
logger.info('LIVE VIEW: supported')
} catch (e) {
Expand Down Expand Up @@ -1335,14 +1335,31 @@ const samplesReadTable = {
* @param database {string}
* @returns {Promise<AxiosResponse<any>>}
*/
const rawRequest = async (query, data, database) => {
const rawRequest = async (query, data, database, config) => {
config = config || {}
const params = config.params || {}
try {
const getParams = [
(database ? `database=${encodeURIComponent(database)}` : null),
(data ? `query=${encodeURIComponent(query)}` : null)
].filter(p => p)
let getParams = {}
if (database) {
getParams.database = database
}
if (data) {
getParams.query = query
}
getParams = {
...params,
...getParams
}
getParams = Object.entries(getParams).map(([k, v]) => `${k}=${encodeURIComponent(v)}`)
const url = `${getClickhouseUrl()}/${getParams.length ? `?${getParams.join('&')}` : ''}`
return await axios.post(url, data || query)
const res = await axios.post(url, data || query, {
...config,
headers: {
'Content-Type': 'text/plain',
...(config.headers || {})
}
})
return res
} catch (e) {
logger.error(new Error(e.message))
if (e.response) {
Expand Down
24 changes: 13 additions & 11 deletions lib/db/clickhouse_alerting.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
const axios = require('axios')
const { DATABASE_NAME } = require('../utils')
const UTILS = require('../utils')
const { getClickhouseUrl } = require('./clickhouse')
const { getClickhouseUrl, rawRequest } = require('./clickhouse')
const Sql = require('@cloki/clickhouse-sql')
const { clusterName } = require('../../common')
const onCluster = clusterName ? `ON CLUSTER ${clusterName}` : ''
Expand All @@ -19,14 +19,13 @@ const dist = clusterName ? '_dist' : ''
module.exports.getAlertRule = async (ns, group, name) => {
const fp = getRuleFP(ns, group, name)
const mark = Math.random()
const res = await axios.post(getClickhouseUrl(),
const res = await rawRequest(
'SELECT fingerprint, argMax(name, inserted_at) as name, argMax(value, inserted_at) as value ' +
`FROM ${DATABASE_NAME()}.settings${dist} ` +
`WHERE fingerprint = ${fp} AND ${mark} == ${mark} ` +
'GROUP BY fingerprint ' +
'HAVING name != \'\' ' +
'FORMAT JSON'
)
'FORMAT JSON', null, DATABASE_NAME())
if (!res.data.data.length) {
return undefined
}
Expand All @@ -49,10 +48,11 @@ module.exports.putAlertRule = async (namespace, group, rule) => {
const groupName = JSON.stringify({ type: 'alert_group', ns: namespace, group: group.name })
const groupFp = getGroupFp(namespace, group.name)
const groupVal = JSON.stringify({ name: group.name, interval: group.interval })
await axios.post(getClickhouseUrl(),
`INSERT INTO ${DATABASE_NAME()}.settings${dist} (fingerprint, type, name, value, inserted_at) FORMAT JSONEachRow \n` +
await rawRequest(
`INSERT INTO ${DATABASE_NAME()}.settings${dist} (fingerprint, type, name, value, inserted_at) FORMAT JSONEachRow`,
JSON.stringify({ fingerprint: ruleFp, type: 'alert_rule', name: ruleName, value: JSON.stringify(ruleVal), inserted_at: Date.now() * 1000000 }) + '\n' +
JSON.stringify({ fingerprint: groupFp, type: 'alert_group', name: groupName, value: groupVal, inserted_at: Date.now() * 1000000 })
JSON.stringify({ fingerprint: groupFp, type: 'alert_group', name: groupName, value: groupVal, inserted_at: Date.now() * 1000000 }),
DATABASE_NAME()
)
}

Expand All @@ -66,8 +66,9 @@ module.exports.putAlertRule = async (namespace, group, rule) => {
module.exports.getLastCheck = async (ns, group, rule, id) => {
const fp = getRuleFP(ns, group, rule)
id = id || 0
const resp = await axios.post(getClickhouseUrl(),
`SELECT max(mark) as maxmark FROM ${DATABASE_NAME()}._alert_view_${fp}_mark WHERE id = ${id} FORMAT JSON`
const resp = await rawRequest(
`SELECT max(mark) as maxmark FROM ${DATABASE_NAME()}._alert_view_${fp}_mark WHERE id = ${id} FORMAT JSON`,
null, DATABASE_NAME()
)
if (!resp.data.data[0]) {
return 0
Expand Down Expand Up @@ -100,11 +101,12 @@ module.exports.getAlertRules = async (limit, offset) => {
const _limit = limit ? `LIMIT ${limit}` : ''
const _offset = offset ? `OFFSET ${offset}` : ''
const mark = Math.random()
const res = await axios.post(getClickhouseUrl(),
const res = rawRequest(
'SELECT fingerprint, argMax(name, inserted_at) as name, argMax(value, inserted_at) as value ' +
`FROM ${DATABASE_NAME()}.settings${dist} ` +
`WHERE type == 'alert_rule' AND ${mark} == ${mark} ` +
`GROUP BY fingerprint HAVING name != '' ORDER BY name ${_limit} ${_offset} FORMAT JSON`)
`GROUP BY fingerprint HAVING name != '' ORDER BY name ${_limit} ${_offset} FORMAT JSON`,
null, DATABASE_NAME())
return res.data.data.map(e => {
return { rule: JSON.parse(e.value), name: JSON.parse(e.name) }
})
Expand Down
10 changes: 3 additions & 7 deletions lib/db/throttler.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
const { isMainThread, parentPort } = require('worker_threads')
const axios = require('axios')
const { getClickhouseUrl, samplesTableName } = require('./clickhouse')
const { getClickhouseUrl, samplesTableName, rawRequest } = require('./clickhouse')
const clickhouseOptions = require('./clickhouse').databaseOptions
const logger = require('../logger')
const { DATABASE_NAME } = require('../utils')
const clusterName = require('../../common').clusterName
const dist = clusterName ? '_dist' : ''

Expand Down Expand Up @@ -49,12 +50,7 @@ class TimeoutThrottler {
}
const _queue = this.queue
this.queue = []
await axios.post(`${getClickhouseUrl()}/?query=${this.statement}`,
_queue.join('\n'),
{
maxBodyLength: Infinity
}
)
await rawRequest(this.statement, _queue.join('\n'), DATABASE_NAME(), { maxBodyLength: Infinity })
}

stop () {
Expand Down

0 comments on commit 17a76ed

Please sign in to comment.