Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: dist support skip_unavailable_shards and custom settings #579

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion lib/db/clickhouse.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ const { getClickhouseUrl } = require('./clickhouse_options')

// External Storage Policy for Tables (S3, MINIO)
const storagePolicy = process.env.STORAGE_POLICY || false
// Clickhouse Distributed Engine setting to skip unavailable shards
const skipUnavailableShards = process.env.SKIP_UNAVAILABLE_SHARDS || false

const { StringStream, DataStream } = require('scramjet')

Expand Down Expand Up @@ -188,7 +190,7 @@ const initialize = async function (dbName) {
}
if (!isOmitTablesCreation()) {
const maintain = require('./maintain/index')
await maintain.upgrade({ name: dbName, storage_policy: storagePolicy })
await maintain.upgrade({ name: dbName, storage_policy: storagePolicy, skip_unavailable_shards: skipUnavailableShards })
await maintain.rotate([{
db: dbName,
samples_days: rotationSamples,
Expand Down
24 changes: 13 additions & 11 deletions lib/db/maintain/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,21 @@ const getEnv = () => {

/**
 * Runs all schema upgrade scripts for the given database: local tables first,
 * then the Distributed-engine tables when a cluster name is configured.
 *
 * @param db {{name: string, storage_policy: string, skip_unavailable_shards: boolean}}
 *   name - target ClickHouse database name
 *   storage_policy - optional storage policy to apply via table SETTINGS
 *   skip_unavailable_shards - when truthy, Distributed tables are created with
 *     skip_unavailable_shards=1 so queries tolerate missing shards
 * @returns {Promise<void>}
 */
module.exports.upgrade = async (db) => {
  // Local (non-Distributed) tables never take skip_unavailable_shards,
  // so the flag is explicitly false here.
  await upgradeSingle(db.name, 1, scripts.overall, db.storage_policy, false)
  await upgradeSingle(db.name, 2, scripts.traces, db.storage_policy, false)
  await upgradeSingle(db.name, 5, scripts.profiles, db.storage_policy, false)
  if (db.storage_policy) {
    // Persist the policy so later rotations reuse it.
    await client.addSetting('rotate', 'v3_storage_policy', db.storage_policy, db.name)
    await client.addSetting('rotate', 'v1_traces_storage_policy', db.storage_policy, db.name)
  }
  if (clusterName) {
    // Distributed tables are only created in cluster mode; these honor the
    // caller-supplied skip_unavailable_shards option.
    await upgradeSingle(db.name, 3, scripts.overall_dist, db.storage_policy, db.skip_unavailable_shards)
    await upgradeSingle(db.name, 4, scripts.traces_dist, db.storage_policy, db.skip_unavailable_shards)
    await upgradeSingle(db.name, 6, scripts.profiles_dist, db.storage_policy, db.skip_unavailable_shards)
  }
}

Expand All @@ -39,10 +39,11 @@ let isDBCreated = false
* @param key {number}
* @param scripts {string[]}
* @param storagePolicy {string}
* @param skipUnavailableShards {boolean}
*/
const upgradeSingle = async (db, key, scripts, storagePolicy) => {
const upgradeSingle = async (db, key, scripts, storagePolicy, skipUnavailableShards) => {
const _upgradeRequest = (request, useDefaultDB, updateVer) => {
return upgradeRequest({ db, useDefaultDB, updateVer, storage_policy: storagePolicy }, request)
return upgradeRequest({ db, useDefaultDB, updateVer, storage_policy: storagePolicy, skip_unavailable_shards: skipUnavailableShards }, request)
}
if (!isDBCreated) {
isDBCreated = true
Expand All @@ -68,7 +69,7 @@ const upgradeSingle = async (db, key, scripts, storagePolicy) => {
}

/**
* @param opts {{db: string, useDefaultDB: boolean, updateVer: {key: number, to: number}, storage_policy: string}}
* @param opts {{db: string, useDefaultDB: boolean, updateVer: {key: number, to: number}, storage_policy: string, skip_unavailable_shards: boolean}}
* @param request {string} database to update
* @returns {Promise<void>}
*/
Expand All @@ -83,7 +84,8 @@ const upgradeRequest = async (opts, request) => {
MergeTree: clusterName ? 'ReplicatedMergeTree' : 'MergeTree',
ReplacingMergeTree: clusterName ? 'ReplicatedReplacingMergeTree' : 'ReplacingMergeTree',
AggregatingMergeTree: clusterName ? 'ReplicatedAggregatingMergeTree' : 'AggregatingMergeTree',
CREATE_SETTINGS: opts.storage_policy ? `SETTINGS storage_policy='${opts.storage_policy}'` : ''
CREATE_SETTINGS: opts.storage_policy ? `SETTINGS storage_policy='${opts.storage_policy}'` : '',
DIST_CREATE_SETTINGS: opts.skip_unavailable_shards ? `SETTINGS skip_unavailable_shards=1` : ''
})
await client.rawRequest(request, null, opts.useDefaultDB ? opts.db : undefined)
if (opts.updateVer) {
Expand Down
22 changes: 11 additions & 11 deletions lib/db/maintain/scripts.js
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ module.exports.overall_dist = [
\`count\` AggregateFunction(count),
\`sum\` SimpleAggregateFunction(sum, Float64),
\`bytes\` SimpleAggregateFunction(sum, Float64)
) ENGINE = Distributed('{{CLUSTER}}', '{{DB}}', 'metrics_15s', fingerprint)`,
) ENGINE = Distributed('{{CLUSTER}}', '{{DB}}', 'metrics_15s', fingerprint) {{{DIST_CREATE_SETTINGS}}};`,

`CREATE TABLE IF NOT EXISTS {{DB}}.samples_v3_dist {{{OnCluster}}} (
\`fingerprint\` UInt64,
Expand All @@ -221,22 +221,22 @@ module.exports.overall_dist = [
\`fingerprint\` UInt64,
\`labels\` String,
\`name\` String
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'time_series', fingerprint);`,
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'time_series', fingerprint) {{{DIST_CREATE_SETTINGS}}};`,

`CREATE TABLE IF NOT EXISTS {{DB}}.settings_dist {{{OnCluster}}} (
\`fingerprint\` UInt64,
\`type\` String,
\`name\` String,
\`value\` String,
\`inserted_at\` DateTime64(9, 'UTC')
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'settings', rand());`,
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'settings', rand()) {{{DIST_CREATE_SETTINGS}}};`,

`CREATE TABLE IF NOT EXISTS {{DB}}.time_series_gin_dist {{{OnCluster}}} (
date Date,
key String,
val String,
fingerprint UInt64
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'time_series_gin', rand());`,
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'time_series_gin', rand()) {{{DIST_CREATE_SETTINGS}}};`,

'ALTER TABLE {{DB}}.metrics_15s_dist {{{OnCluster}}} ADD COLUMN IF NOT EXISTS `type` UInt8;',

Expand All @@ -254,7 +254,7 @@ module.exports.traces_dist = [
key String,
val_id String,
val String
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'tempo_traces_kv', sipHash64(oid, key));`,
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'tempo_traces_kv', sipHash64(oid, key)) {{{DIST_CREATE_SETTINGS}}};`,

`CREATE TABLE IF NOT EXISTS {{DB}}.tempo_traces_dist {{{OnCluster}}} (
oid String,
Expand All @@ -267,7 +267,7 @@ module.exports.traces_dist = [
service_name String,
payload_type Int8,
payload String,
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'tempo_traces', sipHash64(oid, trace_id));`,
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'tempo_traces', sipHash64(oid, trace_id)) {{{DIST_CREATE_SETTINGS}}};`,

`CREATE TABLE IF NOT EXISTS {{DB}}.tempo_traces_attrs_gin_dist {{{OnCluster}}} (
oid String,
Expand All @@ -278,7 +278,7 @@ module.exports.traces_dist = [
span_id FixedString(8),
timestamp_ns Int64,
duration Int64
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'tempo_traces_attrs_gin', sipHash64(oid, trace_id));`
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'tempo_traces_attrs_gin', sipHash64(oid, trace_id)) {{{DIST_CREATE_SETTINGS}}};`
]

module.exports.profiles = [
Expand Down Expand Up @@ -440,15 +440,15 @@ module.exports.profiles_dist = [
payload_type LowCardinality(String),
payload String,
values_agg Array(Tuple(String, Int64, Int32))
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles', fingerprint);`,
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles', fingerprint) {{{DIST_CREATE_SETTINGS}}};`,

`CREATE TABLE IF NOT EXISTS {{DB}}.profiles_series_dist {{{OnCluster}}} (
date Date,
type_id LowCardinality(String),
service_name LowCardinality(String),
fingerprint UInt64 CODEC(DoubleDelta, ZSTD(1)),
tags Array(Tuple(String, String)) CODEC(ZSTD(1))
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles_series',fingerprint);`,
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles_series',fingerprint) {{{DIST_CREATE_SETTINGS}}};`,

`CREATE TABLE IF NOT EXISTS {{DB}}.profiles_series_gin_dist {{{OnCluster}}} (
date Date,
Expand All @@ -457,14 +457,14 @@ module.exports.profiles_dist = [
type_id LowCardinality(String),
service_name LowCardinality(String),
fingerprint UInt64 CODEC(DoubleDelta, ZSTD(1))
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles_series_gin',fingerprint);`,
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles_series_gin',fingerprint) {{{DIST_CREATE_SETTINGS}}};`,

`CREATE TABLE IF NOT EXISTS {{DB}}.profiles_series_keys_dist {{{OnCluster}}} (
date Date,
key String,
val String,
val_id UInt64
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles_series_keys', rand());`,
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles_series_keys', rand()) {{{DIST_CREATE_SETTINGS}}};`,

`ALTER TABLE {{DB}}.profiles_dist {{{OnCluster}}}
ADD COLUMN IF NOT EXISTS \`tree\` Array(Tuple(UInt64, UInt64, UInt64, Array(Tuple(String, Int64, Int64)))),
Expand Down
Loading