Commit 9aae3fc

Merge branch 'master' into lmangani-golang-xnet

lmangani authored Jun 5, 2024
2 parents 4602989 + 78d5eb1 commit 9aae3fc
Showing 16 changed files with 199 additions and 45 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -23,7 +23,7 @@
* **Secure**: Retain total control of data, using **ClickHouse**, **DuckDB** or **InfluxDB** IOx with **S3** object storage
* **Independent**: Open source, community powered, anti lock-in alternative to vendor-controlled stacks

![lgtm_vs_qryn](https://github.com/metrico/qryn/assets/1423657/2e9071ba-c578-49fc-be1d-d91944a5891e)
<!-- ![lgtm_vs_qryn](https://github.com/metrico/qryn/assets/1423657/2e9071ba-c578-49fc-be1d-d91944a5891e) -->


<br>
5 changes: 4 additions & 1 deletion jest.config.js
@@ -1,4 +1,7 @@
const path = require('path')
module.exports = {
setupFilesAfterEnv: [path.join(__dirname, '/test/jest.setup.js')]
setupFilesAfterEnv: [path.join(__dirname, '/test/jest.setup.js')],
moduleNameMapper: {
'^axios$': 'axios/dist/node/axios.cjs'
}
}
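
The new moduleNameMapper pairs with the axios upgrade below: axios 1.x resolves to an ESM entry point by default, which Jest's CommonJS runtime cannot load, so tests are pointed at the CJS bundle instead. A minimal sketch of a test that relies on this mapping (the test file itself is hypothetical):

```js
// test/axios-mapping.test.js — hypothetical; assumes the moduleNameMapper above is active.
const axios = require('axios') // resolves to axios/dist/node/axios.cjs under Jest

test('axios loads as CommonJS under Jest', () => {
  // Without the mapping, requiring axios 1.x from Jest's CJS runtime
  // can fail to resolve the package's ESM entry point.
  expect(typeof axios.post).toBe('function')
})
```
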
17 changes: 16 additions & 1 deletion lib/db/clickhouse.js
@@ -1363,12 +1363,27 @@ const samplesReadTable = {
*/
const rawRequest = async (query, data, database, config) => {
try {
if (data && !(Buffer.isBuffer(data) || data instanceof Uint8Array || typeof data === 'string')) {
throw new Error('data must be Buffer, Uint8Array or String: currently the data is: ' + typeof data)
}
if (typeof data === 'string') {
data = Buffer.from(data, 'utf8')
}
if (typeof query !== 'string') {
throw new Error('query must be String: currently the query is: ' + typeof query)
}
const getParams = [
(database ? `database=${encodeURIComponent(database)}` : null),
(data ? `query=${encodeURIComponent(query)}` : null)
].filter(p => p)
const url = `${getClickhouseUrl()}/${getParams.length ? `?${getParams.join('&')}` : ''}`
return await axios.post(url, data || query, config)
config = {
...(config || {}),
method: 'post',
url: url,
data: data || query
}
return await axios(config)
} catch (e) {
logger.error('rawRequest error: ' + query)
e.response?.data && logger.error(e.response.data.toString())
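
rawRequest now validates its inputs up front — the query must be a string and any payload a Buffer, Uint8Array, or string (strings are converted to Buffers) — and assembles a single axios config instead of calling axios.post directly. A usage sketch, with hypothetical database and table names:

```js
// With a payload: the query travels as a URL parameter and the rows as the POST body.
await rawRequest(
  'INSERT INTO samples FORMAT JSONEachRow',
  '{"ts": 1, "value": 0.5}\n{"ts": 2, "value": 0.7}',
  'my_db'
)

// Without a payload: the query itself is sent as the POST body.
const res = await rawRequest('SELECT count() AS c FROM samples FORMAT JSON', null, 'my_db')
console.log(res.data.data[0].c) // axios exposes the parsed JSON body on `data`
```
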
10 changes: 5 additions & 5 deletions package-lock.json


2 changes: 1 addition & 1 deletion package.json
@@ -44,7 +44,7 @@
"@qxip/influx-line-protocol-parser": "^0.2.1",
"@qxip/plugnplay": "^3.3.1",
"@stricjs/router": "^5.0.6",
"axios": "^0.28.0",
"axios": "^1.6.8",
"bnf": "^1.0.1",
"csv-writer": "^1.6.0",
"date-fns": "^2.27.0",
2 changes: 1 addition & 1 deletion parsers.js
@@ -260,7 +260,7 @@ const parsers = {
const parser = find(parsers._parsers, [contentType, req.routeOptions.method, req.routeOptions.url]) ||
find(parsers._parsers, ['*', req.routeOptions.method, req.routeOptions.url])
if (!parser) {
throw new Error('undefined parser')
throw new Error(`undefined parser for ${contentType} ${req.routeOptions.method} ${req.routeOptions.url}`)
}
return await parser(req, payload)
},
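
Parser lookup keys on the (content type, method, url) triple, with a wildcard fallback on content type; the enriched message now names the exact triple that had no registered parser — something like `undefined parser for application/x-msgpack POST /tempo/api/push` (a hypothetical example) — instead of the bare `undefined parser`.
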
2 changes: 1 addition & 1 deletion test/e2e
Submodule e2e updated from 85e344 to 8847ca
28 changes: 13 additions & 15 deletions traceql/clickhouse_transpiler/attr_condition.js
@@ -1,4 +1,4 @@
const { getCompareFn, durationToNs, unquote } = require('./shared')
const { getCompareFn, durationToNs, unquote, bitSet } = require('./shared')
const Sql = require('@cloki/clickhouse-sql')
module.exports = class Builder {
constructor () {
@@ -71,6 +71,18 @@ module.exports = class Builder {
const having = self.getCond(self.conds)
self.aggregator(sel)
sel.conditions = Sql.And(sel.conditions, Sql.Or(...self.where))
if (Array.isArray(ctx.randomFilter) && Array.isArray(ctx.cachedTraceIds) && ctx.cachedTraceIds.length > 0) {
sel.conditions = Sql.And(
sel.conditions,
Sql.Or(
Sql.Eq(new Sql.Raw(`cityHash64(trace_id) % ${ctx.randomFilter[0]}`), Sql.val(ctx.randomFilter[1])),
new Sql.In('trace_id', 'in', ctx.cachedTraceIds.map(traceId => new Sql.Raw(`unhex('${traceId}')`)))
))
} else if (Array.isArray(ctx.randomFilter)) {
sel.conditions = Sql.And(
sel.conditions,
Sql.Eq(new Sql.Raw(`cityHash64(trace_id) % ${ctx.randomFilter[0]}`), Sql.val(ctx.randomFilter[1])))
}
sel.having(having)
return sel
}
@@ -248,20 +260,6 @@ function groupBitOr (left, alias) {
return res
}

/**
*
* @param terms
* @returns {SQLObject}
*/
function bitSet (terms) {
const res = new Sql.Raw('')
res.terms = terms
res.toString = () => {
return res.terms.map((t, i) => `bitShiftLeft(toUInt64(${t.toString()}), ${i})`).join('+')
}
return res
}

/**
*
* @param attr {string}
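
The two new branches narrow the index scan to a deterministic sample: `cityHash64(trace_id) % M = i` keeps roughly one in M traces, and any trace IDs cached from an earlier pass are re-admitted explicitly via `unhex(...)`. A sketch of the context fields and the condition they render to (values are illustrative):

```js
// Illustrative values only.
const ctx = {
  randomFilter: [10, 3], // scan bucket 3 of 10 — about a tenth of all traces
  cachedTraceIds: ['0af7651916cd43dd8448eb211c80319c'] // hex ids kept from a prior pass
}
// The appended condition renders roughly as:
//   (cityHash64(trace_id) % 10 = 3)
//   OR (trace_id IN (unhex('0af7651916cd43dd8448eb211c80319c')))
```
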
19 changes: 19 additions & 0 deletions traceql/clickhouse_transpiler/attr_condition_eval.js
@@ -0,0 +1,19 @@
const attrCondition = require('./attr_condition')
const { bitSet } = require('./shared')
const Sql = require('@cloki/clickhouse-sql')
module.exports = class Builder extends attrCondition {
build () {
const self = this
const superBuild = super.build()
/** @type {BuiltProcessFn} */
const res = (ctx) => {
const sel = superBuild(ctx)
sel.having_conditions = []
sel.aggregations = [bitSet(self.sqlConditions)]
sel.select_list = [[new Sql.Raw('count()'), 'count']]
sel.order_expressions = []
return sel
}
return res
}
}
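
This subclass reuses the whole condition analysis of attr_condition but rewrites the output shape: the HAVING clause is dropped, the per-term conditions are packed into a single bitSet aggregation, and the select list collapses to a bare count(). The result is a cheap probe that counts candidate index rows instead of returning them — this is what evaluateCmpl() exposes below and what search() uses to size a query before running it.
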
4 changes: 1 addition & 3 deletions traceql/clickhouse_transpiler/group_by.js
@@ -7,9 +7,7 @@ module.exports = standardBuilder((sel, ctx) => {
.with(withMain)
.select(
['trace_id', 'trace_id'],
[new Sql.Raw('groupArray(span_id)'), 'span_id'],
[new Sql.Raw('groupArray(duration)'), 'duration'],
[new Sql.Raw('groupArray(timestamp_ns)'), 'timestamp_ns']
[new Sql.Raw('groupArray(100)(span_id)'), 'span_id']
).from(new Sql.WithReference(withMain))
.groupBy('trace_id')
.orderBy([new Sql.Raw('max(index_search.timestamp_ns)'), 'desc'])
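
`groupArray(100)(span_id)` caps each trace's collected span list at 100 entries, bounding row width on very wide traces; the duration and timestamp arrays are no longer gathered here because traces_data.js (below) now rebuilds them directly from the traces table.
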
23 changes: 22 additions & 1 deletion traceql/clickhouse_transpiler/index.js
@@ -1,4 +1,5 @@
const AttrConditionPlanner = require('./attr_condition')
const AttrConditionEvalPlanner = require('./attr_condition_eval')
const InitIndexPlanner = require('./init')
const IndexGroupByPlanner = require('./group_by')
const AggregatorPlanner = require('./aggregator')
@@ -8,10 +9,17 @@ const TracesDataPlanner = require('./traces_data')
/**
* @param script {Token}
*/
module.exports = (script) => {
module.exports.transpile = (script) => {
return new Planner(script).plan()
}

/**
* @param script {Token}
*/
module.exports.evaluateCmpl = (script) => {
return new Planner(script).planEval()
}

class Planner {
/**
*
@@ -53,6 +61,19 @@ class Planner {
return res
}

planEval () {
this.check()
this.analyze()
const res = (new AttrConditionEvalPlanner())
.withTerms(this.termIdx)
.withConditions(this.cond)
.withAggregatedAttr(this.aggregatedAttr)
.withMain((new InitIndexPlanner()).build())
.build()

return res
}

check () {
if (this.script.Children('SYNTAX').length > 1) {
throw new Error('more than one selector is not supported')
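
The transpiler module now exports two entry points over the same Planner: transpile() yields the full search plan, evaluateCmpl() the stripped-down counting plan. A sketch of driving both (the query string is hypothetical; ctx is a Context as described in init.js below):

```js
// Sketch, written as if from traceql/index.js.
const parser = require('./parser')
const { transpile, evaluateCmpl } = require('./clickhouse_transpiler')

const script = parser.ParseScript('{ .service.name = "checkout" }') // hypothetical query
const countSql = evaluateCmpl(script.rootToken)(ctx).toString() // complexity probe: SELECT count() ...
const searchSql = transpile(script.rootToken)(ctx).toString()   // full search plan
```
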
6 changes: 4 additions & 2 deletions traceql/clickhouse_transpiler/init.js
@@ -9,7 +9,9 @@ const { standardBuilder } = require('./shared')
* limit: number,
* isCluster: boolean,
* tracesTable: string,
* tracesDistTable: string
* tracesDistTable: string,
* randomFilter: number[]|undefined,
* cachedTraceIds: string[]|undefined,
* }} Context
*/
/**
Expand All @@ -21,7 +23,7 @@ const { standardBuilder } = require('./shared')
*/
module.exports = standardBuilder((sel, ctx) => {
return (new Sql.Select()).select(['trace_id', 'trace_id'],
[new Sql.Raw('lower(hex(span_id))'), 'span_id'],
[new Sql.Raw('span_id'), 'span_id'],
[new Sql.Raw('any(duration)'), 'duration'],
[new Sql.Raw('any(timestamp_ns)'), 'timestamp_ns'])
.from([ctx.tracesAttrsTable, 'traces_idx'])
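
Two points here: span_id now leaves the index subquery in raw binary form (the lower(hex(...)) conversion moves to traces_data.js, where the final arrays are assembled), and the Context typedef gains the two optional sampling fields. A fully-populated Context might look like this (a sketch; the table names follow the defaults set in traceql/index.js):

```js
const ctx = {
  tracesTable: 'cloki.tempo_traces',
  tracesDistTable: 'cloki.tempo_traces_dist',
  tracesAttrsTable: 'cloki.tempo_traces_attrs_gin',
  isCluster: false,
  from: new Date(Date.now() - 3600 * 1000),
  to: new Date(),
  limit: 20,
  randomFilter: [10, 3],      // optional: keep hash bucket 3 of 10
  cachedTraceIds: ['0af7651916cd43dd8448eb211c80319c'] // optional: hex ids from a prior pass
}
```
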
14 changes: 14 additions & 0 deletions traceql/clickhouse_transpiler/shared.js
@@ -82,3 +82,17 @@ module.exports.standardBuilder = (fn) => {
}
}
}

/**
*
* @param terms {SQLObject[]}
* @returns {SQLObject}
*/
module.exports.bitSet = (terms) => {
const res = new Sql.Raw('')
res.terms = terms
res.toString = () => {
return res.terms.map((t, i) => `bitShiftLeft(toUInt64(${t.toString()}), ${i})`).join('+')
}
return res
}
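
bitSet packs a list of boolean SQL terms into a single UInt64, one bit per term, by summing shifted casts. For three hypothetical terms the rendered expression looks like this:

```js
const { bitSet } = require('./shared')

const expr = bitSet(["key = 'db.name'", "val = 'orders'", "duration > 1000"]) // hypothetical terms
console.log(expr.toString())
// prints (wrapped here for readability):
//   bitShiftLeft(toUInt64(key = 'db.name'), 0)
//   + bitShiftLeft(toUInt64(val = 'orders'), 1)
//   + bitShiftLeft(toUInt64(duration > 1000), 2)
```
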
18 changes: 9 additions & 9 deletions traceql/clickhouse_transpiler/traces_data.js
@@ -9,23 +9,23 @@ const processFn = (sel, ctx) => {
const withTraceIds = new Sql.With('trace_ids', (new Sql.Select())
.select('trace_id')
.from(new Sql.WithReference(withMain)))
const withTraceIdsSpanIds = new Sql.With('trace_span_ids', (new Sql.Select())
.select('trace_id', 'span_id')
.from(new Sql.WithReference(withMain))
.join('span_id', 'array'))
return (new Sql.Select())
.with(withMain, withTraceIds)
.with(withMain, withTraceIds, withTraceIdsSpanIds)
.select(
[new Sql.Raw('lower(hex(traces.trace_id))'), 'trace_id'],
[new Sql.Raw('any(index_grouped.span_id)'), 'span_id'],
[new Sql.Raw('any(index_grouped.duration)'), 'duration'],
[new Sql.Raw('any(index_grouped.timestamp_ns)'), 'timestamp_ns'],
[new Sql.Raw(`arrayMap(x -> lower(hex(x)), groupArrayIf(traces.span_id, (traces.trace_id, traces.span_id) IN ${new Sql.WithReference(withTraceIdsSpanIds)}))`), 'span_id'],
[new Sql.Raw(`groupArrayIf(traces.duration_ns, (traces.trace_id, traces.span_id) IN ${new Sql.WithReference(withTraceIdsSpanIds)})`), 'duration'],
[new Sql.Raw(`groupArrayIf(traces.timestamp_ns, (traces.trace_id, traces.span_id) IN ${new Sql.WithReference(withTraceIdsSpanIds)})`), 'timestamp_ns'],
[new Sql.Raw('min(traces.timestamp_ns)'), 'start_time_unix_nano'],
[new Sql.Raw(
'toFloat64(max(traces.timestamp_ns + traces.duration_ns) - min(traces.timestamp_ns)) / 1000000'
), 'duration_ms'],
[new Sql.Raw('argMin(traces.name, traces.timestamp_ns)', 'root_service_name'), 'root_service_name']
).from([table, 'traces']).join(
new Sql.WithReference(withMain),
'left any',
Sql.Eq(new Sql.Raw('traces.trace_id'), new Sql.Raw('index_grouped.trace_id'))
).where(Sql.And(
).from([table, 'traces']).where(Sql.And(
new Sql.In(new Sql.Raw('traces.trace_id'), 'in', new Sql.WithReference(withTraceIds))
)).groupBy('traces.trace_id')
.orderBy(['start_time_unix_nano', 'desc'])
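
The left any join against the grouped index rows is gone. Instead, the new trace_span_ids CTE array-joins the per-trace span lists back into (trace_id, span_id) pairs, and groupArrayIf rebuilds the span_id, duration, and timestamp arrays directly from the traces table, keeping only rows whose pair appears in that CTE. Durations now come from traces.duration_ns at the source rather than from arrays carried through a join.
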
90 changes: 87 additions & 3 deletions traceql/index.js
@@ -1,5 +1,5 @@
const parser = require('./parser')
const transpiler = require('./clickhouse_transpiler')
const { transpile, evaluateCmpl } = require('./clickhouse_transpiler')
const logger = require('../lib/logger')
const { DATABASE_NAME } = require('../lib/utils')
const { clusterName } = require('../common')
@@ -23,10 +23,94 @@ const search = async (query, limit, from, to) => {
tracesAttrsTable: `${_dbname}.tempo_traces_attrs_gin`,
from: from,
to: to,
limit: limit
limit: limit,
randomFilter: null
}
const script = parser.ParseScript(query)
const planner = transpiler(script.rootToken)
const complexity = await evaluateComplexity(ctx, script.rootToken)
let res = []
if (complexity > 10000000) {
res = await processComplexResult(ctx, script.rootToken, complexity)
} else {
res = await processSmallResult(ctx, script.rootToken)
}
res.forEach(t =>
t.spanSets.forEach(
ss => ss.spans.sort(
(a, b) => b.startTimeUnixNano.localeCompare(a.startTimeUnixNano))
)
)
return res
}

/**
*
* @param ctx {Context}
* @param script {Token}
*/
const evaluateComplexity = async (ctx, script) => {
const evaluator = evaluateCmpl(script)
const sql = evaluator(ctx)
const response = await rawRequest(sql + ' FORMAT JSON', null, DATABASE_NAME())
return response.data.data.reduce((acc, row) => Math.max(acc, row.count), 0)
}

/**
*
* @param ctx {Context}
* @param script {Token}
* @param complexity {number}
*/
async function processComplexResult (ctx, script, complexity) {
const planner = transpile(script)
const maxFilter = Math.floor(complexity / 10000000)
let traces = []
for (let i = 0; i < maxFilter; i++) {
ctx.randomFilter = [maxFilter, i]
const sql = planner(ctx)
const response = await rawRequest(sql + ' FORMAT JSON', null, DATABASE_NAME())
if (response.data.data.length === parseInt(ctx.limit)) {
const minStart = response.data.data.reduce((acc, row) =>
acc === 0 ? row.start_time_unix_nano : Math.min(acc, row.start_time_unix_nano), 0
)
ctx.from = new Date(Math.floor(minStart / 1000000))
ctx.randomFilter = null
complexity = await evaluateComplexity(ctx, script)
if (complexity <= 10000000) {
return await processSmallResult(ctx, script)
}
ctx.randomFilter = [maxFilter, i]
}
ctx.cachedTraceIds = response.data.data.map(row => row.trace_id)
traces = response.data.data.map(row => ({
traceID: row.trace_id,
rootServiceName: row.root_service_name,
rootTraceName: row.root_trace_name,
startTimeUnixNano: row.start_time_unix_nano,
durationMs: row.duration_ms,
spanSets: [
{
spans: row.span_id.map((spanId, i) => ({
spanID: spanId,
startTimeUnixNano: row.timestamp_ns[i],
durationNanos: row.duration[i],
attributes: []
})),
matched: row.span_id.length
}
]
}))
}
return traces
}

/**
*
* @param ctx {Context}
* @param script {Token}
*/
async function processSmallResult (ctx, script) {
const planner = transpile(script)
const sql = planner(ctx)
const response = await rawRequest(sql + ' FORMAT JSON', null, DATABASE_NAME())
const traces = response.data.data.map(row => ({
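
search() now routes through a complexity probe: estimates above ten million trigger the sampled path, which splits the scan into maxFilter = floor(complexity / 10,000,000) hash buckets and queries them one at a time, narrowing the time range and re-probing whenever a bucket already fills the limit. A worked sketch of the arithmetic:

```js
// Sketch: a complexity estimate of 35,000,000 yields 3 sample buckets.
const complexity = 35000000
const maxFilter = Math.floor(complexity / 10000000) // 3
for (let i = 0; i < maxFilter; i++) {
  // pass i adds: cityHash64(trace_id) % 3 = i
  // so each query scans roughly a third of the candidate traces
}
```
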
2 changes: 1 addition & 1 deletion traceql/traceql.bnf
@@ -13,7 +13,7 @@ cmp ::= "="|"!="|"<="|">="|"<"|">"
cmp_val ::= <number> [<measurement>]
measurement ::= "ns"|"us"|"ms"|"s"|"m"|"h"|"d"

label_name ::= ("." | <ALPHA> | "_") *("." | <ALPHA> | "_" | <DIGITS>)
label_name ::= ("." | <ALPHA> | "-" | "_") *("." | <ALPHA> | "_" | "-" | <DIGITS>)
number ::= ["-"] <DIGITS> ["." <DIGITS>]

attr_selector ::= <label_name> <OWSP> <op> <OWSP> <value>
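
The grammar now admits `-` in attribute names, both as the leading character and inside the name, so a selector such as `{ .service-name = "checkout" }` (a hypothetical example) parses where it previously failed.
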
