Skip to content

Commit

Permalink
Merge pull request #398 from metrico/397_fix
Browse files Browse the repository at this point in the history
397 fix
  • Loading branch information
akvlad authored Dec 4, 2023
2 parents 40b6987 + 1e5825b commit 059110c
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 34 deletions.
4 changes: 2 additions & 2 deletions lib/db/clickhouse.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ if (isMainThread && !bun()) {
const recordCache = require('record-cache')
const { parseMs, DATABASE_NAME } = require('../utils')
let id = 0
function getThrottlerId() {
function getThrottlerId () {
id = (id + 1) % 1e6
return id
}
Expand Down Expand Up @@ -176,7 +176,7 @@ const initialize = async function (dbName) {
ch = new ClickHouse(tmp)
if (!isOmitTablesCreation()) {
const maintain = require('./maintain/index')
await maintain.upgrade(dbName)
await maintain.upgrade({ name: dbName, storage_policy: storagePolicy })
await maintain.rotate([{
db: dbName,
samples_days: rotationSamples,
Expand Down
50 changes: 26 additions & 24 deletions lib/db/maintain/index.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
const hb = require('handlebars')
const client = require('../clickhouse')
const logger = require('../../logger')
const {samplesOrderingRule, clusterName} = require('../../../common')
const scripts = require('./scripts')
const { samplesOrderingRule, clusterName } = require('../../../common')
const getEnv = () => {
return {
CLICKHOUSE_DB: 'cloki',
Expand All @@ -14,16 +13,20 @@ const getEnv = () => {

/**
*
* @param db {string}
* @param db {{name: string, storage_policy: string}}
* @returns {Promise<void>}
*/
module.exports.upgrade = async (db) => {
const scripts = require('./scripts')
await upgradeSingle(db, 1, scripts.overall)
await upgradeSingle(db, 2, scripts.traces)
await upgradeSingle(db.name, 1, scripts.overall, db.storage_policy)
await upgradeSingle(db.name, 2, scripts.traces, db.storage_policy)
if (db.storage_policy) {
await client.addSetting('rotate', 'v3_storage_policy', db.storage_policy, db.name)
await client.addSetting('rotate', 'v1_traces_storage_policy', db.storage_policy, db.name)
}
if (clusterName) {
await upgradeSingle(db, 3, scripts.overall_dist)
await upgradeSingle(db, 4, scripts.traces_dist)
await upgradeSingle(db.name, 3, scripts.overall_dist, db.storage_policy)
await upgradeSingle(db.name, 4, scripts.traces_dist, db.storage_policy)
}
}

Expand All @@ -33,22 +36,23 @@ let isDBCreated = false
* @param db {string}
* @param key {number}
* @param scripts {string[]}
* @param storagePolicy {string}
*/
const upgradeSingle = async (db, key, scripts) => {
const upgradeSingle = async (db, key, scripts, storagePolicy) => {
const _upgradeRequest = (request, useDefaultDB, updateVer) => {
return upgradeRequest(db, request, useDefaultDB, updateVer)
return upgradeRequest({ db, useDefaultDB, updateVer, storage_policy: storagePolicy }, request)
}
if (!isDBCreated) {
isDBCreated = true
await _upgradeRequest('CREATE DATABASE IF NOT EXISTS {{DB}} {{{OnCluster}}}')
if (clusterName) {
await _upgradeRequest('CREATE TABLE IF NOT EXISTS {{DB}}._ver {{{OnCluster}}} (k UInt64, ver UInt64) ' +
'ENGINE={{ReplacingMergeTree}}(ver) ORDER BY k', true)
'ENGINE={{ReplacingMergeTree}}(ver) ORDER BY k {{{CREATE_SETTINGS}}}', true)
await _upgradeRequest('CREATE TABLE IF NOT EXISTS {{DB}}.ver {{{OnCluster}}} (k UInt64, ver UInt64) ' +
'ENGINE=Distributed(\'{{CLUSTER}}\',\'{{DB}}\', \'_ver\', rand())', true)
} else {
await _upgradeRequest('CREATE TABLE IF NOT EXISTS {{DB}}.ver {{{OnCluster}}} (k UInt64, ver UInt64) ' +
'ENGINE={{ReplacingMergeTree}}(ver) ORDER BY k', true)
'ENGINE={{ReplacingMergeTree}}(ver) ORDER BY k {{{CREATE_SETTINGS}}}', true)
}
}
let ver = await client.rawRequest(`SELECT max(ver) as ver FROM ver WHERE k = ${key} FORMAT JSON`,
Expand All @@ -62,29 +66,27 @@ const upgradeSingle = async (db, key, scripts) => {
}

/**
* @param db {string} database to update
* @param request {string} request tpl
* @param useDefaultDB {boolean} use db as default db
* @param updateVer {{key: number, to: number}} update ver table
* @param opts {{db: string, useDefaultDB: boolean, updateVer: {key: number, to: number}, storage_policy: string}}
* @param request {string} database to update
* @returns {Promise<void>}
*/
const upgradeRequest = async (db, request, useDefaultDB, updateVer) => {
const upgradeRequest = async (opts, request) => {
const tpl = hb.compile(request)
request = tpl({
...getEnv(),
DB: db,
DB: opts.db,
CLUSTER: clusterName || '',
SAMPLES_ORDER_RUL: samplesOrderingRule(),
OnCluster: clusterName ? `ON CLUSTER \`${clusterName}\`` : '',
MergeTree: clusterName ? 'ReplicatedMergeTree' : 'MergeTree',
ReplacingMergeTree: clusterName ? 'ReplicatedReplacingMergeTree' : 'ReplacingMergeTree',
AggregatingMergeTree: clusterName ? 'ReplicatedAggregatingMergeTree' : 'AggregatingMergeTree'
AggregatingMergeTree: clusterName ? 'ReplicatedAggregatingMergeTree' : 'AggregatingMergeTree',
CREATE_SETTINGS: opts.storage_policy ? `SETTINGS storage_policy='${opts.storage_policy}'` : ''
})
console.log(request)
await client.rawRequest(request, null, useDefaultDB ? db : undefined)
if (updateVer) {
console.log(`INSERT INTO ver (k, ver) VALUES (${updateVer.key}, ${updateVer.to})`)
await client.rawRequest(`INSERT INTO ver (k, ver) VALUES (${updateVer.key}, ${updateVer.to})`, null, db)
await client.rawRequest(request, null, opts.useDefaultDB ? opts.db : undefined)
if (opts.updateVer) {
await client.rawRequest(`INSERT INTO ver (k, ver) VALUES (${opts.updateVer.key}, ${opts.updateVer.to})`,
null, opts.db)
}
}

Expand All @@ -102,7 +104,7 @@ module.exports.rotate = async (opts) => {
{ type: 'rotate', name: 'v1_traces_storage_policy' }
], db.db)
const _update = (req) => {
return upgradeRequest(db.db, req, true)
return upgradeRequest({ db: db.db, useDefaultDB: true }, req)
}
if (db.samples_days + '' !== settings.v3_samples_days) {
const alterTable = 'ALTER TABLE samples_v3 {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
Expand Down
16 changes: 8 additions & 8 deletions lib/db/maintain/scripts.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module.exports.overall = [
`CREATE TABLE IF NOT EXISTS {{DB}}.time_series {{{OnCluster}}} (date Date,fingerprint UInt64,labels String, name String)
ENGINE = {{ReplacingMergeTree}}(date) PARTITION BY date ORDER BY fingerprint`,
ENGINE = {{ReplacingMergeTree}}(date) PARTITION BY date ORDER BY fingerprint {{{CREATE_SETTINGS}}}`,

`CREATE TABLE IF NOT EXISTS {{DB}}.samples_v3 {{{OnCluster}}}
(
Expand All @@ -10,11 +10,11 @@ module.exports.overall = [
string String
) ENGINE = {{MergeTree}}
PARTITION BY toStartOfDay(toDateTime(timestamp_ns / 1000000000))
ORDER BY ({{SAMPLES_ORDER_RUL}})`,
ORDER BY ({{SAMPLES_ORDER_RUL}}) {{{CREATE_SETTINGS}}}`,

`CREATE TABLE IF NOT EXISTS {{DB}}.settings {{{OnCluster}}}
(fingerprint UInt64, type String, name String, value String, inserted_at DateTime64(9, 'UTC'))
ENGINE = {{ReplacingMergeTree}}(inserted_at) ORDER BY fingerprint`,
ENGINE = {{ReplacingMergeTree}}(inserted_at) ORDER BY fingerprint {{{CREATE_SETTINGS}}}`,

'DROP TABLE IF EXISTS {{DB}}.samples_read {{{OnCluster}}}',

Expand All @@ -34,7 +34,7 @@ module.exports.overall = [
key String,
val String,
fingerprint UInt64
) ENGINE = {{ReplacingMergeTree}}() PARTITION BY date ORDER BY (key, val, fingerprint)`,
) ENGINE = {{ReplacingMergeTree}}() PARTITION BY date ORDER BY (key, val, fingerprint) {{{CREATE_SETTINGS}}}`,

`CREATE MATERIALIZED VIEW IF NOT EXISTS {{DB}}.time_series_gin_view {{{OnCluster}}} TO time_series_gin
AS SELECT date, pairs.1 as key, pairs.2 as val, fingerprint
Expand All @@ -54,7 +54,7 @@ module.exports.overall = [
bytes SimpleAggregateFunction(sum, Float64)
) ENGINE = {{AggregatingMergeTree}}
PARTITION BY toDate(toDateTime(intDiv(timestamp_ns, 1000000000)))
ORDER BY (fingerprint, timestamp_ns);`,
ORDER BY (fingerprint, timestamp_ns) {{{CREATE_SETTINGS}}};`,

`CREATE MATERIALIZED VIEW IF NOT EXISTS {{DB}}.metrics_15s_mv {{{OnCluster}}} TO metrics_15s AS
SELECT fingerprint,
Expand Down Expand Up @@ -85,7 +85,7 @@ module.exports.traces = [
payload_type Int8,
payload String
) Engine = {{MergeTree}}() ORDER BY (oid, trace_id, timestamp_ns)
PARTITION BY (oid, toDate(FROM_UNIXTIME(intDiv(timestamp_ns, 1000000000))));`,
PARTITION BY (oid, toDate(FROM_UNIXTIME(intDiv(timestamp_ns, 1000000000)))) {{{CREATE_SETTINGS}}};`,

`CREATE TABLE IF NOT EXISTS {{DB}}.tempo_traces_attrs_gin {{{OnCluster}}} (
oid String,
Expand All @@ -98,7 +98,7 @@ module.exports.traces = [
duration Int64
) Engine = {{ReplacingMergeTree}}()
PARTITION BY date
ORDER BY (oid, date, key, val, timestamp_ns, trace_id, span_id);`,
ORDER BY (oid, date, key, val, timestamp_ns, trace_id, span_id) {{{CREATE_SETTINGS}}};`,

`CREATE TABLE IF NOT EXISTS {{DB}}.tempo_traces_kv {{{OnCluster}}} (
oid String,
Expand All @@ -108,7 +108,7 @@ module.exports.traces = [
val String
) Engine = {{ReplacingMergeTree}}()
PARTITION BY (oid, date)
ORDER BY (oid, date, key, val_id)`,
ORDER BY (oid, date, key, val_id) {{{CREATE_SETTINGS}}}`,

`CREATE MATERIALIZED VIEW IF NOT EXISTS {{DB}}.tempo_traces_kv_mv {{{OnCluster}}} TO tempo_traces_kv AS
SELECT oid, date, key, cityHash64(val) % 10000 as val_id, val FROM tempo_traces_attrs_gin`,
Expand Down

0 comments on commit 059110c

Please sign in to comment.