diff --git a/traceql/clickhouse_transpiler/attr_condition.js b/traceql/clickhouse_transpiler/attr_condition.js index 33e45535..131cacd6 100644 --- a/traceql/clickhouse_transpiler/attr_condition.js +++ b/traceql/clickhouse_transpiler/attr_condition.js @@ -1,4 +1,4 @@ -const { getCompareFn, durationToNs, unquote } = require('./shared') +const { getCompareFn, durationToNs, unquote, bitSet } = require('./shared') const Sql = require('@cloki/clickhouse-sql') module.exports = class Builder { constructor () { @@ -71,6 +71,18 @@ module.exports = class Builder { const having = self.getCond(self.conds) self.aggregator(sel) sel.conditions = Sql.And(sel.conditions, Sql.Or(...self.where)) + if (Array.isArray(ctx.randomFilter) && Array.isArray(ctx.cachedTraceIds)) { + sel.conditions = Sql.And( + sel.conditions, + Sql.Or( + Sql.Eq(new Sql.Raw(`cityHash64(trace_id) % ${ctx.randomFilter[0]}`), Sql.val(ctx.randomFilter[1])), + new Sql.In('trace_id', 'in', ctx.cachedTraceIds.map(traceId => new Sql.Raw(`unhex('${traceId}')`))) + )) + } else if (Array.isArray(ctx.randomFilter)) { + sel.conditions = Sql.And( + sel.conditions, + Sql.Eq(new Sql.Raw(`cityHash64(trace_id) % ${ctx.randomFilter[0]}`), Sql.val(ctx.randomFilter[1]))) + } sel.having(having) return sel } @@ -248,20 +260,6 @@ function groupBitOr (left, alias) { return res } -/** - * - * @param terms - * @returns {SQLObject} - */ -function bitSet (terms) { - const res = new Sql.Raw('') - res.terms = terms - res.toString = () => { - return res.terms.map((t, i) => `bitShiftLeft(toUInt64(${t.toString()}), ${i})`).join('+') - } - return res -} - /** * * @param attr {string} diff --git a/traceql/clickhouse_transpiler/attr_condition_eval.js b/traceql/clickhouse_transpiler/attr_condition_eval.js new file mode 100644 index 00000000..43cf2a02 --- /dev/null +++ b/traceql/clickhouse_transpiler/attr_condition_eval.js @@ -0,0 +1,19 @@ +const attrCondition = require('./attr_condition') +const {bitSet} = require('./shared') +const Sql = require('@cloki/clickhouse-sql') +module.exports = class Builder extends attrCondition { + build () { + const self = this + const superBuild = super.build() + /** @type {BuiltProcessFn} */ + const res = (ctx) => { + const sel = superBuild(ctx) + sel.having_conditions = [] + sel.aggregations = [bitSet(self.sqlConditions)] + sel.select_list = [[new Sql.Raw('count()'), 'count']] + sel.order_expressions = [] + return sel + } + return res + } +} diff --git a/traceql/clickhouse_transpiler/index.js b/traceql/clickhouse_transpiler/index.js index e43373cb..f34c009f 100644 --- a/traceql/clickhouse_transpiler/index.js +++ b/traceql/clickhouse_transpiler/index.js @@ -1,4 +1,5 @@ const AttrConditionPlanner = require('./attr_condition') +const AttrConditionEvalPlanner = require('./attr_condition_eval') const InitIndexPlanner = require('./init') const IndexGroupByPlanner = require('./group_by') const AggregatorPlanner = require('./aggregator') @@ -8,10 +9,17 @@ const TracesDataPlanner = require('./traces_data') /** * @param script {Token} */ -module.exports = (script) => { +module.exports.transpile = (script) => { return new Planner(script).plan() } +/** + * @param script {Token} + */ +module.exports.evaluateCmpl = (script) => { + return new Planner(script).planEval() +} + class Planner { /** * @@ -53,6 +61,19 @@ class Planner { return res } + planEval () { + this.check() + this.analyze() + const res = (new AttrConditionEvalPlanner()) + .withTerms(this.termIdx) + .withConditions(this.cond) + .withAggregatedAttr(this.aggregatedAttr) + .withMain((new InitIndexPlanner()).build()) + .build() + + return res + } + check () { if (this.script.Children('SYNTAX').length > 1) { throw new Error('more than one selector is not supported') diff --git a/traceql/clickhouse_transpiler/init.js b/traceql/clickhouse_transpiler/init.js index ff63f4fe..c8a6a902 100644 --- a/traceql/clickhouse_transpiler/init.js +++ b/traceql/clickhouse_transpiler/init.js @@ -9,7 +9,9 @@ const { standardBuilder } = require('./shared') * limit: number, * isCluster: boolean, * tracesTable: string, - * tracesDistTable: string + * tracesDistTable: string, + * randomFilter: number[]|undefined, + * cachedTraceIds: string[]|undefined, * }} Context */ /** diff --git a/traceql/clickhouse_transpiler/shared.js b/traceql/clickhouse_transpiler/shared.js index dacbac44..8869d9d6 100644 --- a/traceql/clickhouse_transpiler/shared.js +++ b/traceql/clickhouse_transpiler/shared.js @@ -82,3 +82,17 @@ module.exports.standardBuilder = (fn) => { } } } + +/** + * + * @param terms {SQLObject[]} + * @returns {SQLObject} + */ +module.exports.bitSet = (terms) => { + const res = new Sql.Raw('') + res.terms = terms + res.toString = () => { + return res.terms.map((t, i) => `bitShiftLeft(toUInt64(${t.toString()}), ${i})`).join('+') + } + return res +} diff --git a/traceql/index.js b/traceql/index.js index c987dce5..3e8293e6 100644 --- a/traceql/index.js +++ b/traceql/index.js @@ -1,5 +1,5 @@ const parser = require('./parser') -const transpiler = require('./clickhouse_transpiler') +const { transpile, evaluateCmpl } = require('./clickhouse_transpiler') const logger = require('../lib/logger') const { DATABASE_NAME } = require('../lib/utils') const { clusterName } = require('../common') @@ -23,10 +23,87 @@ const search = async (query, limit, from, to) => { tracesAttrsTable: `${_dbname}.tempo_traces_attrs_gin`, from: from, to: to, - limit: limit + limit: limit, + randomFilter: null } const scrpit = parser.ParseScript(query) - const planner = transpiler(scrpit.rootToken) + const complexity = await evaluateComplexity(ctx, scrpit.rootToken) + if (complexity > 10000000) { + return await processComplexResult(ctx, scrpit.rootToken, complexity) + } + return await processSmallResult(ctx, scrpit.rootToken) +} + +/** + * + * @param ctx {Context} + * @param script {Token} + */ +const evaluateComplexity = async (ctx, script) => { + const evaluator = evaluateCmpl(script) + const sql = evaluator(ctx) + const response = await rawRequest(sql + ' FORMAT JSON', null, DATABASE_NAME()) + return response.data.data.reduce((acc, row) => Math.max(acc, row.count), 0) +} + +/** + * + * @param ctx {Context} + * @param script {Token} + * @param complexity {number} + */ +async function processComplexResult (ctx, script, complexity) { + const planner = transpile(script) + const maxFilter = Math.floor(complexity / 10000000) + let traces = [] + for (let i = 0; i < maxFilter; i++) { + ctx.randomFilter = [maxFilter, i] + let sql = planner(ctx) + let response = await rawRequest(sql + ' FORMAT JSON', null, DATABASE_NAME()) + if (response.data.data.length === parseInt(ctx.limit)) { + const minStart = response.data.data.reduce((acc, row) => + acc === 0 ? row.start_time_unix_nano : Math.min(acc, row.start_time_unix_nano), 0 + ) + ctx.from = new Date(Math.floor(minStart / 1000000)) + ctx.randomFilter = null + complexity = await evaluateComplexity(ctx, script) + if (complexity <= 10000000) { + return await processSmallResult(ctx, script) + } + ctx.randomFilter = [maxFilter, i] + } + ctx.cachedTraceIds = response.data.data.map(row => row.trace_id) + sql = planner(ctx) + response = await rawRequest(sql + ' FORMAT JSON', null, DATABASE_NAME()) + traces = response.data.data.map(row => ({ + traceID: row.trace_id, + rootServiceName: row.root_service_name, + rootTraceName: row.root_trace_name, + startTimeUnixNano: row.start_time_unix_nano, + durationMs: row.duration_ms, + spanSets: [ + { + spans: row.span_id.map((spanId, i) => ({ + spanID: spanId, + startTimeUnixNano: row.timestamp_ns[i], + durationNanos: row.duration[i], + attributes: [] + })), + matched: row.span_id.length + } + ] + })) + } + return traces +} + +/** + * + * @param ctx {Context} + * @param script {Token} + */ +async function processSmallResult (ctx, script) { + const planner = transpile(script) const sql = planner(ctx) const response = await rawRequest(sql + ' FORMAT JSON', null, DATABASE_NAME()) const traces = response.data.data.map(row => ({