diff --git a/lib/db/clickhouse.js b/lib/db/clickhouse.js
index a704fe5f..1ade9f05 100644
--- a/lib/db/clickhouse.js
+++ b/lib/db/clickhouse.js
@@ -112,7 +112,7 @@ if (isMainThread && !bun()) {
 const recordCache = require('record-cache')
 const { parseMs, DATABASE_NAME } = require('../utils')
 let id = 0
-function getThrottlerId() {
+function getThrottlerId () {
   id = (id + 1) % 1e6
   return id
 }
@@ -176,7 +176,7 @@ const initialize = async function (dbName) {
   ch = new ClickHouse(tmp)
   if (!isOmitTablesCreation()) {
     const maintain = require('./maintain/index')
-    await maintain.upgrade(dbName)
+    await maintain.upgrade({ name: dbName, storage_policy: storagePolicy })
     await maintain.rotate([{
       db: dbName,
       samples_days: rotationSamples,
diff --git a/lib/db/maintain/index.js b/lib/db/maintain/index.js
index f1689154..02cc358b 100644
--- a/lib/db/maintain/index.js
+++ b/lib/db/maintain/index.js
@@ -1,8 +1,7 @@
 const hb = require('handlebars')
 const client = require('../clickhouse')
 const logger = require('../../logger')
-const {samplesOrderingRule, clusterName} = require('../../../common')
-const scripts = require('./scripts')
+const { samplesOrderingRule, clusterName } = require('../../../common')
 const getEnv = () => {
   return {
     CLICKHOUSE_DB: 'cloki',
@@ -14,16 +13,20 @@ const getEnv = () => {
 
 /**
  *
- * @param db {string}
+ * @param db {{name: string, storage_policy: string}}
  * @returns {Promise}
  */
 module.exports.upgrade = async (db) => {
   const scripts = require('./scripts')
-  await upgradeSingle(db, 1, scripts.overall)
-  await upgradeSingle(db, 2, scripts.traces)
+  await upgradeSingle(db.name, 1, scripts.overall, db.storage_policy)
+  await upgradeSingle(db.name, 2, scripts.traces, db.storage_policy)
+  if (db.storage_policy) {
+    await client.addSetting('rotate', 'v3_storage_policy', db.storage_policy, db.name)
+    await client.addSetting('rotate', 'v1_traces_storage_policy', db.storage_policy, db.name)
+  }
   if (clusterName) {
-    await upgradeSingle(db, 3, scripts.overall_dist)
-    await upgradeSingle(db, 4, scripts.traces_dist)
+    await upgradeSingle(db.name, 3, scripts.overall_dist, db.storage_policy)
+    await upgradeSingle(db.name, 4, scripts.traces_dist, db.storage_policy)
   }
 }
@@ -33,22 +36,23 @@ let isDBCreated = false
  * @param db {string}
  * @param key {number}
  * @param scripts {string[]}
+ * @param storagePolicy {string}
  */
-const upgradeSingle = async (db, key, scripts) => {
+const upgradeSingle = async (db, key, scripts, storagePolicy) => {
   const _upgradeRequest = (request, useDefaultDB, updateVer) => {
-    return upgradeRequest(db, request, useDefaultDB, updateVer)
+    return upgradeRequest({ db, useDefaultDB, updateVer, storage_policy: storagePolicy }, request)
   }
   if (!isDBCreated) {
     isDBCreated = true
     await _upgradeRequest('CREATE DATABASE IF NOT EXISTS {{DB}} {{{OnCluster}}}')
     if (clusterName) {
       await _upgradeRequest('CREATE TABLE IF NOT EXISTS {{DB}}._ver {{{OnCluster}}} (k UInt64, ver UInt64) ' +
-        'ENGINE={{ReplacingMergeTree}}(ver) ORDER BY k', true)
+        'ENGINE={{ReplacingMergeTree}}(ver) ORDER BY k {{{CREATE_SETTINGS}}}', true)
       await _upgradeRequest('CREATE TABLE IF NOT EXISTS {{DB}}.ver {{{OnCluster}}} (k UInt64, ver UInt64) ' +
         'ENGINE=Distributed(\'{{CLUSTER}}\',\'{{DB}}\', \'_ver\', rand())', true)
     } else {
       await _upgradeRequest('CREATE TABLE IF NOT EXISTS {{DB}}.ver {{{OnCluster}}} (k UInt64, ver UInt64) ' +
-        'ENGINE={{ReplacingMergeTree}}(ver) ORDER BY k', true)
+        'ENGINE={{ReplacingMergeTree}}(ver) ORDER BY k {{{CREATE_SETTINGS}}}', true)
     }
   }
   let ver = await client.rawRequest(`SELECT max(ver) as ver FROM ver WHERE k = ${key} FORMAT JSON`,
@@ -62,29 +66,27 @@ const upgradeSingle = async (db, key, scripts) => {
 }
 
 /**
- * @param db {string} database to update
- * @param request {string} request tpl
- * @param useDefaultDB {boolean} use db as default db
- * @param updateVer {{key: number, to: number}} update ver table
+ * @param opts {{db: string, useDefaultDB: boolean, updateVer: {key: number, to: number}, storage_policy: string}}
+ * @param request {string} database to update
  * @returns {Promise}
  */
-const upgradeRequest = async (db, request, useDefaultDB, updateVer) => {
+const upgradeRequest = async (opts, request) => {
   const tpl = hb.compile(request)
   request = tpl({
     ...getEnv(),
-    DB: db,
+    DB: opts.db,
     CLUSTER: clusterName || '',
     SAMPLES_ORDER_RUL: samplesOrderingRule(),
     OnCluster: clusterName ? `ON CLUSTER \`${clusterName}\`` : '',
     MergeTree: clusterName ? 'ReplicatedMergeTree' : 'MergeTree',
     ReplacingMergeTree: clusterName ? 'ReplicatedReplacingMergeTree' : 'ReplacingMergeTree',
-    AggregatingMergeTree: clusterName ? 'ReplicatedAggregatingMergeTree' : 'AggregatingMergeTree'
+    AggregatingMergeTree: clusterName ? 'ReplicatedAggregatingMergeTree' : 'AggregatingMergeTree',
+    CREATE_SETTINGS: opts.storage_policy ? `SETTINGS storage_policy='${opts.storage_policy}'` : ''
   })
-  console.log(request)
-  await client.rawRequest(request, null, useDefaultDB ? db : undefined)
-  if (updateVer) {
-    console.log(`INSERT INTO ver (k, ver) VALUES (${updateVer.key}, ${updateVer.to})`)
-    await client.rawRequest(`INSERT INTO ver (k, ver) VALUES (${updateVer.key}, ${updateVer.to})`, null, db)
+  await client.rawRequest(request, null, opts.useDefaultDB ? opts.db : undefined)
+  if (opts.updateVer) {
+    await client.rawRequest(`INSERT INTO ver (k, ver) VALUES (${opts.updateVer.key}, ${opts.updateVer.to})`,
+      null, opts.db)
   }
 }
@@ -102,7 +104,7 @@ module.exports.rotate = async (opts) => {
     { type: 'rotate', name: 'v1_traces_storage_policy' }
   ], db.db)
   const _update = (req) => {
-    return upgradeRequest(db.db, req, true)
+    return upgradeRequest({ db: db.db, useDefaultDB: true }, req)
   }
   if (db.samples_days + '' !== settings.v3_samples_days) {
     const alterTable = 'ALTER TABLE samples_v3 {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
diff --git a/lib/db/maintain/scripts.js b/lib/db/maintain/scripts.js
index 6376e36b..f70d71d8 100644
--- a/lib/db/maintain/scripts.js
+++ b/lib/db/maintain/scripts.js
@@ -1,6 +1,6 @@
 module.exports.overall = [
   `CREATE TABLE IF NOT EXISTS {{DB}}.time_series {{{OnCluster}}} (date Date,fingerprint UInt64,labels String, name String)
-      ENGINE = {{ReplacingMergeTree}}(date) PARTITION BY date ORDER BY fingerprint`,
+      ENGINE = {{ReplacingMergeTree}}(date) PARTITION BY date ORDER BY fingerprint {{{CREATE_SETTINGS}}}`,
 
   `CREATE TABLE IF NOT EXISTS {{DB}}.samples_v3 {{{OnCluster}}}
     (
@@ -10,11 +10,11 @@ module.exports.overall = [
       string String
     ) ENGINE = {{MergeTree}}
     PARTITION BY toStartOfDay(toDateTime(timestamp_ns / 1000000000))
-    ORDER BY ({{SAMPLES_ORDER_RUL}})`,
+    ORDER BY ({{SAMPLES_ORDER_RUL}}) {{{CREATE_SETTINGS}}}`,
 
   `CREATE TABLE IF NOT EXISTS {{DB}}.settings {{{OnCluster}}}
     (fingerprint UInt64, type String, name String, value String, inserted_at DateTime64(9, 'UTC'))
-    ENGINE = {{ReplacingMergeTree}}(inserted_at) ORDER BY fingerprint`,
+    ENGINE = {{ReplacingMergeTree}}(inserted_at) ORDER BY fingerprint {{{CREATE_SETTINGS}}}`,
 
   'DROP TABLE IF EXISTS {{DB}}.samples_read {{{OnCluster}}}',
@@ -34,7 +34,7 @@ module.exports.overall = [
       key String,
       val String,
       fingerprint UInt64
-    ) ENGINE = {{ReplacingMergeTree}}() PARTITION BY date ORDER BY (key, val, fingerprint)`,
+    ) ENGINE = {{ReplacingMergeTree}}() PARTITION BY date ORDER BY (key, val, fingerprint) {{{CREATE_SETTINGS}}}`,
 
   `CREATE MATERIALIZED VIEW IF NOT EXISTS {{DB}}.time_series_gin_view {{{OnCluster}}} TO time_series_gin
     AS SELECT date, pairs.1 as key, pairs.2 as val, fingerprint
@@ -54,7 +54,7 @@ module.exports.overall = [
       bytes SimpleAggregateFunction(sum, Float64)
 ) ENGINE = {{AggregatingMergeTree}}
 PARTITION BY toDate(toDateTime(intDiv(timestamp_ns, 1000000000)))
-ORDER BY (fingerprint, timestamp_ns);`,
+ORDER BY (fingerprint, timestamp_ns) {{{CREATE_SETTINGS}}};`,
 
   `CREATE MATERIALIZED VIEW IF NOT EXISTS {{DB}}.metrics_15s_mv {{{OnCluster}}} TO metrics_15s
 AS SELECT fingerprint,
@@ -85,7 +85,7 @@ module.exports.traces = [
       payload_type Int8,
       payload String
     ) Engine = {{MergeTree}}() ORDER BY (oid, trace_id, timestamp_ns)
-      PARTITION BY (oid, toDate(FROM_UNIXTIME(intDiv(timestamp_ns, 1000000000))));`,
+      PARTITION BY (oid, toDate(FROM_UNIXTIME(intDiv(timestamp_ns, 1000000000)))) {{{CREATE_SETTINGS}}};`,
 
   `CREATE TABLE IF NOT EXISTS {{DB}}.tempo_traces_attrs_gin {{{OnCluster}}} (
     oid String,
@@ -98,7 +98,7 @@ module.exports.traces = [
     duration Int64
   ) Engine = {{ReplacingMergeTree}}()
   PARTITION BY date
-  ORDER BY (oid, date, key, val, timestamp_ns, trace_id, span_id);`,
+  ORDER BY (oid, date, key, val, timestamp_ns, trace_id, span_id) {{{CREATE_SETTINGS}}};`,
 
   `CREATE TABLE IF NOT EXISTS {{DB}}.tempo_traces_kv {{{OnCluster}}} (
     oid String,
@@ -108,7 +108,7 @@ module.exports.traces = [
     val String
   ) Engine = {{ReplacingMergeTree}}()
   PARTITION BY (oid, date)
-  ORDER BY (oid, date, key, val_id)`,
+  ORDER BY (oid, date, key, val_id) {{{CREATE_SETTINGS}}}`,
 
   `CREATE MATERIALIZED VIEW IF NOT EXISTS {{DB}}.tempo_traces_kv_mv {{{OnCluster}}} TO tempo_traces_kv AS
     SELECT oid, date, key, cityHash64(val) % 10000 as val_id, val FROM tempo_traces_attrs_gin`,
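
Note (not part of the patch): a minimal sketch of how the new {{{CREATE_SETTINGS}}} placeholder is expected to expand through handlebars, mirroring the conditional in upgradeRequest() above. It assumes only the handlebars package; the template is copied from the settings table DDL in scripts.js, and the policy name 'hot_cold' is a hypothetical example, not a value from this patch.

// sketch.js — run with: node sketch.js (requires `npm install handlebars`)
const hb = require('handlebars')

const renderCreate = (storagePolicy) => {
  // Same table template as {{DB}}.settings in scripts.js, single-node variant
  const tpl = hb.compile(
    'CREATE TABLE IF NOT EXISTS {{DB}}.settings {{{OnCluster}}} ' +
    '(fingerprint UInt64, type String, name String, value String, inserted_at DateTime64(9, \'UTC\')) ' +
    'ENGINE = {{ReplacingMergeTree}}(inserted_at) ORDER BY fingerprint {{{CREATE_SETTINGS}}}')
  return tpl({
    DB: 'cloki',
    OnCluster: '',                          // empty when no cluster is configured
    ReplacingMergeTree: 'ReplacingMergeTree',
    // Same conditional as upgradeRequest(): emit SETTINGS only when a policy is set
    CREATE_SETTINGS: storagePolicy ? `SETTINGS storage_policy='${storagePolicy}'` : ''
  })
}

console.log(renderCreate(''))               // ... ORDER BY fingerprint
console.log(renderCreate('hot_cold'))       // ... ORDER BY fingerprint SETTINGS storage_policy='hot_cold'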