Skip to content

Commit

Permalink
Merge pull request #394 from metrico/summary
Browse files Browse the repository at this point in the history
Summary
  • Loading branch information
akvlad authored Nov 26, 2023
2 parents 027f4c3 + 5603718 commit d44ea39
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 5 deletions.
4 changes: 3 additions & 1 deletion parser/logql.bnf
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<SYNTAX> ::= <log_stream_selector> | <agg_statement> | <user_macro>
<SYNTAX> ::= <log_stream_selector> | <agg_statement> | <summary> | <user_macro>

log_stream_fp_selector ::= "{" <OWSP> <log_stream_selector_rule> *(<OWSP> "," <OWSP> <log_stream_selector_rule>) <OWSP> "}"
log_stream_selector ::= <log_stream_fp_selector> <OWSP> *(<OWSP><log_pipeline>)
Expand Down Expand Up @@ -75,3 +75,5 @@ parameterized_unwrapped_expression_fn ::= <parameterized_unwrapped_registry>
parameterized_expression ::= <parameterized_expression_fn><OWSP>"("<OWSP><parameter_value><OWSP>","<OWSP>(<agg_statement>|<parameterized_unwrapped_expression>)<OWSP>")" [<OWSP> <compared_agg_statement_cmp>]
parameter_value ::= <NUMBER>
parameterized_expression_fn ::= <parameterized_aggregation_registry>

summary ::= "summary" <OWSP> "(" <OWSP> <log_stream_selector> <OWSP> ")"
93 changes: 89 additions & 4 deletions parser/transpiler.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const { QrynBadRequest } = require('../lib/handlers/errors')
const optimizations = require('./registry/smart_optimizations')
const clusterName = require('../common').clusterName
const dist = clusterName ? '_dist' : ''
const wasm = require('../wasm_parts/main')

/**
* @param joinLabels {boolean}
Expand Down Expand Up @@ -154,9 +155,9 @@ module.exports.transpile = (request) => {
}
const joinLabels = ['unwrap_function', 'log_range_aggregation', 'aggregation_operator',
'agg_statement', 'user_macro', 'parser_expression', 'label_filter_pipeline',
'line_format_expression', 'labels_format_expression'].some(t => token.Child(t))
'line_format_expression', 'labels_format_expression', 'summary'].some(t => token.Child(t))
let query = module.exports.initQuery(joinLabels)
const limit = request.limit ? request.limit : 2000
let limit = request.limit ? request.limit : 2000
const order = request.direction === 'forward' ? 'asc' : 'desc'
query.orderBy(...query.orderBy().map(o => [o[0], order]))
const readTable = samplesReadTableName(start)
Expand All @@ -167,7 +168,7 @@ module.exports.transpile = (request) => {
inline: !!clusterName
}
let duration = null
const matrixOp = [
let matrixOp = [
'aggregation_operator',
'unwrap_function',
'log_range_aggregation',
Expand All @@ -182,6 +183,7 @@ module.exports.transpile = (request) => {
end
}
}
matrixOp = matrixOp || (token.Child('summary') && 'summary')
switch (matrixOp) {
case 'aggregation_operator':
query = module.exports.transpileAggregationOperator(token, query)
Expand All @@ -195,6 +197,11 @@ module.exports.transpile = (request) => {
case 'parameterized_unwrapped_expression':
query = module.exports.transpileParameterizedUnwrappedExpression(token, query)
break
case 'summary':
query.ctx.matrix = false
query = module.exports.transpileSummary(token, query, request.limit || 2000)
setQueryParam(query, sharedParamNames.limit, undefined)
break
default:
// eslint-disable-next-line no-case-declarations
const _query = module.exports.transpileLogStreamSelector(token, query)
Expand Down Expand Up @@ -225,7 +232,7 @@ module.exports.transpile = (request) => {
setQueryParam(query, sharedParamNames.from, start + '000000')
setQueryParam(query, sharedParamNames.to, end + '000000')
setQueryParam(query, 'isMatrix', query.ctx.matrix)
logger.debug(query.toString())
console.log(query.toString())
return {
query: request.rawQuery ? query : query.toString(),
matrix: !!query.ctx.matrix,
Expand All @@ -234,6 +241,84 @@ module.exports.transpile = (request) => {
}
}

/**
*
* @param request {{
* query: string,
* limit: number,
* direction: string,
* start: string,
* end: string,
* step: string,
* stream?: (function(DataStream): DataStream)[],
* rawQuery: boolean
* }}
* @returns {{query: string, stream: (function (DataStream): DataStream)[], matrix: boolean, duration: number | undefined}}
*/
module.exports.transpileSummaryETL = (request) => {
const expression = compiler.ParseScript(request.query.trim())
const root = expression.rootToken
if (!root.Child('summary')) {
throw new QrynBadRequest('request should be a summary expression')
}
const selector = root.Child('log_stream_selector')
const _request = {
...request,
query: selector.value,
rawQuery: true
}
const byWithout = root.Child('by_without').value
const labels = "['" + root.Child('label_list').Children('label').map(l => l.value).join("','") + "']"
const exp = byWithout === 'by' ? '== 1' : '!= 1'

const query = module.exports.transpile(_request)
query.query = (new Sql.Select())
.select(
[new Sql.Raw(`arrayFilter(x -> has(${labels}, x.1) ${exp}, labels)`), 'labels'],
[new Sql.Raw('cityHash64(labels)'), 'fingerprint'],
'string',
'timestamp_ns')
.from(query.query)
return {
...query,
query: query.query.toString()
}
}

