From 70dd00da43695e589755f1f9deb65b8a9ff6c18b Mon Sep 17 00:00:00 2001 From: akvlad Date: Tue, 17 Oct 2023 13:31:47 +0300 Subject: [PATCH] #fix: distributed table usage --- .../smart_optimizations/optimization_v3_2.js | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/parser/registry/smart_optimizations/optimization_v3_2.js b/parser/registry/smart_optimizations/optimization_v3_2.js index e44670e2..ede56134 100644 --- a/parser/registry/smart_optimizations/optimization_v3_2.js +++ b/parser/registry/smart_optimizations/optimization_v3_2.js @@ -1,9 +1,12 @@ -const { getDuration, dist, Aliased } = require('../common') +const { getDuration, Aliased } = require('../common') const reg = require('./log_range_agg_reg_v3_2') const Sql = require('@cloki/clickhouse-sql') const { DATABASE_NAME, checkVersion } = require('../../../lib/utils') const streamSelectorReg = require('../stream_selector_operator_registry') const aggOpReg = require('../high_level_aggregation_registry') +const { clusterName } = require('../../../common') +const logger = require('../../../lib/logger') +const _dist = clusterName ? '_dist' : '' /** * @@ -46,14 +49,15 @@ module.exports.apply = (token, fromNS, toNS, stepNS) => { : Sql.Gt('samples.timestamp_ns', fromNS) let q = (new Sql.Select()) .select(['samples.fingerprint', 'fingerprint']) - .from([`${DATABASE_NAME()}.metrics_15s`, 'samples']) + .from([`${DATABASE_NAME()}.metrics_15s${_dist}`, 'samples']) .where(tsClause) - q.join(new Aliased(`${DATABASE_NAME()}.time_series${dist}`, 'time_series'), 'left any', + q.join(new Aliased(`${DATABASE_NAME()}.time_series`, 'time_series'), 'left any', Sql.Eq('samples.fingerprint', new Sql.Raw('time_series.fingerprint'))) q.select([new Sql.Raw('any(JSONExtractKeysAndValues(time_series.labels, \'String\'))'), 'labels']) q.ctx = { - step: stepNS / 1000000000 + step: stepNS / 1000000000, + inline: !!clusterName } for (const streamSelectorRule of token.Children('log_stream_selector_rule')) { @@ -68,5 +72,7 @@ module.exports.apply = (token, fromNS, toNS, stepNS) => { q = aggOpReg[aggOp.Child('aggregation_operator_fn').value](aggOp, q) } + logger.debug(q.toString()) + return q }