From 6a6f6d681e2c7aa8dc84cbd46a8e7c59bbe73270 Mon Sep 17 00:00:00 2001 From: akvlad Date: Sun, 1 Oct 2023 16:02:23 +0300 Subject: [PATCH 01/10] #fix: init; insert; logql requests; support for cluster --- common.js | 2 + lib/db/clickhouse.js | 6 +- lib/db/maintain/index.js | 131 ++++++++++++------ lib/db/maintain/scripts.js | 121 ++++++++++++---- lib/db/throttler.js | 6 +- parser/registry/common.js | 13 ++ .../smart_optimizations/optimization_v3_2.js | 4 +- .../stream_selector_indexed_registry.js | 2 +- .../stream_selector_operator_registry.js | 10 +- parser/transpiler.js | 19 +-- 10 files changed, 225 insertions(+), 89 deletions(-) diff --git a/common.js b/common.js index ad13f486..acf354a2 100644 --- a/common.js +++ b/common.js @@ -123,3 +123,5 @@ module.exports.isCustomSamplesOrderingRule = () => { } module.exports.CORS = process.env.CORS_ALLOW_ORIGIN || '*' + +module.exports.clusterName = process.env.CLUSTER_NAME diff --git a/lib/db/clickhouse.js b/lib/db/clickhouse.js index d61d997d..d05915fd 100644 --- a/lib/db/clickhouse.js +++ b/lib/db/clickhouse.js @@ -12,6 +12,8 @@ const Zipkin = require('./zipkin') const Otlp = require('./otlp') const logfmt = require('logfmt') const csql = require('@cloki/clickhouse-sql') +const clusterName = require('../../common').clusterName +const dist = clusterName ? '_dist' : '' /* DB Helper */ const ClickHouse = require('@apla/clickhouse') @@ -1317,7 +1319,7 @@ const samplesReadTable = { settingsVersions: async function () { const versions = await axios.post(`${getClickhouseUrl()}/?database=${UTILS.DATABASE_NAME()}`, `SELECT argMax(name, inserted_at) as _name, argMax(value, inserted_at) as _value - FROM settings WHERE type == 'update' GROUP BY fingerprint HAVING _name != '' FORMAT JSON`) + FROM settings${dist} WHERE type == 'update' GROUP BY fingerprint HAVING _name != '' FORMAT JSON`) for (const version of versions.data.data) { this.versions[version._name] = parseInt(version._value) * 1000 } @@ -1353,7 +1355,7 @@ const getSettings = async (names, database) => { 'short-hash')) const settings = await rawRequest(`SELECT argMax(name, inserted_at) as _name, argMax(value, inserted_at) as _value - FROM settings WHERE fingerprint IN (${fps.join(',')}) GROUP BY fingerprint HAVING _name != '' FORMAT JSON`, + FROM settings${dist} WHERE fingerprint IN (${fps.join(',')}) GROUP BY fingerprint HAVING _name != '' FORMAT JSON`, null, database) return settings.data.data.reduce((sum, cur) => { sum[cur._name] = cur._value diff --git a/lib/db/maintain/index.js b/lib/db/maintain/index.js index 020916d9..f1689154 100644 --- a/lib/db/maintain/index.js +++ b/lib/db/maintain/index.js @@ -1,7 +1,8 @@ const hb = require('handlebars') const client = require('../clickhouse') const logger = require('../../logger') -const {samplesOrderingRule} = require('../../../common') +const {samplesOrderingRule, clusterName} = require('../../../common') +const scripts = require('./scripts') const getEnv = () => { return { CLICKHOUSE_DB: 'cloki', @@ -20,7 +21,13 @@ module.exports.upgrade = async (db) => { const scripts = require('./scripts') await upgradeSingle(db, 1, scripts.overall) await upgradeSingle(db, 2, scripts.traces) + if (clusterName) { + await upgradeSingle(db, 3, scripts.overall_dist) + await upgradeSingle(db, 4, scripts.traces_dist) + } } + +let isDBCreated = false /** * * @param db {string} @@ -28,21 +35,56 @@ module.exports.upgrade = async (db) => { * @param scripts {string[]} */ const upgradeSingle = async (db, key, scripts) => { - await client.rawRequest(`CREATE DATABASE IF NOT EXISTS ${db}`) - await client.rawRequest('CREATE TABLE IF NOT EXISTS ver (k UInt64, ver UInt64) ' + - 'ENGINE=ReplacingMergeTree(ver) ORDER BY k', null, db) + const _upgradeRequest = (request, useDefaultDB, updateVer) => { + return upgradeRequest(db, request, useDefaultDB, updateVer) + } + if (!isDBCreated) { + isDBCreated = true + await _upgradeRequest('CREATE DATABASE IF NOT EXISTS {{DB}} {{{OnCluster}}}') + if (clusterName) { + await _upgradeRequest('CREATE TABLE IF NOT EXISTS {{DB}}._ver {{{OnCluster}}} (k UInt64, ver UInt64) ' + + 'ENGINE={{ReplacingMergeTree}}(ver) ORDER BY k', true) + await _upgradeRequest('CREATE TABLE IF NOT EXISTS {{DB}}.ver {{{OnCluster}}} (k UInt64, ver UInt64) ' + + 'ENGINE=Distributed(\'{{CLUSTER}}\',\'{{DB}}\', \'_ver\', rand())', true) + } else { + await _upgradeRequest('CREATE TABLE IF NOT EXISTS {{DB}}.ver {{{OnCluster}}} (k UInt64, ver UInt64) ' + + 'ENGINE={{ReplacingMergeTree}}(ver) ORDER BY k', true) + } + } let ver = await client.rawRequest(`SELECT max(ver) as ver FROM ver WHERE k = ${key} FORMAT JSON`, null, db) ver = ver.data.data && ver.data.data[0] && ver.data.data[0].ver ? ver.data.data[0].ver : 0 for (let i = parseInt(ver); i < scripts.length; ++i) { if (!scripts[i]) { continue } scripts[i] = scripts[i].trim() - const tpl = hb.compile(scripts[i]) - scripts[i] = tpl({ ...getEnv(), DB: db, SAMPLES_ORDER_RUL: samplesOrderingRule() }) - logger.info(`v${i} -> v${i + 1}`) - logger.debug(scripts[i]) - await client.rawRequest(scripts[i], null, db) - await client.rawRequest(`INSERT INTO ver (k, ver) VALUES (${key}, ${i + 1})`, null, db) + await _upgradeRequest(scripts[i], true, { key: key, to: i + 1 }) + } +} + +/** + * @param db {string} database to update + * @param request {string} request tpl + * @param useDefaultDB {boolean} use db as default db + * @param updateVer {{key: number, to: number}} update ver table + * @returns {Promise} + */ +const upgradeRequest = async (db, request, useDefaultDB, updateVer) => { + const tpl = hb.compile(request) + request = tpl({ + ...getEnv(), + DB: db, + CLUSTER: clusterName || '', + SAMPLES_ORDER_RUL: samplesOrderingRule(), + OnCluster: clusterName ? `ON CLUSTER \`${clusterName}\`` : '', + MergeTree: clusterName ? 'ReplicatedMergeTree' : 'MergeTree', + ReplacingMergeTree: clusterName ? 'ReplicatedReplacingMergeTree' : 'ReplacingMergeTree', + AggregatingMergeTree: clusterName ? 'ReplicatedAggregatingMergeTree' : 'AggregatingMergeTree' + }) + console.log(request) + await client.rawRequest(request, null, useDefaultDB ? db : undefined) + if (updateVer) { + console.log(`INSERT INTO ver (k, ver) VALUES (${updateVer.key}, ${updateVer.to})`) + await client.rawRequest(`INSERT INTO ver (k, ver) VALUES (${updateVer.key}, ${updateVer.to})`, null, db) } } @@ -59,47 +101,50 @@ module.exports.rotate = async (opts) => { { type: 'rotate', name: 'v1_traces_days' }, { type: 'rotate', name: 'v1_traces_storage_policy' } ], db.db) + const _update = (req) => { + return upgradeRequest(db.db, req, true) + } if (db.samples_days + '' !== settings.v3_samples_days) { - const alterTable = 'ALTER TABLE samples_v3 MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192' - const rotateTable = `ALTER TABLE samples_v3 MODIFY TTL toDateTime(timestamp_ns / 1000000000) + INTERVAL ${db.samples_days} DAY` - await client.rawRequest(alterTable, null, db.db) - await client.rawRequest(rotateTable, null, db.db) + const alterTable = 'ALTER TABLE samples_v3 {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192' + const rotateTable = `ALTER TABLE samples_v3 {{{OnCluster}}} MODIFY TTL toDateTime(timestamp_ns / 1000000000) + INTERVAL ${db.samples_days} DAY` + await _update(alterTable, null, db.db) + await _update(rotateTable, null, db.db) await client.addSetting('rotate', 'v3_samples_days', db.samples_days + '', db.db) } if (db.time_series_days + '' !== settings.v3_time_series_days) { - const alterTable = 'ALTER TABLE time_series MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192' - const rotateTable = `ALTER TABLE time_series MODIFY TTL "date" + INTERVAL ${db.time_series_days} DAY` - await client.rawRequest(alterTable, null, db.db) - await client.rawRequest(rotateTable, null, db.db) - const alterView = 'ALTER TABLE time_series_gin MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192' - const rotateView = `ALTER TABLE time_series_gin MODIFY TTL "date" + INTERVAL ${db.time_series_days} DAY` - await client.rawRequest(alterView, null, db.db) - await client.rawRequest(rotateView, null, db.db) + const alterTable = 'ALTER TABLE time_series {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192' + const rotateTable = `ALTER TABLE time_series {{{OnCluster}}} MODIFY TTL "date" + INTERVAL ${db.time_series_days} DAY` + await _update(alterTable, null, db.db) + await _update(rotateTable, null, db.db) + const alterView = 'ALTER TABLE time_series_gin {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192' + const rotateView = `ALTER TABLE time_series_gin {{{OnCluster}}} MODIFY TTL "date" + INTERVAL ${db.time_series_days} DAY` + await _update(alterView, null, db.db) + await _update(rotateView, null, db.db) await client.addSetting('rotate', 'v3_time_series_days', db.time_series_days + '', db.db) } if (db.storage_policy && db.storage_policy !== settings.v3_storage_policy) { logger.debug(`Altering storage policy: ${db.storage_policy}`) - const alterTs = `ALTER TABLE time_series MODIFY SETTING storage_policy='${db.storage_policy}'` - const alterTsVw = `ALTER TABLE time_series_gin MODIFY SETTING storage_policy='${db.storage_policy}'` - const alterSm = `ALTER TABLE samples_v3 MODIFY SETTING storage_policy='${db.storage_policy}'` - await client.rawRequest(alterTs, null, db.db) - await client.rawRequest(alterTsVw, null, db.db) - await client.rawRequest(alterSm, null, db.db) + const alterTs = `ALTER TABLE time_series {{{OnCluster}}} MODIFY SETTING storage_policy='${db.storage_policy}'` + const alterTsVw = `ALTER TABLE time_series_gin {{{OnCluster}}} MODIFY SETTING storage_policy='${db.storage_policy}'` + const alterSm = `ALTER TABLE samples_v3 {{{OnCluster}}} MODIFY SETTING storage_policy='${db.storage_policy}'` + await _update(alterTs, null, db.db) + await _update(alterTsVw, null, db.db) + await _update(alterSm, null, db.db) await client.addSetting('rotate', 'v3_storage_policy', db.storage_policy, db.db) } if (db.samples_days + '' !== settings.v1_traces_days) { - let alterTable = 'ALTER TABLE tempo_traces MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192' - let rotateTable = `ALTER TABLE tempo_traces MODIFY TTL toDateTime(timestamp_ns / 1000000000) + INTERVAL ${db.samples_days} DAY` - await client.rawRequest(alterTable, null, db.db) - await client.rawRequest(rotateTable, null, db.db) - alterTable = 'ALTER TABLE tempo_traces_attrs_gin MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192' - rotateTable = `ALTER TABLE tempo_traces_attrs_gin MODIFY TTL date + INTERVAL ${db.samples_days} DAY` - await client.rawRequest(alterTable, null, db.db) - await client.rawRequest(rotateTable, null, db.db) - alterTable = 'ALTER TABLE tempo_traces_kv MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192' - rotateTable = `ALTER TABLE tempo_traces_kv MODIFY TTL date + INTERVAL ${db.samples_days} DAY` - await client.rawRequest(alterTable, null, db.db) - await client.rawRequest(rotateTable, null, db.db) + let alterTable = 'ALTER TABLE tempo_traces {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192' + let rotateTable = `ALTER TABLE tempo_traces {{{OnCluster}}} MODIFY TTL toDateTime(timestamp_ns / 1000000000) + INTERVAL ${db.samples_days} DAY` + await _update(alterTable, null, db.db) + await _update(rotateTable, null, db.db) + alterTable = 'ALTER TABLE tempo_traces_attrs_gin {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192' + rotateTable = `ALTER TABLE tempo_traces_attrs_gin {{{OnCluster}}} MODIFY TTL date + INTERVAL ${db.samples_days} DAY` + await _update(alterTable, null, db.db) + await _update(rotateTable, null, db.db) + alterTable = 'ALTER TABLE tempo_traces_kv {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192' + rotateTable = `ALTER TABLE tempo_traces_kv {{{OnCluster}}} MODIFY TTL date + INTERVAL ${db.samples_days} DAY` + await _update(alterTable, null, db.db) + await _update(rotateTable, null, db.db) await client.addSetting('rotate', 'v1_traces_days', db.samples_days + '', db.db) } if (db.storage_policy && db.storage_policy !== settings.v1_traces_storage_policy) { @@ -107,9 +152,9 @@ module.exports.rotate = async (opts) => { const alterTs = `ALTER TABLE tempo_traces MODIFY SETTING storage_policy='${db.storage_policy}'` const alterTsVw = `ALTER TABLE tempo_traces_attrs_gin MODIFY SETTING storage_policy='${db.storage_policy}'` const alterSm = `ALTER TABLE tempo_traces_kv MODIFY SETTING storage_policy='${db.storage_policy}'` - await client.rawRequest(alterTs, null, db.db) - await client.rawRequest(alterTsVw, null, db.db) - await client.rawRequest(alterSm, null, db.db) + await _update(alterTs, null, db.db) + await _update(alterTsVw, null, db.db) + await _update(alterSm, null, db.db) await client.addSetting('rotate', 'v1_traces_storage_policy', db.storage_policy, db.db) } } diff --git a/lib/db/maintain/scripts.js b/lib/db/maintain/scripts.js index a09949da..9492da3d 100644 --- a/lib/db/maintain/scripts.js +++ b/lib/db/maintain/scripts.js @@ -1,49 +1,49 @@ module.exports.overall = [ - `CREATE TABLE IF NOT EXISTS time_series (date Date,fingerprint UInt64,labels String, name String) - ENGINE = ReplacingMergeTree(date) PARTITION BY date ORDER BY fingerprint`, + `CREATE TABLE IF NOT EXISTS {{DB}}.time_series {{{OnCluster}}} (date Date,fingerprint UInt64,labels String, name String) + ENGINE = {{ReplacingMergeTree}}(date) PARTITION BY date ORDER BY fingerprint`, - `CREATE TABLE IF NOT EXISTS samples_v3 + `CREATE TABLE IF NOT EXISTS {{DB}}.samples_v3 {{{OnCluster}}} ( fingerprint UInt64, timestamp_ns Int64 CODEC(DoubleDelta), value Float64 CODEC(Gorilla), string String - ) ENGINE = MergeTree + ) ENGINE = {{MergeTree}} PARTITION BY toStartOfDay(toDateTime(timestamp_ns / 1000000000)) ORDER BY ({{SAMPLES_ORDER_RUL}})`, - `CREATE TABLE IF NOT EXISTS settings + `CREATE TABLE IF NOT EXISTS {{DB}}.settings {{{OnCluster}}} (fingerprint UInt64, type String, name String, value String, inserted_at DateTime64(9, 'UTC')) - ENGINE = ReplacingMergeTree(inserted_at) ORDER BY fingerprint`, + ENGINE = {{ReplacingMergeTree}}(inserted_at) ORDER BY fingerprint`, - 'DROP TABLE IF EXISTS samples_read', + 'DROP TABLE IF EXISTS {{DB}}.samples_read {{{OnCluster}}}', - `CREATE TABLE IF NOT EXISTS samples_read + `CREATE TABLE IF NOT EXISTS {{DB}}.samples_read {{{OnCluster}}} (fingerprint UInt64,timestamp_ms Int64,value Float64,string String) ENGINE=Merge('{{DB}}', '^(samples|samples_v2)$')`, - `CREATE VIEW IF NOT EXISTS samples_read_v2_1 AS + `CREATE VIEW IF NOT EXISTS {{DB}}.samples_read_v2_1 {{{OnCluster}}} AS SELECT fingerprint, timestamp_ms * 1000000 as timestamp_ns, value, string FROM samples_read`, - `CREATE TABLE IF NOT EXISTS samples_read_v2_2 + `CREATE TABLE IF NOT EXISTS {{DB}}.samples_read_v2_2 {{{OnCluster}}} (fingerprint UInt64,timestamp_ns Int64,value Float64,string String) ENGINE=Merge('{{DB}}', '^(samples_read_v2_1|samples_v3)$')`, - `CREATE TABLE IF NOT EXISTS time_series_gin ( + `CREATE TABLE IF NOT EXISTS {{DB}}.time_series_gin {{{OnCluster}}} ( date Date, key String, val String, fingerprint UInt64 - ) ENGINE = ReplacingMergeTree() PARTITION BY date ORDER BY (key, val, fingerprint)`, + ) ENGINE = {{ReplacingMergeTree}}() PARTITION BY date ORDER BY (key, val, fingerprint)`, - `CREATE MATERIALIZED VIEW IF NOT EXISTS time_series_gin_view TO time_series_gin + `CREATE MATERIALIZED VIEW IF NOT EXISTS {{DB}}.time_series_gin_view {{{OnCluster}}} TO time_series_gin AS SELECT date, pairs.1 as key, pairs.2 as val, fingerprint FROM time_series ARRAY JOIN JSONExtractKeysAndValues(time_series.labels, 'String') as pairs`, `INSERT INTO settings (fingerprint, type, name, value, inserted_at) VALUES (cityHash64('update_v3_5'), 'update', 'v3_1', toString(toUnixTimestamp(NOW())), NOW())`, - `CREATE TABLE IF NOT EXISTS metrics_15s ( + `CREATE TABLE IF NOT EXISTS {{DB}}.metrics_15s {{{OnCluster}}} ( fingerprint UInt64, timestamp_ns Int64 CODEC(DoubleDelta), last AggregateFunction(argMax, Float64, Int64), @@ -52,11 +52,11 @@ module.exports.overall = [ count AggregateFunction(count), sum SimpleAggregateFunction(sum, Float64), bytes SimpleAggregateFunction(sum, Float64) -) ENGINE = AggregatingMergeTree +) ENGINE = {{AggregatingMergeTree}} PARTITION BY toDate(toDateTime(intDiv(timestamp_ns, 1000000000))) ORDER BY (fingerprint, timestamp_ns);`, - `CREATE MATERIALIZED VIEW IF NOT EXISTS metrics_15s_mv TO metrics_15s AS + `CREATE MATERIALIZED VIEW IF NOT EXISTS {{DB}}.metrics_15s_mv {{{OnCluster}}} TO metrics_15s AS SELECT fingerprint, intDiv(samples.timestamp_ns, 15000000000) * 15000000000 as timestamp_ns, argMaxState(value, samples.timestamp_ns) as last, @@ -73,7 +73,7 @@ GROUP BY fingerprint, timestamp_ns;`, ] module.exports.traces = [ - `CREATE TABLE IF NOT EXISTS tempo_traces ( + `CREATE TABLE IF NOT EXISTS {{DB}}.tempo_traces {{{OnCluster}}} ( oid String DEFAULT '0', trace_id FixedString(16), span_id FixedString(8), @@ -84,10 +84,10 @@ module.exports.traces = [ service_name String, payload_type Int8, payload String - ) Engine MergeTree() ORDER BY (oid, trace_id, timestamp_ns) + ) Engine = {{MergeTree}}() ORDER BY (oid, trace_id, timestamp_ns) PARTITION BY (oid, toDate(FROM_UNIXTIME(intDiv(timestamp_ns, 1000000000))));`, - `CREATE TABLE IF NOT EXISTS tempo_traces_attrs_gin ( + `CREATE TABLE IF NOT EXISTS {{DB}}.tempo_traces_attrs_gin {{{OnCluster}}} ( oid String, date Date, key String, @@ -96,24 +96,24 @@ module.exports.traces = [ span_id FixedString(8), timestamp_ns Int64, duration Int64 - ) Engine = ReplacingMergeTree() + ) Engine = {{ReplacingMergeTree}}() PARTITION BY date ORDER BY (oid, date, key, val, timestamp_ns, trace_id, span_id);`, - `CREATE TABLE IF NOT EXISTS tempo_traces_kv ( + `CREATE TABLE IF NOT EXISTS {{DB}}.tempo_traces_kv {{{OnCluster}}} ( oid String, date Date, key String, val_id UInt64, val String - ) Engine = ReplacingMergeTree() + ) Engine = {{ReplacingMergeTree}}() PARTITION BY (oid, date) ORDER BY (oid, date, key, val_id)`, - `CREATE MATERIALIZED VIEW IF NOT EXISTS tempo_traces_kv_mv TO tempo_traces_kv AS + `CREATE MATERIALIZED VIEW IF NOT EXISTS {{DB}}.tempo_traces_kv_mv {{{OnCluster}}} TO tempo_traces_kv AS SELECT oid, date, key, cityHash64(val) % 10000 as val_id, val FROM tempo_traces_attrs_gin`, - `CREATE TABLE IF NOT EXISTS traces_input ( + `CREATE TABLE IF NOT EXISTS {{DB}}.traces_input {{{OnCluster}}} ( oid String DEFAULT '0', trace_id String, span_id String, @@ -127,7 +127,7 @@ module.exports.traces = [ tags Array(Tuple(String, String)) ) Engine=Null`, - `CREATE MATERIALIZED VIEW IF NOT EXISTS traces_input_traces_mv TO tempo_traces AS + `CREATE MATERIALIZED VIEW IF NOT EXISTS {{DB}}.traces_input_traces_mv {{{OnCluster}}} TO tempo_traces AS SELECT oid, unhex(trace_id)::FixedString(16) as trace_id, unhex(span_id)::FixedString(8) as span_id, @@ -140,7 +140,7 @@ module.exports.traces = [ payload FROM traces_input`, - `CREATE MATERIALIZED VIEW IF NOT EXISTS traces_input_tags_mv TO tempo_traces_attrs_gin AS + `CREATE MATERIALIZED VIEW IF NOT EXISTS {{DB}}.traces_input_tags_mv {{{OnCluster}}} TO tempo_traces_attrs_gin AS SELECT oid, toDate(intDiv(timestamp_ns, 1000000000)) as date, tags.1 as key, @@ -154,3 +154,72 @@ module.exports.traces = [ `INSERT INTO settings (fingerprint, type, name, value, inserted_at) VALUES (cityHash64('tempo_traces_v1'), 'update', 'tempo_traces_v2', toString(toUnixTimestamp(NOW())), NOW())` ] + +module.exports.overall_dist = [ + `CREATE TABLE {{DB}}.metrics_15s_dist {{{OnCluster}}} ( + \`fingerprint\` UInt64, + \`timestamp_ns\` Int64 CODEC(DoubleDelta), + \`last\` AggregateFunction(argMax, Float64, Int64), + \`max\` SimpleAggregateFunction(max, Float64), + \`min\` SimpleAggregateFunction(min, Float64), + \`count\` AggregateFunction(count), + \`sum\` SimpleAggregateFunction(sum, Float64), + \`bytes\` SimpleAggregateFunction(sum, Float64) +) ENGINE = Distributed('{{CLUSTER}}', '{{DB}}', 'metrics_15s', fingerprint)`, + + `CREATE TABLE IF NOT EXISTS {{DB}}.samples_v3_dist {{{OnCluster}}} ( + \`fingerprint\` UInt64, + \`timestamp_ns\` Int64 CODEC(DoubleDelta), + \`value\` Float64 CODEC(Gorilla), + \`string\` String +) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'samples_v3', fingerprint)`, + + `CREATE TABLE IF NOT EXISTS {{DB}}.time_series_dist {{{OnCluster}}} ( + \`date\` Date, + \`fingerprint\` UInt64, + \`labels\` String, + \`name\` String +) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'time_series', fingerprint);`, + + `CREATE TABLE IF NOT EXISTS {{DB}}.settings_dist {{{OnCluster}}} ( + \`fingerprint\` UInt64, + \`type\` String, + \`name\` String, + \`value\` String, + \`inserted_at\` DateTime64(9, 'UTC') +) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'settings', rand());` +] + +module.exports.traces_dist = [ + `CREATE TABLE IF NOT EXISTS tempo_traces_kv_dist {{{OnCluster}}} ( + oid String, + date Date, + key String, + val_id String, + val String +) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'tempo_traces_kv', sipHash64(oid, key));`, + + `CREATE TABLE IF NOT EXISTS tempo_traces_dist {{{OnCluster}}} ( + oid String, + trace_id FixedString(16), + span_id FixedString(8), + parent_id String, + name String, + timestamp_ns Int64 CODEC(DoubleDelta), + duration_ns Int64, + service_name String, + payload_type Int8, + payload String, +) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'tempo_traces', sipHash64(oid, trace_id));`, + + `CREATE TABLE IF NOT EXISTS tempo_traces_attrs_gin_dist {{{OnCluster}}} ( + oid String, + date Date, + key String, + val String, + trace_id FixedString(16), + span_id FixedString(8), + timestamp_ns Int64, + duration Int64 +) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'tempo_traces_attrs_gin', sipHash64(oid, trace_id));` +] diff --git a/lib/db/throttler.js b/lib/db/throttler.js index dce7dbf5..21a3250c 100644 --- a/lib/db/throttler.js +++ b/lib/db/throttler.js @@ -3,6 +3,8 @@ const axios = require('axios') const { getClickhouseUrl, samplesTableName } = require('./clickhouse') const clickhouseOptions = require('./clickhouse').databaseOptions const logger = require('../logger') +const clusterName = require('../../common').clusterName +const dist = clusterName ? '_dist' : '' const axiosError = async (err) => { try { @@ -61,9 +63,9 @@ class TimeoutThrottler { } const samplesThrottler = new TimeoutThrottler( - `INSERT INTO ${clickhouseOptions.queryOptions.database}.${samplesTableName}(fingerprint, timestamp_ns, value, string) FORMAT JSONEachRow`) + `INSERT INTO ${clickhouseOptions.queryOptions.database}.${samplesTableName}${dist}(fingerprint, timestamp_ns, value, string) FORMAT JSONEachRow`) const timeSeriesThrottler = new TimeoutThrottler( - `INSERT INTO ${clickhouseOptions.queryOptions.database}.time_series(date, fingerprint, labels, name) FORMAT JSONEachRow`) + `INSERT INTO ${clickhouseOptions.queryOptions.database}.time_series${dist}(date, fingerprint, labels, name) FORMAT JSONEachRow`) const tracesThottler = new TimeoutThrottler( `INSERT INTO ${clickhouseOptions.queryOptions.database}.traces_input (trace_id, span_id, parent_id, name, timestamp_ns, duration_ns, service_name, payload_type, payload, tags) diff --git a/parser/registry/common.js b/parser/registry/common.js index 5a2b7bec..a028a106 100644 --- a/parser/registry/common.js +++ b/parser/registry/common.js @@ -1,6 +1,8 @@ const { hashLabels, parseLabels } = require('../../common') const { getPlg } = require('../../plugins/engine') const Sql = require('@cloki/clickhouse-sql') +const clusterName = require('../../common').clusterName +module.exports.dist = clusterName ? '_dist' : '' /** * @param query {registry_types.Request | string[]} @@ -385,3 +387,14 @@ module.exports.sharedParamNames = { * @returns {number} */ module.exports.durationToMs = require('../../common').durationToMs + +module.exports.Aliased = class { + constructor (name, alias) { + this.name = name + this.alias = alias + } + + toString () { + return `${Sql.quoteTerm(this.name)} AS ${this.alias}` + } +} diff --git a/parser/registry/smart_optimizations/optimization_v3_2.js b/parser/registry/smart_optimizations/optimization_v3_2.js index 1c69d1f2..e44670e2 100644 --- a/parser/registry/smart_optimizations/optimization_v3_2.js +++ b/parser/registry/smart_optimizations/optimization_v3_2.js @@ -1,4 +1,4 @@ -const { getDuration } = require('../common') +const { getDuration, dist, Aliased } = require('../common') const reg = require('./log_range_agg_reg_v3_2') const Sql = require('@cloki/clickhouse-sql') const { DATABASE_NAME, checkVersion } = require('../../../lib/utils') @@ -48,7 +48,7 @@ module.exports.apply = (token, fromNS, toNS, stepNS) => { .select(['samples.fingerprint', 'fingerprint']) .from([`${DATABASE_NAME()}.metrics_15s`, 'samples']) .where(tsClause) - q.join(`${DATABASE_NAME()}.time_series`, 'left any', + q.join(new Aliased(`${DATABASE_NAME()}.time_series${dist}`, 'time_series'), 'left any', Sql.Eq('samples.fingerprint', new Sql.Raw('time_series.fingerprint'))) q.select([new Sql.Raw('any(JSONExtractKeysAndValues(time_series.labels, \'String\'))'), 'labels']) diff --git a/parser/registry/stream_selector_operator_registry/stream_selector_indexed_registry.js b/parser/registry/stream_selector_operator_registry/stream_selector_indexed_registry.js index 0425ee62..8d8e1a51 100644 --- a/parser/registry/stream_selector_operator_registry/stream_selector_indexed_registry.js +++ b/parser/registry/stream_selector_operator_registry/stream_selector_indexed_registry.js @@ -58,7 +58,7 @@ module.exports.indexedAnd = (query, subquery) => { } idxSel = new Sql.With('idx_sel', (new Sql.Select()) .select(`${id}.fingerprint`) - .from([subquery, id])) + .from([subquery, id]), query.ctx.inline) return query.with(idxSel) .where(new InSubreq('samples.fingerprint', new Sql.WithReference(idxSel))) } diff --git a/parser/registry/stream_selector_operator_registry/stream_selector_operator_registry.js b/parser/registry/stream_selector_operator_registry/stream_selector_operator_registry.js index 7aa0cc98..c2afb1ab 100644 --- a/parser/registry/stream_selector_operator_registry/stream_selector_operator_registry.js +++ b/parser/registry/stream_selector_operator_registry/stream_selector_operator_registry.js @@ -1,4 +1,4 @@ -const { isEOF } = require('../common') +const { isEOF, sharedParamNames } = require('../common') const { labelAndVal } = require('./common') const Sql = require('@cloki/clickhouse-sql') /** @@ -40,17 +40,17 @@ function simpleSelectorClauses (regex, eq, label, value) { * @returns {With} */ const streamSelectQuery = (query) => { - const param = query.getParam('timeSeriesTable') || new Sql.Parameter('timeSeriesTable') + const param = query.getParam(sharedParamNames.timeSeriesTable) || + new Sql.Parameter(sharedParamNames.timeSeriesTable) query.addParam(param) const res = new Sql.With( 'str_sel', (new Sql.Select()) .select('fingerprint') .distinct(true) - .from(param) - ) + .from(param), query.ctx.inline) if (query.with() && query.with().idx_sel) { - res.query = res.query.where(new Sql.Raw('fingerprint IN idx_sel')) + res.query = res.query.where(new Sql.In('fingerprint', 'in', new Sql.WithReference(query.with().idx_sel))) } return res } diff --git a/parser/transpiler.js b/parser/transpiler.js index 95a542c8..0ac2975d 100644 --- a/parser/transpiler.js +++ b/parser/transpiler.js @@ -10,7 +10,7 @@ const lineFormat = require('./registry/line_format') const parserRegistry = require('./registry/parser_registry') const unwrap = require('./registry/unwrap') const unwrapRegistry = require('./registry/unwrap_registry') -const { durationToMs, sharedParamNames, getStream } = require('./registry/common') +const { durationToMs, sharedParamNames, getStream, Aliased } = require('./registry/common') const compiler = require('./bnf') const { parseMs, @@ -26,6 +26,8 @@ const { simpleAnd } = require('./registry/stream_selector_operator_registry/stre const logger = require('../lib/logger') const { QrynBadRequest } = require('../lib/handlers/errors') const optimizations = require('./registry/smart_optimizations') +const clusterName = require('../common').clusterName +const dist = clusterName ? '_dist' : '' /** * @param joinLabels {boolean} @@ -68,7 +70,7 @@ module.exports.initQuery = (joinLabels) => { .addParam(limit) .addParam(matrix) if (joinLabels) { - q.join(`${DATABASE_NAME()}.time_series`, 'left any', + q.join(new Aliased(`${DATABASE_NAME()}.time_series`, 'time_series'), 'left any', Sql.Eq('samples.fingerprint', new Sql.Raw('time_series.fingerprint'))) q.select([new Sql.Raw('JSONExtractKeysAndValues(time_series.labels, \'String\')'), 'labels']) } @@ -96,7 +98,7 @@ module.exports.initQueryV3_2 = (joinLabels) => { .addParam(from) .addParam(to) if (joinLabels) { - q.join(`${DATABASE_NAME()}.time_series`, 'left any', + q.join(new Aliased(`${DATABASE_NAME()}.time_series${dist}`, 'time_series'), 'left any', Sql.Eq('samples.fingerprint', new Sql.Raw('time_series.fingerprint'))) q.select([new Sql.Raw('JSONExtractKeysAndValues(time_series.labels, \'String\')'), 'labels']) } @@ -161,7 +163,8 @@ module.exports.transpile = (request) => { query.ctx = { step: step, legacy: !checkVersion('v3_1', start), - joinLabels: joinLabels + joinLabels: joinLabels, + inline: clusterName !== '' } let duration = null const matrixOp = [ @@ -203,7 +206,7 @@ module.exports.transpile = (request) => { .orderBy(['labels', order], ['timestamp_ns', order]) setQueryParam(query, sharedParamNames.limit, limit) if (!joinLabels) { - query.join(`${DATABASE_NAME()}.time_series`, 'left any', + query.join(new Aliased(`${DATABASE_NAME()}.time_series${dist}`, 'time_series'), 'left any', Sql.Eq('sel_a.fingerprint', new Sql.Raw('time_series.fingerprint'))) query.select([new Sql.Raw('JSONExtractKeysAndValues(time_series.labels, \'String\')'), 'labels'], new Sql.Raw('sel_a.*')) @@ -218,7 +221,7 @@ module.exports.transpile = (request) => { query = parameterizedAggregationRegistry[op](token.Child('parameterized_expression'), query) } setQueryParam(query, sharedParamNames.timeSeriesTable, `${DATABASE_NAME()}.time_series`) - setQueryParam(query, sharedParamNames.samplesTable, `${DATABASE_NAME()}.${readTable}`) + setQueryParam(query, sharedParamNames.samplesTable, `${DATABASE_NAME()}.${readTable}${dist}`) setQueryParam(query, sharedParamNames.from, start + '000000') setQueryParam(query, sharedParamNames.to, end + '000000') setQueryParam(query, 'isMatrix', query.ctx.matrix) @@ -344,8 +347,8 @@ module.exports.transpileSeries = (request) => { const _query = getQuery(req) query.withs.idx_sel.query.sqls.push(_query.withs.idx_sel.query) } - setQueryParam(query, sharedParamNames.timeSeriesTable, `${DATABASE_NAME()}.time_series`) - setQueryParam(query, sharedParamNames.samplesTable, `${DATABASE_NAME()}.${samplesReadTableName()}`) + setQueryParam(query, sharedParamNames.timeSeriesTable, `${DATABASE_NAME()}.time_series${dist}`) + setQueryParam(query, sharedParamNames.samplesTable, `${DATABASE_NAME()}.${samplesReadTableName()}${dist}`) // logger.debug(query.toString()) return query.toString() } From e1b81d106b62a5dd38d87587ecd157afcbb43263 Mon Sep 17 00:00:00 2001 From: akvlad Date: Sun, 1 Oct 2023 18:35:51 +0300 Subject: [PATCH 02/10] #fix: alerting for cluster; e2e tests --- lib/db/alerting/alertWatcher/index.js | 3 +- lib/db/clickhouse.js | 8 ++--- lib/db/clickhouse_alerting.js | 46 +++++++++++++++++---------- lib/db/maintain/scripts.js | 9 +++++- lib/handlers/label.js | 5 ++- lib/handlers/label_values.js | 4 ++- parser/transpiler.js | 2 +- 7 files changed, 51 insertions(+), 26 deletions(-) diff --git a/lib/db/alerting/alertWatcher/index.js b/lib/db/alerting/alertWatcher/index.js index 1fbd2ff7..7f30c9f9 100644 --- a/lib/db/alerting/alertWatcher/index.js +++ b/lib/db/alerting/alertWatcher/index.js @@ -4,6 +4,7 @@ const CallbackTimeSeriesAlertWatcher = require('./callbackTimeSeriesAlertWatcher const CallbackCliqlAlertWatcher = require('./callbackCliqlAlertWatcher') const MVAlertWatcher = require('./MVAlertWatcher') const { parseCliQL } = require('../../../cliql') +const {clusterName} = require('../../../../common') /** * @param nsName {string} * @param group {alerting.group | alerting.objGroup} @@ -24,7 +25,7 @@ module.exports = (nsName, group, rule) => { if (q.matrix) { return new CallbackTimeSeriesAlertWatcher(nsName, group, rule) } - if (q.stream && q.stream.length) { + if ((q.stream && q.stream.length) || clusterName) { return new CallbackLogAlertWatcher(nsName, group, rule) } return new MVAlertWatcher(nsName, group, rule) diff --git a/lib/db/clickhouse.js b/lib/db/clickhouse.js index d05915fd..6e7ab8bf 100644 --- a/lib/db/clickhouse.js +++ b/lib/db/clickhouse.js @@ -276,7 +276,7 @@ const tempoQueryScan = async function (query, res, traceId) { const tempoQueryScanV2 = async function (query, res, traceId) { logger.debug(`Scanning Tempo Fingerprints... ${traceId}`) const _stream = await axios.post(getClickhouseUrl() + '/', - `SELECT payload_type, payload FROM ${DATABASE_NAME()}.tempo_traces WHERE oid='0' AND trace_id=unhex('${traceId}') ORDER BY timestamp_ns ASC LIMIT 2000 FORMAT JSONEachRow`, + `SELECT payload_type, payload FROM ${DATABASE_NAME()}.tempo_traces${dist} WHERE oid='0' AND trace_id=unhex('${traceId}') ORDER BY timestamp_ns ASC LIMIT 2000 FORMAT JSONEachRow`, { responseType: 'stream' } @@ -422,7 +422,7 @@ const queryTempoScanV2 = async function (query) { const select = `SELECT hex(trace_id) as traceID, service_name as rootServiceName, name as rootTraceName, timestamp_ns as startTimeUnixNano, intDiv(duration_ns, 1000000) as durationMs` - const from = `FROM ${DATABASE_NAME()}.tempo_traces` + const from = `FROM ${DATABASE_NAME()}.tempo_traces${dist}` const where = [ 'oid = \'0\'', `timestamp_ns >= ${parseInt(query.start)} AND timestamp_ns <= ${parseInt(query.end)}`, @@ -461,7 +461,7 @@ const queryTempoScanV2 = async function (query) { async function queryTempoTags () { const q = `SELECT distinct key - FROM ${DATABASE_NAME()}.tempo_traces_kv + FROM ${DATABASE_NAME()}.tempo_traces_kv${dist} WHERE oid='0' AND date >= toDate(NOW()) - interval '1 day' FORMAT JSON` const resp = await axios.post(getClickhouseUrl() + '/',q) @@ -475,7 +475,7 @@ async function queryTempoTags () { */ async function queryTempoValues (tag) { const q = `SELECT distinct val - FROM ${DATABASE_NAME()}.tempo_traces_kv + FROM ${DATABASE_NAME()}.tempo_traces_kv${dist} WHERE oid='0' AND date >= toDate(NOW()) - interval '1 day' AND key = ${csql.quoteVal(tag)} FORMAT JSON` const resp = await axios.post(getClickhouseUrl() + '/', q) diff --git a/lib/db/clickhouse_alerting.js b/lib/db/clickhouse_alerting.js index d33c58b9..7014b47f 100644 --- a/lib/db/clickhouse_alerting.js +++ b/lib/db/clickhouse_alerting.js @@ -7,6 +7,9 @@ const { DATABASE_NAME } = require('../utils') const UTILS = require('../utils') const { getClickhouseUrl } = require('./clickhouse') const Sql = require('@cloki/clickhouse-sql') +const { clusterName } = require('../../common') +const onCluster = clusterName ? `ON CLUSTER ${clusterName}` : '' +const dist = clusterName ? '_dist' : '' /** * @param ns {string} * @param group {string} @@ -18,7 +21,7 @@ module.exports.getAlertRule = async (ns, group, name) => { const mark = Math.random() const res = await axios.post(getClickhouseUrl(), 'SELECT fingerprint, argMax(name, inserted_at) as name, argMax(value, inserted_at) as value ' + - `FROM ${DATABASE_NAME()}.settings ` + + `FROM ${DATABASE_NAME()}.settings${dist} ` + `WHERE fingerprint = ${fp} AND ${mark} == ${mark} ` + 'GROUP BY fingerprint ' + 'HAVING name != \'\' ' + @@ -47,7 +50,7 @@ module.exports.putAlertRule = async (namespace, group, rule) => { const groupFp = getGroupFp(namespace, group.name) const groupVal = JSON.stringify({ name: group.name, interval: group.interval }) await axios.post(getClickhouseUrl(), - `INSERT INTO ${DATABASE_NAME()}.settings (fingerprint, type, name, value, inserted_at) FORMAT JSONEachRow \n` + + `INSERT INTO ${DATABASE_NAME()}.settings${dist} (fingerprint, type, name, value, inserted_at) FORMAT JSONEachRow \n` + JSON.stringify({ fingerprint: ruleFp, type: 'alert_rule', name: ruleName, value: JSON.stringify(ruleVal), inserted_at: Date.now() * 1000000 }) + '\n' + JSON.stringify({ fingerprint: groupFp, type: 'alert_group', name: groupName, value: groupVal, inserted_at: Date.now() * 1000000 }) ) @@ -99,7 +102,7 @@ module.exports.getAlertRules = async (limit, offset) => { const mark = Math.random() const res = await axios.post(getClickhouseUrl(), 'SELECT fingerprint, argMax(name, inserted_at) as name, argMax(value, inserted_at) as value ' + - `FROM ${DATABASE_NAME()}.settings ` + + `FROM ${DATABASE_NAME()}.settings${dist} ` + `WHERE type == 'alert_rule' AND ${mark} == ${mark} ` + `GROUP BY fingerprint HAVING name != '' ORDER BY name ${_limit} ${_offset} FORMAT JSON`) return res.data.data.map(e => { @@ -119,7 +122,7 @@ module.exports.getAlertGroups = async (limit, offset) => { const mark = Math.random() const res = await axios.post(getClickhouseUrl(), 'SELECT fingerprint, argMax(name, inserted_at) as name, argMax(value, inserted_at) as value ' + - `FROM ${DATABASE_NAME()}.settings ` + + `FROM ${DATABASE_NAME()}.settings${dist} ` + `WHERE type == 'alert_group' AND ${mark} == ${mark} ` + `GROUP BY fingerprint HAVING name != '' ORDER BY name ${_limit} ${_offset} FORMAT JSON`) return res.data.data.map(e => { @@ -134,7 +137,7 @@ module.exports.getAlertRulesCount = async () => { const mark = Math.random() const res = await axios.post(getClickhouseUrl(), 'SELECT COUNT(1) as count FROM (SELECT fingerprint ' + - `FROM ${DATABASE_NAME()}.settings ` + + `FROM ${DATABASE_NAME()}.settings${dist} ` + `WHERE type=\'alert_rule\' AND ${mark} == ${mark} ` + 'GROUP BY fingerprint ' + 'HAVING argMax(name, inserted_at) != \'\') FORMAT JSON') @@ -153,8 +156,8 @@ module.exports.deleteAlertRule = async (ns, group, rule) => { `INSERT INTO ${DATABASE_NAME()}.settings (fingerprint, type, name, value, inserted_at) FORMAT JSONEachRow\n` + JSON.stringify({ fingerprint: fp, type: 'alert_rule', name: '', value: '', inserted_at: Date.now() }) ) - await axios.post(getClickhouseUrl(), - `ALTER TABLE ${DATABASE_NAME()}.settings DELETE WHERE fingerprint=${fp} AND inserted_at <= now64(9, 'UTC')` + await axios.post(getClickhouseUrl() + '/?allow_nondeterministic_mutations=1&mutations_execute_nondeterministic_on_initiator=1', + `ALTER TABLE ${DATABASE_NAME()}.settings ${onCluster} DELETE WHERE fingerprint=${fp} AND inserted_at <= now64(9, 'UTC')` ) } @@ -166,11 +169,12 @@ module.exports.deleteAlertRule = async (ns, group, rule) => { module.exports.deleteGroup = async (ns, group) => { const fp = getGroupFp(ns, group) await axios.post(getClickhouseUrl(), - `INSERT INTO ${DATABASE_NAME()}.settings (fingerprint, type, name, value, inserted_at) FORMAT JSONEachRow\n` + + `INSERT INTO ${DATABASE_NAME()}.settings${dist} (fingerprint, type, name, value, inserted_at) FORMAT JSONEachRow\n` + JSON.stringify({ fingerprint: fp, type: 'alert_group', name: '', value: '', inserted_at: Date.now() }) ) - await axios.post(getClickhouseUrl(), - `ALTER TABLE ${DATABASE_NAME()}.settings DELETE WHERE fingerprint=${fp} AND inserted_at <= now64(9, 'UTC')` + await axios.post(getClickhouseUrl() + '/?allow_nondeterministic_mutations=1&mutations_execute_nondeterministic_on_initiator=1', + `ALTER TABLE ${DATABASE_NAME()}.settings ${onCluster} DELETE WHERE fingerprint=${fp} AND inserted_at <= now64(9, 'UTC') +SETTINGS allow_nondeterministic_mutations=1` ) } @@ -183,9 +187,11 @@ module.exports.deleteGroup = async (ns, group) => { module.exports.dropAlertViews = async (ns, group, rule) => { const fp = getRuleFP(ns, group, rule) await axios.post(getClickhouseUrl(), - `DROP VIEW IF EXISTS ${DATABASE_NAME()}._alert_view_${fp}`) + `DROP VIEW IF EXISTS ${DATABASE_NAME()}._alert_view_${fp} ${onCluster}`) await axios.post(getClickhouseUrl(), - `DROP TABLE IF EXISTS ${DATABASE_NAME()}._alert_view_${fp}_mark`) + `DROP TABLE IF EXISTS ${DATABASE_NAME()}._alert_view_${fp}_mark ${onCluster}`) + await axios.post(getClickhouseUrl(), + `DROP TABLE IF EXISTS ${DATABASE_NAME()}._alert_view_${fp}_mark_dist ${onCluster}`) } /** @@ -197,9 +203,15 @@ module.exports.dropAlertViews = async (ns, group, rule) => { module.exports.createMarksTable = async (ns, group, rule) => { const fp = getRuleFP(ns, group, rule) await axios.post(getClickhouseUrl(), - `CREATE TABLE IF NOT EXISTS ${DATABASE_NAME()}._alert_view_${fp}_mark ` + + `CREATE TABLE IF NOT EXISTS ${DATABASE_NAME()}._alert_view_${fp}_mark ${onCluster}` + '(id UInt8 default 0,mark UInt64, inserted_at DateTime default now()) ' + - 'ENGINE ReplacingMergeTree(mark) ORDER BY id') + `ENGINE ${clusterName ? 'Replicated' : ''}ReplacingMergeTree(mark) ORDER BY id`) + if (clusterName) { + await axios.post(getClickhouseUrl(), + `CREATE TABLE IF NOT EXISTS ${DATABASE_NAME()}._alert_view_${fp}_mark_dist ${onCluster}` + + '(id UInt8 default 0,mark UInt64, inserted_at DateTime default now()) ' + + `ENGINE=Distributed('${clusterName}', '${DATABASE_NAME()}', '_alert_view_${fp}_mark', id)`) + } } /** @@ -235,7 +247,7 @@ module.exports.createAlertViews = async (ns, group, rule, request) => { module.exports.getLastMark = async (ns, group, rule) => { const fp = getRuleFP(ns, group, rule) const mark = await axios.post(getClickhouseUrl(), - `SELECT max(mark) as mark FROM ${DATABASE_NAME()}._alert_view_${fp}_mark WHERE id = 0 FORMAT JSON`) + `SELECT max(mark) as mark FROM ${DATABASE_NAME()}._alert_view_${fp}_mark${dist} WHERE id = 0 FORMAT JSON`) return parseInt(mark.data.data[0].mark) } @@ -253,7 +265,7 @@ module.exports.incAlertMark = async (ns, group, rule, newMark, id) => { newMark = newMark || Date.now() id = id || 0 await axios.post(getClickhouseUrl(), - `INSERT INTO ${DATABASE_NAME()}._alert_view_${fp}_mark (mark, id) VALUES (${newMark}, ${id})`) + `INSERT INTO ${DATABASE_NAME()}._alert_view_${fp}_mark${dist} (mark, id) VALUES (${newMark}, ${id})`) return [mark, newMark] } @@ -285,7 +297,7 @@ module.exports.getAlerts = async (ns, group, rule, mark) => { module.exports.dropOutdatedParts = async (ns, group, rule, mark) => { const fp = getRuleFP(ns, group, rule) const partitions = await axios.post(getClickhouseUrl(), - `SELECT DISTINCT mark FROM ${DATABASE_NAME()}._alert_view_${fp} WHERE mark <= ${mark} FORMAT JSON`) + `SELECT DISTINCT mark FROM ${DATABASE_NAME()}._alert_view_${fp}${dist} WHERE mark <= ${mark} FORMAT JSON`) if (!partitions.data || !partitions.data.data || !partitions.data.data.length) { return } diff --git a/lib/db/maintain/scripts.js b/lib/db/maintain/scripts.js index 9492da3d..08ad00a2 100644 --- a/lib/db/maintain/scripts.js +++ b/lib/db/maintain/scripts.js @@ -187,7 +187,14 @@ module.exports.overall_dist = [ \`name\` String, \`value\` String, \`inserted_at\` DateTime64(9, 'UTC') -) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'settings', rand());` +) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'settings', rand());`, + + `CREATE TABLE IF NOT EXISTS {{DB}}.time_series_gin_dist {{{OnCluster}}} ( + date Date, + key String, + val String, + fingerprint UInt64 + ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'time_series_gin', rand());`, ] module.exports.traces_dist = [ diff --git a/lib/handlers/label.js b/lib/handlers/label.js index 231cc5dd..375c62ff 100644 --- a/lib/handlers/label.js +++ b/lib/handlers/label.js @@ -13,6 +13,8 @@ const clickhouse = require('../db/clickhouse') const utils = require('../utils') +const { clusterName } = require('../../common') +const dist = clusterName ? '_dist' : '' async function handler (req, res) { req.log.debug('GET /loki/api/v1/label') @@ -21,7 +23,8 @@ async function handler (req, res) { req.query.end && !isNaN(parseInt(req.query.end)) ? `date <= toDate(FROM_UNIXTIME(intDiv(${parseInt(req.query.end)}, 1000000000)))` : null ].filter(w => w) where = where.length ? `WHERE ${where.join(' AND ')}` : '' - const q = `SELECT DISTINCT key FROM time_series_gin ${where} FORMAT JSON` + const q = `SELECT DISTINCT key FROM time_series_gin${dist} ${where} FORMAT JSON` + console.log(q) const allLabels = await clickhouse.rawRequest(q, null, utils.DATABASE_NAME()) const resp = { status: 'success', data: allLabels.data.data.map(r => r.key) } return res.send(resp) diff --git a/lib/handlers/label_values.js b/lib/handlers/label_values.js index 49aa8754..b6f51965 100644 --- a/lib/handlers/label_values.js +++ b/lib/handlers/label_values.js @@ -14,6 +14,8 @@ const clickhouse = require('../db/clickhouse') const Sql = require('@cloki/clickhouse-sql') const utils = require('../utils') +const { clusterName } = require('../../common') +const dist = clusterName ? '_dist' : '' async function handler (req, res) { req.log.debug(`GET /api/prom/label/${req.params.name}/values`) @@ -23,7 +25,7 @@ async function handler (req, res) { req.query.end && !isNaN(parseInt(req.query.end)) ? `date <= toDate(FROM_UNIXTIME(intDiv(${parseInt(req.query.end)}, 1000000000)))` : null ].filter(w => w) where = `WHERE ${where.join(' AND ')}` - const q = `SELECT DISTINCT val FROM time_series_gin ${where} FORMAT JSON` + const q = `SELECT DISTINCT val FROM time_series_gin${dist} ${where} FORMAT JSON` const allValues = await clickhouse.rawRequest(q, null, utils.DATABASE_NAME()) const resp = { status: 'success', data: allValues.data.data.map(r => r.val) } return res.send(resp) diff --git a/parser/transpiler.js b/parser/transpiler.js index 0ac2975d..cf78914f 100644 --- a/parser/transpiler.js +++ b/parser/transpiler.js @@ -164,7 +164,7 @@ module.exports.transpile = (request) => { step: step, legacy: !checkVersion('v3_1', start), joinLabels: joinLabels, - inline: clusterName !== '' + inline: !!clusterName } let duration = null const matrixOp = [ From 6e466d63c3ba430eb13817cc1f1044e59278a16f Mon Sep 17 00:00:00 2001 From: akvlad Date: Sun, 1 Oct 2023 18:36:17 +0300 Subject: [PATCH 03/10] #fix: unit tests update due to cluster support --- test/__snapshots__/transpiler.test.js.snap | 30 +++++++++++----------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/test/__snapshots__/transpiler.test.js.snap b/test/__snapshots__/transpiler.test.js.snap index e64e2967..17978b19 100644 --- a/test/__snapshots__/transpiler.test.js.snap +++ b/test/__snapshots__/transpiler.test.js.snap @@ -9002,7 +9002,7 @@ exports[`should transpile complex pipelines 1`] = ` Object { "duration": 1000, "matrix": false, - "query": "WITH str_sel AS (select DISTINCT \`fingerprint\` from loki.time_series where ((JSONHas(labels, 'test_id') = 1) and (JSONExtractString(labels, 'test_id') = '\${testID}')) and ((JSONHas(labels, 'freq') = 1) and (toFloat64OrNull(JSONExtractString(labels, 'freq')) >= '4'))), sel_a AS (select \`samples\`.\`string\` as \`string\`,\`samples\`.\`fingerprint\` as \`fingerprint\`,samples.timestamp_ns as \`timestamp_ns\`,JSONExtractKeysAndValues(time_series.labels, 'String') as \`labels\` from loki.samples_vX as \`samples\` left any join \`loki\`.\`time_series\` on \`samples\`.\`fingerprint\` = time_series.fingerprint where (\`samples\`.\`timestamp_ns\` between 0000000 and 100000000000000) and (\`samples\`.\`fingerprint\` in (select \`fingerprint\` from str_sel)) order by \`timestamp_ns\` asc limit 1000) select * from sel_a order by \`labels\` asc,\`timestamp_ns\` asc", + "query": "WITH str_sel AS (select DISTINCT \`fingerprint\` from loki.time_series where ((JSONHas(labels, 'test_id') = 1) and (JSONExtractString(labels, 'test_id') = '\${testID}')) and ((JSONHas(labels, 'freq') = 1) and (toFloat64OrNull(JSONExtractString(labels, 'freq')) >= '4'))), sel_a AS (select \`samples\`.\`string\` as \`string\`,\`samples\`.\`fingerprint\` as \`fingerprint\`,samples.timestamp_ns as \`timestamp_ns\`,JSONExtractKeysAndValues(time_series.labels, 'String') as \`labels\` from loki.samples_vX as \`samples\` left any join \`loki\`.\`time_series\` AS time_series on \`samples\`.\`fingerprint\` = time_series.fingerprint where (\`samples\`.\`timestamp_ns\` between 0000000 and 100000000000000) and (\`samples\`.\`fingerprint\` in (select \`fingerprint\` from str_sel)) order by \`timestamp_ns\` asc limit 1000) select * from sel_a order by \`labels\` asc,\`timestamp_ns\` asc", "stream": Array [], } `; @@ -12372,7 +12372,7 @@ exports[`should transpile new style 1 1`] = ` Object { "duration": 1000, "matrix": false, - "query": "WITH str_sel AS (select DISTINCT \`fingerprint\` from loki.time_series where ((JSONHas(labels, 'test_id') = 1) and (JSONExtractString(labels, 'test_id') = '0.7387779420506657'))), sel_a AS (select \`samples\`.\`string\` as \`string\`,\`samples\`.\`fingerprint\` as \`fingerprint\`,samples.timestamp_ns as \`timestamp_ns\` from loki.samples_vX as \`samples\` where (\`samples\`.\`timestamp_ns\` between 1638802620000000000 and 1638803220000000000) and (\`samples\`.\`fingerprint\` in (select \`fingerprint\` from str_sel)) order by \`timestamp_ns\` desc limit 2000) select JSONExtractKeysAndValues(time_series.labels, 'String') as \`labels\`,sel_a.* from sel_a left any join \`loki\`.\`time_series\` on \`sel_a\`.\`fingerprint\` = time_series.fingerprint order by \`labels\` desc,\`timestamp_ns\` desc", + "query": "WITH str_sel AS (select DISTINCT \`fingerprint\` from loki.time_series where ((JSONHas(labels, 'test_id') = 1) and (JSONExtractString(labels, 'test_id') = '0.7387779420506657'))), sel_a AS (select \`samples\`.\`string\` as \`string\`,\`samples\`.\`fingerprint\` as \`fingerprint\`,samples.timestamp_ns as \`timestamp_ns\` from loki.samples_vX as \`samples\` where (\`samples\`.\`timestamp_ns\` between 1638802620000000000 and 1638803220000000000) and (\`samples\`.\`fingerprint\` in (select \`fingerprint\` from str_sel)) order by \`timestamp_ns\` desc limit 2000) select JSONExtractKeysAndValues(time_series.labels, 'String') as \`labels\`,sel_a.* from sel_a left any join \`loki\`.\`time_series\` AS time_series on \`sel_a\`.\`fingerprint\` = time_series.fingerprint order by \`labels\` desc,\`timestamp_ns\` desc", "stream": Array [], } `; @@ -12381,7 +12381,7 @@ exports[`should transpile new style 2 1`] = ` Object { "duration": 1000, "matrix": false, - "query": "WITH str_sel AS (select DISTINCT \`fingerprint\` from loki.time_series where ((JSONHas(labels, 'test_id') = 1) and (JSONExtractString(labels, 'test_id') = '0.2119268970232')) and ((JSONHas(labels, 'freq') = 1) and (JSONExtractString(labels, 'freq') = '2'))), sel_a AS (select \`samples\`.\`string\` as \`string\`,\`samples\`.\`fingerprint\` as \`fingerprint\`,samples.timestamp_ns as \`timestamp_ns\` from loki.samples_vX as \`samples\` where (\`samples\`.\`timestamp_ns\` between 1638802620000000000 and 1638803220000000000) and (\`samples\`.\`fingerprint\` in (select \`fingerprint\` from str_sel)) and (match(string, '2[0-9]$') = 1) order by \`timestamp_ns\` desc limit 2000) select JSONExtractKeysAndValues(time_series.labels, 'String') as \`labels\`,sel_a.* from sel_a left any join \`loki\`.\`time_series\` on \`sel_a\`.\`fingerprint\` = time_series.fingerprint order by \`labels\` desc,\`timestamp_ns\` desc", + "query": "WITH str_sel AS (select DISTINCT \`fingerprint\` from loki.time_series where ((JSONHas(labels, 'test_id') = 1) and (JSONExtractString(labels, 'test_id') = '0.2119268970232')) and ((JSONHas(labels, 'freq') = 1) and (JSONExtractString(labels, 'freq') = '2'))), sel_a AS (select \`samples\`.\`string\` as \`string\`,\`samples\`.\`fingerprint\` as \`fingerprint\`,samples.timestamp_ns as \`timestamp_ns\` from loki.samples_vX as \`samples\` where (\`samples\`.\`timestamp_ns\` between 1638802620000000000 and 1638803220000000000) and (\`samples\`.\`fingerprint\` in (select \`fingerprint\` from str_sel)) and (match(string, '2[0-9]$') = 1) order by \`timestamp_ns\` desc limit 2000) select JSONExtractKeysAndValues(time_series.labels, 'String') as \`labels\`,sel_a.* from sel_a left any join \`loki\`.\`time_series\` AS time_series on \`sel_a\`.\`fingerprint\` = time_series.fingerprint order by \`labels\` desc,\`timestamp_ns\` desc", "stream": Array [], } `; @@ -12390,7 +12390,7 @@ exports[`should transpile new style 3 1`] = ` Object { "duration": 1000, "matrix": true, - "query": "WITH str_sel AS (select DISTINCT \`fingerprint\` from loki.time_series where ((JSONHas(labels, 'test_id') = 1) and (JSONExtractString(labels, 'test_id') = '0.7026038163617259')) and ((JSONHas(labels, 'freq') = 1) and (JSONExtractString(labels, 'freq') = '2'))), rate_a AS (select \`samples\`.\`string\` as \`string\`,\`samples\`.\`fingerprint\` as \`fingerprint\`,intDiv(samples.timestamp_ns, 1000000) as \`timestamp_ns\`,JSONExtractKeysAndValues(time_series.labels, 'String') as \`labels\` from loki.samples_vX as \`samples\` left any join \`loki\`.\`time_series\` on \`samples\`.\`fingerprint\` = time_series.fingerprint where (\`samples\`.\`timestamp_ns\` between 1638802620000000000 and 1638803220000000000) and (\`samples\`.\`fingerprint\` in (select \`fingerprint\` from str_sel)) and (match(string, '2[0-9]$') = 1) order by \`timestamp_ns\` desc), rate_b AS (select labels as \`labels\`,intDiv(timestamp_ns, 1000) * 1000 as \`timestamp_ns\`,toFloat64(count(1)) * 1000 / 1000 as \`value\` from rate_a group by \`labels\`,\`timestamp_ns\` order by \`labels\` asc,\`timestamp_ns\` asc) select \`labels\`,intDiv(timestamp_ns, 2000) * 2000 as \`timestamp_ns\`,argMin(rate_b.value, rate_b.timestamp_ns) as \`value\` from \`rate_b\` group by \`labels\`,\`timestamp_ns\` order by \`labels\` asc,\`timestamp_ns\` asc", + "query": "WITH str_sel AS (select DISTINCT \`fingerprint\` from loki.time_series where ((JSONHas(labels, 'test_id') = 1) and (JSONExtractString(labels, 'test_id') = '0.7026038163617259')) and ((JSONHas(labels, 'freq') = 1) and (JSONExtractString(labels, 'freq') = '2'))), rate_a AS (select \`samples\`.\`string\` as \`string\`,\`samples\`.\`fingerprint\` as \`fingerprint\`,intDiv(samples.timestamp_ns, 1000000) as \`timestamp_ns\`,JSONExtractKeysAndValues(time_series.labels, 'String') as \`labels\` from loki.samples_vX as \`samples\` left any join \`loki\`.\`time_series\` AS time_series on \`samples\`.\`fingerprint\` = time_series.fingerprint where (\`samples\`.\`timestamp_ns\` between 1638802620000000000 and 1638803220000000000) and (\`samples\`.\`fingerprint\` in (select \`fingerprint\` from str_sel)) and (match(string, '2[0-9]$') = 1) order by \`timestamp_ns\` desc), rate_b AS (select labels as \`labels\`,intDiv(timestamp_ns, 1000) * 1000 as \`timestamp_ns\`,toFloat64(count(1)) * 1000 / 1000 as \`value\` from rate_a group by \`labels\`,\`timestamp_ns\` order by \`labels\` asc,\`timestamp_ns\` asc) select \`labels\`,intDiv(timestamp_ns, 2000) * 2000 as \`timestamp_ns\`,argMin(rate_b.value, rate_b.timestamp_ns) as \`value\` from \`rate_b\` group by \`labels\`,\`timestamp_ns\` order by \`labels\` asc,\`timestamp_ns\` asc", "stream": Array [], } `; @@ -12399,7 +12399,7 @@ exports[`should transpile new style 4 1`] = ` Object { "duration": 1000, "matrix": true, - "query": "WITH str_sel AS (select DISTINCT \`fingerprint\` from loki.time_series where ((JSONHas(labels, 'test_id') = 1) and (JSONExtractString(labels, 'test_id') = '0.7026038163617259')) and ((JSONHas(labels, 'freq') = 1) and (JSONExtractString(labels, 'freq') = '2'))) select \`labels\`,toUInt64(intDiv(timestamp_ns, 1000000000) * 1000) as \`timestamp_ns\`,toFloat64(0) as \`value\` from loki.samples_vX as \`samples\` left any join \`loki\`.\`time_series\` on \`samples\`.\`fingerprint\` = time_series.fingerprint where (\`samples\`.\`timestamp_ns\` between 1638802620000000000 and 1638803220000000000) and (\`samples\`.\`fingerprint\` in (select \`fingerprint\` from str_sel)) and (match(string, '2[0-9]$') = 1) group by \`labels\`,\`timestamp_ns\` order by \`labels\` asc,\`timestamp_ns\` asc", + "query": "WITH str_sel AS (select DISTINCT \`fingerprint\` from loki.time_series where ((JSONHas(labels, 'test_id') = 1) and (JSONExtractString(labels, 'test_id') = '0.7026038163617259')) and ((JSONHas(labels, 'freq') = 1) and (JSONExtractString(labels, 'freq') = '2'))) select \`labels\`,toUInt64(intDiv(timestamp_ns, 1000000000) * 1000) as \`timestamp_ns\`,toFloat64(0) as \`value\` from loki.samples_vX as \`samples\` left any join \`loki\`.\`time_series\` AS time_series on \`samples\`.\`fingerprint\` = time_series.fingerprint where (\`samples\`.\`timestamp_ns\` between 1638802620000000000 and 1638803220000000000) and (\`samples\`.\`fingerprint\` in (select \`fingerprint\` from str_sel)) and (match(string, '2[0-9]$') = 1) group by \`labels\`,\`timestamp_ns\` order by \`labels\` asc,\`timestamp_ns\` asc", "stream": Array [ [Function], ], @@ -12410,7 +12410,7 @@ exports[`should transpile new style 5 1`] = ` Object { "duration": 1000, "matrix": false, - "query": "WITH str_sel AS (select DISTINCT \`fingerprint\` from loki.time_series where ((JSONHas(labels, 'test_id') = 1) and (JSONExtractString(labels, 'test_id') = '0.000341166036469831_json'))), sel_a AS (select \`samples\`.\`string\` as \`string\`,\`samples\`.\`fingerprint\` as \`fingerprint\`,samples.timestamp_ns as \`timestamp_ns\`,JSONExtractKeysAndValues(time_series.labels, 'String') as \`labels\` from loki.samples_vX as \`samples\` left any join \`loki\`.\`time_series\` on \`samples\`.\`fingerprint\` = time_series.fingerprint where (\`samples\`.\`timestamp_ns\` between 1638802620000000000 and 1638803220000000000) and (\`samples\`.\`fingerprint\` in (select \`fingerprint\` from str_sel)) order by \`timestamp_ns\` desc limit 2000) select * from sel_a order by \`labels\` desc,\`timestamp_ns\` desc", + "query": "WITH str_sel AS (select DISTINCT \`fingerprint\` from loki.time_series where ((JSONHas(labels, 'test_id') = 1) and (JSONExtractString(labels, 'test_id') = '0.000341166036469831_json'))), sel_a AS (select \`samples\`.\`string\` as \`string\`,\`samples\`.\`fingerprint\` as \`fingerprint\`,samples.timestamp_ns as \`timestamp_ns\`,JSONExtractKeysAndValues(time_series.labels, 'String') as \`labels\` from loki.samples_vX as \`samples\` left any join \`loki\`.\`time_series\` AS time_series on \`samples\`.\`fingerprint\` = time_series.fingerprint where (\`samples\`.\`timestamp_ns\` between 1638802620000000000 and 1638803220000000000) and (\`samples\`.\`fingerprint\` in (select \`fingerprint\` from str_sel)) order by \`timestamp_ns\` desc limit 2000) select * from sel_a order by \`labels\` desc,\`timestamp_ns\` desc", "stream": Array [ [Function], ], @@ -12421,7 +12421,7 @@ exports[`should transpile new style 6 1`] = ` Object { "duration": 1000, "matrix": false, - "query": "WITH str_sel AS (select DISTINCT \`fingerprint\` from loki.time_series where ((JSONHas(labels, 'test_id') = 1) and (JSONExtractString(labels, 'test_id') = '0.2053747382122484_json'))), sel_a AS (select \`samples\`.\`string\` as \`string\`,\`samples\`.\`fingerprint\` as \`fingerprint\`,samples.timestamp_ns as \`timestamp_ns\`,JSONExtractKeysAndValues(time_series.labels, 'String') as \`labels\`,arrayFilter((x) -> x.2 != '', [('lbl_repl', if(JSONType(samples.string, 'new_lbl') == 'String', JSONExtractString(samples.string, 'new_lbl'), JSONExtractRaw(samples.string, 'new_lbl')))]) as \`extra_labels\` from loki.samples_vX as \`samples\` left any join \`loki\`.\`time_series\` on \`samples\`.\`fingerprint\` = time_series.fingerprint where (\`samples\`.\`timestamp_ns\` between 1638802620000000000 and 1638803220000000000) and (\`samples\`.\`fingerprint\` in (select \`fingerprint\` from str_sel)) and (isValidJSON(samples.string) = 1) and ((indexOf(extra_labels, ('lbl_repl', 'new_val')) > 0) or ((arrayExists(x -> x.1 == 'lbl_repl', extra_labels) = 0) and ((arrayExists(x -> x.1 == 'lbl_repl', labels) = 1) and (arrayFirst(x -> x.1 == 'lbl_repl', labels).2 = 'new_val')))) order by \`timestamp_ns\` desc limit 2000) select * from sel_a order by \`labels\` desc,\`timestamp_ns\` desc", + "query": "WITH str_sel AS (select DISTINCT \`fingerprint\` from loki.time_series where ((JSONHas(labels, 'test_id') = 1) and (JSONExtractString(labels, 'test_id') = '0.2053747382122484_json'))), sel_a AS (select \`samples\`.\`string\` as \`string\`,\`samples\`.\`fingerprint\` as \`fingerprint\`,samples.timestamp_ns as \`timestamp_ns\`,JSONExtractKeysAndValues(time_series.labels, 'String') as \`labels\`,arrayFilter((x) -> x.2 != '', [('lbl_repl', if(JSONType(samples.string, 'new_lbl') == 'String', JSONExtractString(samples.string, 'new_lbl'), JSONExtractRaw(samples.string, 'new_lbl')))]) as \`extra_labels\` from loki.samples_vX as \`samples\` left any join \`loki\`.\`time_series\` AS time_series on \`samples\`.\`fingerprint\` = time_series.fingerprint where (\`samples\`.\`timestamp_ns\` between 1638802620000000000 and 1638803220000000000) and (\`samples\`.\`fingerprint\` in (select \`fingerprint\` from str_sel)) and (isValidJSON(samples.string) = 1) and ((indexOf(extra_labels, ('lbl_repl', 'new_val')) > 0) or ((arrayExists(x -> x.1 == 'lbl_repl', extra_labels) = 0) and ((arrayExists(x -> x.1 == 'lbl_repl', labels) = 1) and (arrayFirst(x -> x.1 == 'lbl_repl', labels).2 = 'new_val')))) order by \`timestamp_ns\` desc limit 2000) select * from sel_a order by \`labels\` desc,\`timestamp_ns\` desc", "stream": Array [], } `; @@ -12430,7 +12430,7 @@ exports[`should transpile new style 7 1`] = ` Object { "duration": 3000, "matrix": true, - "query": "WITH str_sel AS (select DISTINCT \`fingerprint\` from loki.time_series where ((JSONHas(labels, 'test_id') = 1) and (JSONExtractString(labels, 'test_id') = '0.1547558751138609_json'))) select \`samples\`.\`string\` as \`string\`,\`samples\`.\`fingerprint\` as \`fingerprint\`,intDiv(samples.timestamp_ns, 1000000) as \`timestamp_ns\`,JSONExtractKeysAndValues(time_series.labels, 'String') as \`labels\` from loki.samples_vX as \`samples\` left any join \`loki\`.\`time_series\` on \`samples\`.\`fingerprint\` = time_series.fingerprint where (\`samples\`.\`timestamp_ns\` between 1638802620000000000 and 1638803220000000000) and (\`samples\`.\`fingerprint\` in (select \`fingerprint\` from str_sel)) order by \`timestamp_ns\` desc", + "query": "WITH str_sel AS (select DISTINCT \`fingerprint\` from loki.time_series where ((JSONHas(labels, 'test_id') = 1) and (JSONExtractString(labels, 'test_id') = '0.1547558751138609_json'))) select \`samples\`.\`string\` as \`string\`,\`samples\`.\`fingerprint\` as \`fingerprint\`,intDiv(samples.timestamp_ns, 1000000) as \`timestamp_ns\`,JSONExtractKeysAndValues(time_series.labels, 'String') as \`labels\` from loki.samples_vX as \`samples\` left any join \`loki\`.\`time_series\` AS time_series on \`samples\`.\`fingerprint\` = time_series.fingerprint where (\`samples\`.\`timestamp_ns\` between 1638802620000000000 and 1638803220000000000) and (\`samples\`.\`fingerprint\` in (select \`fingerprint\` from str_sel)) order by \`timestamp_ns\` desc", "stream": Array [ [Function], [Function], @@ -12446,7 +12446,7 @@ exports[`should transpile new style 8 1`] = ` Object { "duration": 1000, "matrix": true, - "query": "WITH str_sel AS (select DISTINCT \`fingerprint\` from loki.time_series where ((JSONHas(labels, 'test_id') = 1) and (JSONExtractString(labels, 'test_id') = '0.4075242197275857'))) select \`samples\`.\`string\` as \`string\`,\`samples\`.\`fingerprint\` as \`fingerprint\`,intDiv(samples.timestamp_ns, 1000000) as \`timestamp_ns\`,JSONExtractKeysAndValues(time_series.labels, 'String') as \`labels\` from loki.samples_vX as \`samples\` left any join \`loki\`.\`time_series\` on \`samples\`.\`fingerprint\` = time_series.fingerprint where (\`samples\`.\`timestamp_ns\` between 1638802620000000000 and 1638803220000000000) and (\`samples\`.\`fingerprint\` in (select \`fingerprint\` from str_sel)) order by \`timestamp_ns\` desc", + "query": "WITH str_sel AS (select DISTINCT \`fingerprint\` from loki.time_series where ((JSONHas(labels, 'test_id') = 1) and (JSONExtractString(labels, 'test_id') = '0.4075242197275857'))) select \`samples\`.\`string\` as \`string\`,\`samples\`.\`fingerprint\` as \`fingerprint\`,intDiv(samples.timestamp_ns, 1000000) as \`timestamp_ns\`,JSONExtractKeysAndValues(time_series.labels, 'String') as \`labels\` from loki.samples_vX as \`samples\` left any join \`loki\`.\`time_series\` AS time_series on \`samples\`.\`fingerprint\` = time_series.fingerprint where (\`samples\`.\`timestamp_ns\` between 1638802620000000000 and 1638803220000000000) and (\`samples\`.\`fingerprint\` in (select \`fingerprint\` from str_sel)) order by \`timestamp_ns\` desc", "stream": Array [ [Function], [Function], @@ -12462,7 +12462,7 @@ exports[`should transpile new style 9 1`] = ` Object { "duration": 1000, "matrix": false, - "query": "WITH str_sel AS (select DISTINCT \`fingerprint\` from loki.time_series where ((JSONHas(labels, 'test_id') = 1) and (JSONExtractString(labels, 'test_id') = '0.7186063017626447_json'))), sel_a AS (select \`samples\`.\`string\` as \`string\`,\`samples\`.\`fingerprint\` as \`fingerprint\`,samples.timestamp_ns as \`timestamp_ns\`,JSONExtractKeysAndValues(time_series.labels, 'String') as \`labels\`,arrayFilter((x) -> x.2 != '', [('sid', if(JSONType(samples.string, 'str_id') == 'String', JSONExtractString(samples.string, 'str_id'), JSONExtractRaw(samples.string, 'str_id')))]) as \`extra_labels\` from loki.samples_vX as \`samples\` left any join \`loki\`.\`time_series\` on \`samples\`.\`fingerprint\` = time_series.fingerprint where (\`samples\`.\`timestamp_ns\` between 1638802620000000000 and 1638803220000000000) and (\`samples\`.\`fingerprint\` in (select \`fingerprint\` from str_sel)) and (isValidJSON(samples.string) = 1) and ((arrayExists(x -> x.1 == 'sid' AND (coalesce(toFloat64OrNull(x.2) >= '598', 0)), extra_labels) != 0) or ((arrayExists(x -> x.1 == 'sid', extra_labels) = 0) and (arrayExists(x -> x.1 == 'sid', labels) = 1) and (toFloat64OrNull(arrayFirst(x -> x.1 == 'sid', labels).2) >= '598'))) order by \`timestamp_ns\` desc limit 2000) select * from sel_a order by \`labels\` desc,\`timestamp_ns\` desc", + "query": "WITH str_sel AS (select DISTINCT \`fingerprint\` from loki.time_series where ((JSONHas(labels, 'test_id') = 1) and (JSONExtractString(labels, 'test_id') = '0.7186063017626447_json'))), sel_a AS (select \`samples\`.\`string\` as \`string\`,\`samples\`.\`fingerprint\` as \`fingerprint\`,samples.timestamp_ns as \`timestamp_ns\`,JSONExtractKeysAndValues(time_series.labels, 'String') as \`labels\`,arrayFilter((x) -> x.2 != '', [('sid', if(JSONType(samples.string, 'str_id') == 'String', JSONExtractString(samples.string, 'str_id'), JSONExtractRaw(samples.string, 'str_id')))]) as \`extra_labels\` from loki.samples_vX as \`samples\` left any join \`loki\`.\`time_series\` AS time_series on \`samples\`.\`fingerprint\` = time_series.fingerprint where (\`samples\`.\`timestamp_ns\` between 1638802620000000000 and 1638803220000000000) and (\`samples\`.\`fingerprint\` in (select \`fingerprint\` from str_sel)) and (isValidJSON(samples.string) = 1) and ((arrayExists(x -> x.1 == 'sid' AND (coalesce(toFloat64OrNull(x.2) >= '598', 0)), extra_labels) != 0) or ((arrayExists(x -> x.1 == 'sid', extra_labels) = 0) and (arrayExists(x -> x.1 == 'sid', labels) = 1) and (toFloat64OrNull(arrayFirst(x -> x.1 == 'sid', labels).2) >= '598'))) order by \`timestamp_ns\` desc limit 2000) select * from sel_a order by \`labels\` desc,\`timestamp_ns\` desc", "stream": Array [], } `; @@ -12471,7 +12471,7 @@ exports[`should transpile new style 10 1`] = ` Object { "duration": 1000, "matrix": false, - "query": "WITH str_sel AS (select DISTINCT \`fingerprint\` from loki.time_series where ((JSONHas(labels, 'test_id') = 1) and (JSONExtractString(labels, 'test_id') = '0.5505504081219323'))), sel_a AS (select \`samples\`.\`string\` as \`string\`,\`samples\`.\`fingerprint\` as \`fingerprint\`,samples.timestamp_ns as \`timestamp_ns\`,JSONExtractKeysAndValues(time_series.labels, 'String') as \`labels\`,arrayFilter(x -> x.1 != '' AND x.2 != '', arrayZip(['e'], arrayMap(x -> x[length(x)], extractAllGroupsHorizontal(string, '^([^0-9]+)[0-9]+$')))) as \`extra_labels\` from loki.samples_vX as \`samples\` left any join \`loki\`.\`time_series\` on \`samples\`.\`fingerprint\` = time_series.fingerprint where (\`samples\`.\`timestamp_ns\` between 1638802620000000000 and 1638803220000000000) and (\`samples\`.\`fingerprint\` in (select \`fingerprint\` from str_sel)) order by \`timestamp_ns\` desc limit 2000) select * from sel_a order by \`labels\` desc,\`timestamp_ns\` desc", + "query": "WITH str_sel AS (select DISTINCT \`fingerprint\` from loki.time_series where ((JSONHas(labels, 'test_id') = 1) and (JSONExtractString(labels, 'test_id') = '0.5505504081219323'))), sel_a AS (select \`samples\`.\`string\` as \`string\`,\`samples\`.\`fingerprint\` as \`fingerprint\`,samples.timestamp_ns as \`timestamp_ns\`,JSONExtractKeysAndValues(time_series.labels, 'String') as \`labels\`,arrayFilter(x -> x.1 != '' AND x.2 != '', arrayZip(['e'], arrayMap(x -> x[length(x)], extractAllGroupsHorizontal(string, '^([^0-9]+)[0-9]+$')))) as \`extra_labels\` from loki.samples_vX as \`samples\` left any join \`loki\`.\`time_series\` AS time_series on \`samples\`.\`fingerprint\` = time_series.fingerprint where (\`samples\`.\`timestamp_ns\` between 1638802620000000000 and 1638803220000000000) and (\`samples\`.\`fingerprint\` in (select \`fingerprint\` from str_sel)) order by \`timestamp_ns\` desc limit 2000) select * from sel_a order by \`labels\` desc,\`timestamp_ns\` desc", "stream": Array [], } `; @@ -12480,7 +12480,7 @@ exports[`should transpile new style 11 1`] = ` Object { "duration": 1000, "matrix": true, - "query": "WITH str_sel AS (select DISTINCT \`fingerprint\` from loki.time_series where ((JSONHas(labels, 'label') = 1) and (JSONExtractString(labels, 'label') = 'val'))), uw_rate_a AS (select \`samples\`.\`string\` as \`string\`,\`samples\`.\`fingerprint\` as \`fingerprint\`,intDiv(samples.timestamp_ns, 1000000) as \`timestamp_ns\`,JSONExtractKeysAndValues(time_series.labels, 'String') as \`labels\`,toFloat64OrNull(arrayFirst(x -> x.1 == 'b', labels).2) as \`unwrapped\` from loki.samples_vX as \`samples\` left any join \`loki\`.\`time_series\` on \`samples\`.\`fingerprint\` = time_series.fingerprint where (\`samples\`.\`timestamp_ns\` between 1638802620000000000 and 1638803220000000000) and (\`samples\`.\`fingerprint\` in (select \`fingerprint\` from str_sel)) and ((arrayExists(x -> x.1 == 'b', labels) = 1) and (isNotNull(unwrapped) = 1)) order by \`timestamp_ns\` desc), uw_rate_b AS (select labels as \`labels\`,SUM(unwrapped) / 1 as \`value\`,intDiv(timestamp_ns, 1000) * 1000 as \`timestamp_ns\` from uw_rate_a group by \`labels\`,\`timestamp_ns\` order by \`labels\`,\`timestamp_ns\`) select \`labels\`,intDiv(timestamp_ns, 2000) * 2000 as \`timestamp_ns\`,argMin(uw_rate_b.value, uw_rate_b.timestamp_ns) as \`value\` from uw_rate_b group by \`labels\`,\`timestamp_ns\` order by \`labels\` asc,\`timestamp_ns\` asc", + "query": "WITH str_sel AS (select DISTINCT \`fingerprint\` from loki.time_series where ((JSONHas(labels, 'label') = 1) and (JSONExtractString(labels, 'label') = 'val'))), uw_rate_a AS (select \`samples\`.\`string\` as \`string\`,\`samples\`.\`fingerprint\` as \`fingerprint\`,intDiv(samples.timestamp_ns, 1000000) as \`timestamp_ns\`,JSONExtractKeysAndValues(time_series.labels, 'String') as \`labels\`,toFloat64OrNull(arrayFirst(x -> x.1 == 'b', labels).2) as \`unwrapped\` from loki.samples_vX as \`samples\` left any join \`loki\`.\`time_series\` AS time_series on \`samples\`.\`fingerprint\` = time_series.fingerprint where (\`samples\`.\`timestamp_ns\` between 1638802620000000000 and 1638803220000000000) and (\`samples\`.\`fingerprint\` in (select \`fingerprint\` from str_sel)) and ((arrayExists(x -> x.1 == 'b', labels) = 1) and (isNotNull(unwrapped) = 1)) order by \`timestamp_ns\` desc), uw_rate_b AS (select labels as \`labels\`,SUM(unwrapped) / 1 as \`value\`,intDiv(timestamp_ns, 1000) * 1000 as \`timestamp_ns\` from uw_rate_a group by \`labels\`,\`timestamp_ns\` order by \`labels\`,\`timestamp_ns\`) select \`labels\`,intDiv(timestamp_ns, 2000) * 2000 as \`timestamp_ns\`,argMin(uw_rate_b.value, uw_rate_b.timestamp_ns) as \`value\` from uw_rate_b group by \`labels\`,\`timestamp_ns\` order by \`labels\` asc,\`timestamp_ns\` asc", "stream": Array [], } `; @@ -12489,7 +12489,7 @@ exports[`should transpile new style 12 1`] = ` Object { "duration": 1000, "matrix": false, - "query": "WITH str_sel AS (select DISTINCT \`fingerprint\` from loki.time_series where ((JSONHas(labels, 'freq') = 1) and (JSONExtractString(labels, 'freq') = '1'))), sel_a AS (select \`samples\`.\`string\` as \`string\`,\`samples\`.\`fingerprint\` as \`fingerprint\`,samples.timestamp_ns as \`timestamp_ns\`,JSONExtractKeysAndValues(time_series.labels, 'String') as \`labels\` from loki.samples_vX as \`samples\` left any join \`loki\`.\`time_series\` on \`samples\`.\`fingerprint\` = time_series.fingerprint where (\`samples\`.\`timestamp_ns\` between 1638802620000000000 and 1638803220000000000) and (\`samples\`.\`fingerprint\` in (select \`fingerprint\` from str_sel)) order by \`timestamp_ns\` desc limit 2000) select * from sel_a order by \`labels\` desc,\`timestamp_ns\` desc", + "query": "WITH str_sel AS (select DISTINCT \`fingerprint\` from loki.time_series where ((JSONHas(labels, 'freq') = 1) and (JSONExtractString(labels, 'freq') = '1'))), sel_a AS (select \`samples\`.\`string\` as \`string\`,\`samples\`.\`fingerprint\` as \`fingerprint\`,samples.timestamp_ns as \`timestamp_ns\`,JSONExtractKeysAndValues(time_series.labels, 'String') as \`labels\` from loki.samples_vX as \`samples\` left any join \`loki\`.\`time_series\` AS time_series on \`samples\`.\`fingerprint\` = time_series.fingerprint where (\`samples\`.\`timestamp_ns\` between 1638802620000000000 and 1638803220000000000) and (\`samples\`.\`fingerprint\` in (select \`fingerprint\` from str_sel)) order by \`timestamp_ns\` desc limit 2000) select * from sel_a order by \`labels\` desc,\`timestamp_ns\` desc", "stream": Array [ [Function], [Function], @@ -12512,11 +12512,11 @@ Array [ ] `; -exports[`should transpile series 1`] = `"WITH idx_sel AS ((select \`sel_1\`.\`fingerprint\` from (select \`fingerprint\` from \`loki\`.\`time_series_gin\` where ((\`key\` = 'test_id') and (\`val\` = '123'))) as \`sel_1\`)) select DISTINCT \`fingerprint\`,\`labels\` from loki.time_series where (fingerprint IN idx_sel) and (1 == 1)"`; +exports[`should transpile series 1`] = `"WITH idx_sel AS ((select \`sel_1\`.\`fingerprint\` from (select \`fingerprint\` from \`loki\`.\`time_series_gin\` where ((\`key\` = 'test_id') and (\`val\` = '123'))) as \`sel_1\`)) select DISTINCT \`fingerprint\`,\`labels\` from loki.time_series where (\`fingerprint\` in (idx_sel)) and (1 == 1)"`; exports[`should transpile tail 1`] = ` Object { - "query": "WITH str_sel AS (select DISTINCT \`fingerprint\` from loki.time_series where ((JSONHas(labels, 'test_id') = 1) and (extractAllGroups(JSONExtractString(labels, 'test_id'), '(_ws)') != '[]'))) select \`samples\`.\`string\` as \`string\`,\`samples\`.\`fingerprint\` as \`fingerprint\`,samples.timestamp_ns as \`timestamp_ns\`,JSONExtractKeysAndValues(time_series.labels, 'String') as \`labels\` from loki.samples_v3 as \`samples\` left any join \`loki\`.\`time_series\` on \`samples\`.\`fingerprint\` = time_series.fingerprint where (\`samples\`.\`timestamp_ns\` > (toUnixTimestamp(now()) - 5) * 1000000000) and (\`samples\`.\`fingerprint\` in (select \`fingerprint\` from str_sel)) order by \`timestamp_ns\` asc", + "query": "WITH str_sel AS (select DISTINCT \`fingerprint\` from loki.time_series where ((JSONHas(labels, 'test_id') = 1) and (extractAllGroups(JSONExtractString(labels, 'test_id'), '(_ws)') != '[]'))) select \`samples\`.\`string\` as \`string\`,\`samples\`.\`fingerprint\` as \`fingerprint\`,samples.timestamp_ns as \`timestamp_ns\`,JSONExtractKeysAndValues(time_series.labels, 'String') as \`labels\` from loki.samples_v3 as \`samples\` left any join \`loki\`.\`time_series\` AS time_series on \`samples\`.\`fingerprint\` = time_series.fingerprint where (\`samples\`.\`timestamp_ns\` > (toUnixTimestamp(now()) - 5) * 1000000000) and (\`samples\`.\`fingerprint\` in (select \`fingerprint\` from str_sel)) order by \`timestamp_ns\` asc", "stream": Array [], } `; From 5920f6646aa8a48bf2d2d7f2823f6967ec947dc0 Mon Sep 17 00:00:00 2001 From: akvlad Date: Sun, 1 Oct 2023 19:18:46 +0300 Subject: [PATCH 04/10] #fix: drop alerts for single server; ws live --- lib/db/clickhouse_alerting.js | 9 +++++---- lib/db/watcher.js | 14 +++++++++----- parser/transpiler.js | 2 +- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/lib/db/clickhouse_alerting.js b/lib/db/clickhouse_alerting.js index 7014b47f..5c99d642 100644 --- a/lib/db/clickhouse_alerting.js +++ b/lib/db/clickhouse_alerting.js @@ -156,7 +156,8 @@ module.exports.deleteAlertRule = async (ns, group, rule) => { `INSERT INTO ${DATABASE_NAME()}.settings (fingerprint, type, name, value, inserted_at) FORMAT JSONEachRow\n` + JSON.stringify({ fingerprint: fp, type: 'alert_rule', name: '', value: '', inserted_at: Date.now() }) ) - await axios.post(getClickhouseUrl() + '/?allow_nondeterministic_mutations=1&mutations_execute_nondeterministic_on_initiator=1', + const settings = clusterName ? '/?allow_nondeterministic_mutations=1&mutations_execute_nondeterministic_on_initiator=1' : '' + await axios.post(getClickhouseUrl() + settings, `ALTER TABLE ${DATABASE_NAME()}.settings ${onCluster} DELETE WHERE fingerprint=${fp} AND inserted_at <= now64(9, 'UTC')` ) } @@ -172,9 +173,9 @@ module.exports.deleteGroup = async (ns, group) => { `INSERT INTO ${DATABASE_NAME()}.settings${dist} (fingerprint, type, name, value, inserted_at) FORMAT JSONEachRow\n` + JSON.stringify({ fingerprint: fp, type: 'alert_group', name: '', value: '', inserted_at: Date.now() }) ) - await axios.post(getClickhouseUrl() + '/?allow_nondeterministic_mutations=1&mutations_execute_nondeterministic_on_initiator=1', - `ALTER TABLE ${DATABASE_NAME()}.settings ${onCluster} DELETE WHERE fingerprint=${fp} AND inserted_at <= now64(9, 'UTC') -SETTINGS allow_nondeterministic_mutations=1` + const settings = clusterName ? '/?allow_nondeterministic_mutations=1&mutations_execute_nondeterministic_on_initiator=1' : '' + await axios.post(getClickhouseUrl() + settings, + `ALTER TABLE ${DATABASE_NAME()}.settings ${onCluster} DELETE WHERE fingerprint=${fp} AND inserted_at <= now64(9, 'UTC')` ) } diff --git a/lib/db/watcher.js b/lib/db/watcher.js index 713602a5..9dd89400 100644 --- a/lib/db/watcher.js +++ b/lib/db/watcher.js @@ -70,13 +70,13 @@ class Watcher extends EventEmitter { } async initQueryCBPoll () { - this.from = (Date.now() - 300000) * 1000000 + this.from = (Date.now() - 300000) while (this.working) { - this.to = (Date.now() - 5000) * 1000000 + this.to = (Date.now() - 1000) this.query = transpiler.transpile({ ...this.request, - start: this.from, - end: this.to + start: this.from * 1000000, + end: this.to * 1000000 }) this.query.step = this.step const resp = await queryFingerprintsScan(this.query) @@ -88,8 +88,12 @@ class Watcher extends EventEmitter { resp.once('close', resolve) resp.once('error', reject) }) - this.from = this.to this.resp = JSON.parse(this.resp).data.result + for (const stream of this.resp) { + for (const v of stream.values) { + this.from = Math.max(this.from, (parseFloat(v[0]) + 1000000) / 1000000) + } + } this.flush() this.resp = '' await new Promise((resolve) => setTimeout(resolve, 1000)) diff --git a/parser/transpiler.js b/parser/transpiler.js index cf78914f..c6043217 100644 --- a/parser/transpiler.js +++ b/parser/transpiler.js @@ -293,7 +293,7 @@ module.exports.transpileTail = (request) => { } query = module.exports.transpileLogStreamSelector(expression.rootToken, query) setQueryParam(query, sharedParamNames.timeSeriesTable, `${DATABASE_NAME()}.time_series`) - setQueryParam(query, sharedParamNames.samplesTable, `${DATABASE_NAME()}.${samplesTableName}`) + setQueryParam(query, sharedParamNames.samplesTable, `${DATABASE_NAME()}.${samplesTableName}${dist}`) setQueryParam(query, sharedParamNames.from, new Sql.Raw('(toUnixTimestamp(now()) - 5) * 1000000000')) query.order_expressions = [] query.orderBy(['timestamp_ns', 'asc']) From 9eee2ceac63feb815bb71e0598457cbc108589bc Mon Sep 17 00:00:00 2001 From: akvlad Date: Mon, 2 Oct 2023 11:23:57 +0300 Subject: [PATCH 05/10] #fix: clickhouse version -> 23.8 LTS --- .github/workflows/node-clickhouse.js.yml | 2 +- qryn.js | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/node-clickhouse.js.yml b/.github/workflows/node-clickhouse.js.yml index 3e15f8da..286a778b 100644 --- a/.github/workflows/node-clickhouse.js.yml +++ b/.github/workflows/node-clickhouse.js.yml @@ -27,7 +27,7 @@ jobs: # See supported Node.js release schedule at https://nodejs.org/en/about/releases/ steps: - - uses: EpicStep/clickhouse-github-action@v1.0.0 + - run: docker run --name clickhouse -p 9000:9000 -p 8123:8123 -d clickhouse/clickhouse-server 23.8-alpine - uses: actions/checkout@v2 - name: Use Node.js ${{ matrix.node-version }} uses: actions/setup-node@v2 diff --git a/qryn.js b/qryn.js index f82f6e83..585c4a91 100755 --- a/qryn.js +++ b/qryn.js @@ -110,6 +110,7 @@ let fastify = require('fastify')({ }) done() })) + await fastify.register(require('@fastify/compress')) await fastify.register(require('fastify-url-data')) await fastify.register(require('@fastify/websocket')) From e8a9dd93d24e6bea7b3f6de91cf7acd26ba3bc5c Mon Sep 17 00:00:00 2001 From: akvlad Date: Mon, 2 Oct 2023 11:26:23 +0300 Subject: [PATCH 06/10] #fix: clickhouse version -> 23.8 LTS --- .github/workflows/node-clickhouse.js.yml | 2 +- qryn.js | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/node-clickhouse.js.yml b/.github/workflows/node-clickhouse.js.yml index 286a778b..f94cf033 100644 --- a/.github/workflows/node-clickhouse.js.yml +++ b/.github/workflows/node-clickhouse.js.yml @@ -27,7 +27,7 @@ jobs: # See supported Node.js release schedule at https://nodejs.org/en/about/releases/ steps: - - run: docker run --name clickhouse -p 9000:9000 -p 8123:8123 -d clickhouse/clickhouse-server 23.8-alpine + - run: docker run --name clickhouse -p 9000:9000 -p 8123:8123 -d clickhouse/clickhouse-server:23.8-alpine - uses: actions/checkout@v2 - name: Use Node.js ${{ matrix.node-version }} uses: actions/setup-node@v2 diff --git a/qryn.js b/qryn.js index 585c4a91..f82f6e83 100755 --- a/qryn.js +++ b/qryn.js @@ -110,7 +110,6 @@ let fastify = require('fastify')({ }) done() })) - await fastify.register(require('@fastify/compress')) await fastify.register(require('fastify-url-data')) await fastify.register(require('@fastify/websocket')) From ee7350b2b0993ba62b93ed21986a886ac14de146 Mon Sep 17 00:00:00 2001 From: akvlad Date: Mon, 2 Oct 2023 11:34:38 +0300 Subject: [PATCH 07/10] #fix: node version -> 16, 18 --- .github/workflows/node-clickhouse.js.yml | 2 +- qryn.js | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/node-clickhouse.js.yml b/.github/workflows/node-clickhouse.js.yml index f94cf033..55b7ac22 100644 --- a/.github/workflows/node-clickhouse.js.yml +++ b/.github/workflows/node-clickhouse.js.yml @@ -23,7 +23,7 @@ jobs: strategy: matrix: - node-version: [14.x, 16.x] + node-version: [18.x, 16.x] # See supported Node.js release schedule at https://nodejs.org/en/about/releases/ steps: diff --git a/qryn.js b/qryn.js index f82f6e83..585c4a91 100755 --- a/qryn.js +++ b/qryn.js @@ -110,6 +110,7 @@ let fastify = require('fastify')({ }) done() })) + await fastify.register(require('@fastify/compress')) await fastify.register(require('fastify-url-data')) await fastify.register(require('@fastify/websocket')) From 2e3953066faefa2314000702d54286862c3e792d Mon Sep 17 00:00:00 2001 From: akvlad Date: Mon, 2 Oct 2023 11:36:39 +0300 Subject: [PATCH 08/10] #fix: node version -> 16, 18 --- .github/workflows/node-clickhouse.js.yml | 2 +- qryn.js | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/node-clickhouse.js.yml b/.github/workflows/node-clickhouse.js.yml index 55b7ac22..7f66d885 100644 --- a/.github/workflows/node-clickhouse.js.yml +++ b/.github/workflows/node-clickhouse.js.yml @@ -23,7 +23,7 @@ jobs: strategy: matrix: - node-version: [18.x, 16.x] + node-version: [18, 16.x] # See supported Node.js release schedule at https://nodejs.org/en/about/releases/ steps: diff --git a/qryn.js b/qryn.js index 585c4a91..f82f6e83 100755 --- a/qryn.js +++ b/qryn.js @@ -110,7 +110,6 @@ let fastify = require('fastify')({ }) done() })) - await fastify.register(require('@fastify/compress')) await fastify.register(require('fastify-url-data')) await fastify.register(require('@fastify/websocket')) From fffeb4d78182a50c120412e72ab745ddd4d6eaa8 Mon Sep 17 00:00:00 2001 From: akvlad Date: Mon, 2 Oct 2023 11:41:14 +0300 Subject: [PATCH 09/10] #fix: tests logging --- .github/workflows/node-clickhouse.js.yml | 2 +- qryn.js | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/node-clickhouse.js.yml b/.github/workflows/node-clickhouse.js.yml index 7f66d885..c1e15ac3 100644 --- a/.github/workflows/node-clickhouse.js.yml +++ b/.github/workflows/node-clickhouse.js.yml @@ -42,4 +42,4 @@ jobs: CLICKHOUSE_DB: loki CLICKHOUSE_TSDB: loki INTEGRATION_E2E: 1 - run: node qryn.js & npm run test --forceExit + run: node qryn.js >/dev/stdout & npm run test --forceExit diff --git a/qryn.js b/qryn.js index f82f6e83..585c4a91 100755 --- a/qryn.js +++ b/qryn.js @@ -110,6 +110,7 @@ let fastify = require('fastify')({ }) done() })) + await fastify.register(require('@fastify/compress')) await fastify.register(require('fastify-url-data')) await fastify.register(require('@fastify/websocket')) From d689a12effb03674ef0ea74451c8f361493398ae Mon Sep 17 00:00:00 2001 From: akvlad Date: Mon, 2 Oct 2023 11:47:44 +0300 Subject: [PATCH 10/10] #fix: force ipv4 --- .github/workflows/node-clickhouse.js.yml | 1 + qryn.js | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/node-clickhouse.js.yml b/.github/workflows/node-clickhouse.js.yml index c1e15ac3..0fee32e5 100644 --- a/.github/workflows/node-clickhouse.js.yml +++ b/.github/workflows/node-clickhouse.js.yml @@ -42,4 +42,5 @@ jobs: CLICKHOUSE_DB: loki CLICKHOUSE_TSDB: loki INTEGRATION_E2E: 1 + CLOKI_EXT_URL: 127.0.0.1:3100 run: node qryn.js >/dev/stdout & npm run test --forceExit diff --git a/qryn.js b/qryn.js index 585c4a91..f82f6e83 100755 --- a/qryn.js +++ b/qryn.js @@ -110,7 +110,6 @@ let fastify = require('fastify')({ }) done() })) - await fastify.register(require('@fastify/compress')) await fastify.register(require('fastify-url-data')) await fastify.register(require('@fastify/websocket'))