class Subquery extends Sql.Raw {
constructor (sel) {
super()
this.sel = sel
}

toString () {
return '(' + this.sel + ')'
}
}

module.exports.transpileSummary = (token, query, limit) => {
query = module.exports.transpileLogStreamSelector(token.Child('log_stream_selector'), query)
query.limit()
query.ctx = query.ctx || {}
query.ctx.stream = query.ctx.stream || []
const withQ = new Sql.With('sum_a', query)
const guessLevelCHExp = 'map(\'\', \'unknown\', \'debu\', \'debug\', \'info\', \'info\', \'warn\', \'warning\', \'erro\', \'error\', \'crit\', \'critical\', \'fata\', \'fatal\', \'I\', \'info\', \'W\', \'warning\', \'E\', \'error\', \'F\', \'fatal\')[arrayFirst(x -> notEmpty(x) , [lowerUTF8(regexpExtract(sum_a.string, \'(?i)(^|\\\\s|[\\]);|:,.])([\\[(<\\\']|Level=)?(debu|info|warn|erro|crit|fata)\', 3)), extract(sum_a.string, \'^([IWEF])[0-9]{4}(\\\\s|\\\\p{P})\')])]'
query = (new Sql.Select()).with(withQ).select(
[query.getParam(sharedParamNames.to), 'timestamp_ns'],
[new Sql.Raw('[(\'level\', _level)]::Array(Tuple(String,String))'), 'labels'],
[new Sql.Raw("format('{} ({}%): {}', toString(_c), toString(round(toFloat64(_c) / _overall * 100, 3)), min(sum_a.string))"), 'string'],
[new Sql.Raw('0'), 'value'],
'_c',
[new Subquery((new Sql.Select()).select(new Sql.Raw('count()')).from(new Sql.WithReference(withQ))), '_overall']
).from(new Sql.WithReference(withQ))
.groupBy(new Sql.Raw(
'(arrayReduce(\'sum\', arrayMap(x -> cityHash64(lowerUTF8(x[2])), extractAllGroupsVertical(sum_a.string, \'(^|\\\\p{P}|\\\\s)([a-zA-Z]+)(\\\\p{P}|$|\\\\s)\')) as a),' +
' arrayReduce(\'groupBitXor\', a), toUInt64(arrayProduct(arrayMap(x -> x*2+1, a))), ' + guessLevelCHExp + ' as _level)'))
.orderBy([new Sql.Raw('count() as _c'), 'DESC'])
.limit(limit || 2000)
return query
}

/**
*
* @param query {Select}
Expand Down
4 changes: 4 additions & 0 deletions test/transpiler.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -381,3 +381,7 @@ it('should transpile series', () => {
const res = transpiler.transpileSeries(['{test_id="123"}'])
expect(res).toMatchSnapshot()
})

it('should transpile summary', () => {
console.log(transpiler.transpile({query: 'summary({sender="logtest"})'}).query)
})

0 comments on commit d44ea39

Please sign in to comment.