Commit

fix
akvlad committed Sep 4, 2024
1 parent ea4b257 commit e1b1f32
Showing 4 changed files with 89 additions and 40 deletions.
46 changes: 46 additions & 0 deletions parser/registry/common.js
@@ -1,6 +1,7 @@
const { hashLabels, parseLabels } = require('../../common')
const { getPlg } = require('../../plugins/engine')
const Sql = require('@cloki/clickhouse-sql')
const { DATABASE_NAME } = require('../../lib/utils')
const clusterName = require('../../common').clusterName
module.exports.dist = clusterName ? '_dist' : ''

@@ -413,3 +414,48 @@ module.exports.patchCol = (query, name, patcher) => {
return col
})
}

module.exports.preJoinLabels = (token, query, dist) => {
const from = query.getParam(module.exports.sharedParamNames.from)
const to = query.getParam(module.exports.sharedParamNames.to)
const sqlFrom = new Sql.Raw()
sqlFrom.toString = () => {
let fromNs = 0
if (from.get()) {
fromNs = from.get()
}
return `toDate(fromUnixTimestamp(intDiv(${fromNs}, 1000000000)))`
}
const sqlTo = new Sql.Raw()
sqlTo.toString = () => {
let toNs = 0
if (to.get()) {
toNs = to.get()
}
return `toDate(fromUnixTimestamp(intDiv(${toNs}, 1000000000)))`
}
let withIdxSel = query.with().idx_sel
let inRightSide = new Sql.WithReference(withIdxSel)
if (!withIdxSel) {
withIdxSel = query.with().str_sel
inRightSide = new Sql.Select()
.select('fingerprint')
.from(new Sql.WithReference(withIdxSel))
}
dist = dist || ''
const timeSeriesReq = new Sql.Select()
.select('fingerprint', 'labels')
.from([`${DATABASE_NAME()}.time_series${dist}`, 'time_series'])
.where(new Sql.And(
new Sql.In('time_series.fingerprint', 'in', inRightSide),
Sql.Gte(new Sql.Raw('date'), sqlFrom),
Sql.Lte(new Sql.Raw('date'), sqlTo)
))
timeSeriesReq._toString = timeSeriesReq.toString
timeSeriesReq.toString = () => {
return `(${timeSeriesReq._toString()})`
}
query.join(new module.exports.Aliased(timeSeriesReq, 'time_series'), 'left any',
Sql.Eq('samples.fingerprint', new Sql.Raw('time_series.fingerprint')))
query.select([new Sql.Raw('JSONExtractKeysAndValues(time_series.labels, \'String\')'), 'labels'])
}
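
The new `preJoinLabels` helper replaces the old unconditional `time_series` join: it restricts the joined rows to the query's `from`/`to` date range and resolves fingerprints through the existing `idx_sel` CTE (falling back to `str_sel`). A minimal usage sketch, assuming a query built by `initQuery` whose stream-selector rules have already populated the CTE; `token`, the paths, and the SQL shape in the comments follow the diff and are illustrative rather than exact:

```js
// Usage sketch only - not part of the commit.
const { preJoinLabels, dist } = require('./parser/registry/common')
const transpiler = require('./parser/transpiler')

// `token` is a parsed LogQL stream-selector token; producing it is out of scope here.
const buildLabeledQuery = (token) => {
  // Base samples query without the legacy unconditional time_series join.
  const query = transpiler.initQuery(false)
  // ... stream-selector rules populate query.with().idx_sel / str_sel here ...
  preJoinLabels(token, query, dist)
  return query
}

// Approximate SQL the helper appends:
//   LEFT ANY JOIN (
//     SELECT fingerprint, labels
//     FROM <db>.time_series[_dist] AS time_series
//     WHERE time_series.fingerprint IN <idx_sel / str_sel>
//       AND date >= toDate(fromUnixTimestamp(intDiv(<from_ns>, 1000000000)))
//       AND date <= toDate(fromUnixTimestamp(intDiv(<to_ns>, 1000000000)))
//   ) AS time_series ON samples.fingerprint = time_series.fingerprint
// and the select list gains:
//   JSONExtractKeysAndValues(time_series.labels, 'String') AS labels
```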
7 changes: 3 additions & 4 deletions parser/registry/smart_optimizations/optimization_v3_2.js
@@ -1,4 +1,4 @@
const { getDuration, Aliased } = require('../common')
const { getDuration, preJoinLabels, dist } = require('../common')
const reg = require('./log_range_agg_reg_v3_2')
const Sql = require('@cloki/clickhouse-sql')
const { DATABASE_NAME, checkVersion } = require('../../../lib/utils')
@@ -51,15 +51,14 @@ module.exports.apply = (token, fromNS, toNS, stepNS) => {
.select(['samples.fingerprint', 'fingerprint'])
.from([`${DATABASE_NAME()}.metrics_15s${_dist}`, 'samples'])
.where(tsClause)
q.join(new Aliased(`${DATABASE_NAME()}.time_series`, 'time_series'), 'left any',
Sql.Eq('samples.fingerprint', new Sql.Raw('time_series.fingerprint')))
q.select([new Sql.Raw('any(JSONExtractKeysAndValues(time_series.labels, \'String\'))'), 'labels'])

q.ctx = {
step: stepNS / 1000000000,
inline: !!clusterName
}

preJoinLabels(token, q, dist)

for (const streamSelectorRule of token.Children('log_stream_selector_rule')) {
q = streamSelectorReg[streamSelectorRule.Child('operator').value](streamSelectorRule, q)
}
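
Here the inline `left any` join against `time_series` and its `any(JSONExtractKeysAndValues(...))` label projection are dropped from the `metrics_15s` query; `preJoinLabels(token, q, dist)` attaches the date-bounded join instead. The `dist` value imported from `../common` is `'_dist'` only when a ClickHouse cluster is configured, which selects the distributed table variant. A minimal sketch of that switch, with the configuration value assumed for illustration:

```js
// Illustrative only - mirrors `module.exports.dist = clusterName ? '_dist' : ''` in common.js.
const clusterName = 'my_cluster' // assumed; empty string on a single-node install
const dist = clusterName ? '_dist' : ''
// Clustered installs therefore join `<db>.time_series_dist`, single-node ones `<db>.time_series`.
console.log(`time_series${dist}`) // -> "time_series_dist"
```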
42 changes: 23 additions & 19 deletions parser/transpiler.js
@@ -10,7 +10,7 @@ const lineFormat = require('./registry/line_format')
const parserRegistry = require('./registry/parser_registry')
const unwrap = require('./registry/unwrap')
const unwrapRegistry = require('./registry/unwrap_registry')
const { durationToMs, sharedParamNames, getStream, Aliased } = require('./registry/common')
const { durationToMs, sharedParamNames, getStream, preJoinLabels } = require('./registry/common')
const compiler = require('./bnf')
const {
parseMs,
@@ -75,11 +75,6 @@ module.exports.initQuery = (joinLabels, types) => {
.addParam(to)
.addParam(limit)
.addParam(matrix)
if (joinLabels) {
q.join(new Aliased(`${DATABASE_NAME()}.time_series`, 'time_series'), 'left any',
Sql.Eq('samples.fingerprint', new Sql.Raw('time_series.fingerprint')))
q.select([new Sql.Raw('JSONExtractKeysAndValues(time_series.labels, \'String\')'), 'labels'])
}
return q
}

@@ -88,7 +83,7 @@
* @param types {[number] || undefined}
* @returns {Select}
*/
module.exports.initQueryV3_2 = (joinLabels, types) => {
/*module.exports.initQueryV3_2 = (joinLabels, types) => {
types = types || [bothType, logType]
const from = new Sql.Parameter(sharedParamNames.from)
const to = new Sql.Parameter(sharedParamNames.to)
@@ -108,12 +103,13 @@ module.exports.initQueryV3_2 = (joinLabels, types) => {
.addParam(from)
.addParam(to)
if (joinLabels) {
//TODO: fix join
q.join(new Aliased(`${DATABASE_NAME()}.time_series${dist}`, 'time_series'), 'left any',
Sql.Eq('samples.fingerprint', new Sql.Raw('time_series.fingerprint')))
q.select([new Sql.Raw('JSONExtractKeysAndValues(time_series.labels, \'String\')'), 'labels'])
}
return q
}
}*/

/**
*
@@ -193,6 +189,8 @@ module.exports.transpile = (request) => {
end
}
}
joinLabels && doStreamSelectorOperatorRegistry(token, query)
joinLabels && preJoinLabels(token, query)
matrixOp = matrixOp || (token.Child('summary') && 'summary')
switch (matrixOp) {
case 'aggregation_operator':
@@ -223,10 +221,9 @@
.orderBy(['labels', order], ['timestamp_ns', order])
setQueryParam(query, sharedParamNames.limit, limit)
if (!joinLabels) {
query.join(new Aliased(`${DATABASE_NAME()}.time_series${dist}`, 'time_series'), 'left any',
Sql.Eq('sel_a.fingerprint', new Sql.Raw('time_series.fingerprint')))
query.select([new Sql.Raw('JSONExtractKeysAndValues(time_series.labels, \'String\')'), 'labels'],
new Sql.Raw('sel_a.*'))
query.from([new Sql.WithReference(query.with().sel_a), 'samples'])
preJoinLabels(token, query, dist)
query.select(new Sql.Raw('samples.*'))
}
}
if (token.Child('agg_statement') && token.Child('compared_agg_statement_cmp')) {
@@ -381,7 +378,9 @@ module.exports.transpileTail = (request) => {
}
}

let query = module.exports.initQuery(true)
let query = module.exports.initQuery(false)
doStreamSelectorOperatorRegistry(expression.rootToken, query)
preJoinLabels(expression.rootToken, query, dist)
query.ctx = {
...(query.ctx || {}),
legacy: true
@@ -393,7 +392,6 @@
query.order_expressions = []
query.orderBy(['timestamp_ns', 'asc'])
query.limit(undefined, undefined)
//logger.debug(query.toString())
return {
query: request.rawRequest ? query : query.toString(),
stream: getStream(query)
@@ -496,11 +494,7 @@ module.exports.transpileLogRangeAggregation = (token, query) => {
* @returns {Sql.Select}
*/
module.exports.transpileLogStreamSelector = (token, query) => {
const rules = token.Children('log_stream_selector_rule')
for (const rule of rules) {
const op = rule.Child('operator').value
query = streamSelectorOperatorRegistry[op](rule, query)
}
doStreamSelectorOperatorRegistry(token, query)
for (const pipeline of token.Children('log_pipeline')) {
if (pipeline.Child('line_filter_expression')) {
const op = pipeline.Child('line_filter_operator').value
@@ -619,3 +613,13 @@ const whereBuilder = (clause) => {
const _clause = clause.slice(1).map(c => Array.isArray(c) ? `(${whereBuilder(c)})` : c)
return _clause.join(` ${op} `)
}

const doStreamSelectorOperatorRegistry = (token, query) => {
if (!query.with().idx_sel && !query.with().str_sel) {
const rules = token.Children('log_stream_selector_rule')
for (const rule of rules) {
const op = rule.Child('operator').value
query = streamSelectorOperatorRegistry[op](rule, query)
}
}
}
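
The `doStreamSelectorOperatorRegistry` wrapper applies the stream-selector rules only when neither the `idx_sel` nor the `str_sel` CTE exists yet, so `transpile` and `transpileTail` can run it eagerly before `preJoinLabels`, while a later call from `transpileLogStreamSelector` stays a no-op. A sketch of the intended call order as it would look inside `parser/transpiler.js`, with the surrounding plumbing simplified and `token` assumed to be an already-parsed expression:

```js
// Flow sketch only - simplified from the diff, not a verbatim excerpt.
const transpileWithLabels = (token) => {
  const query = module.exports.initQuery(false)

  // First pass: builds the idx_sel / str_sel CTE from the stream selector.
  doStreamSelectorOperatorRegistry(token, query)

  // Labels are joined once against that CTE, bounded by the query's date range.
  preJoinLabels(token, query, dist)

  // Repeated calls skip the selector because query.with().idx_sel / str_sel already exists.
  doStreamSelectorOperatorRegistry(token, query)
  return query
}
```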
