Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Summary #394

Merged
merged 9 commits into from
Nov 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion parser/logql.bnf
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<SYNTAX> ::= <log_stream_selector> | <agg_statement> | <user_macro>
<SYNTAX> ::= <log_stream_selector> | <agg_statement> | <summary> | <user_macro>

log_stream_fp_selector ::= "{" <OWSP> <log_stream_selector_rule> *(<OWSP> "," <OWSP> <log_stream_selector_rule>) <OWSP> "}"
log_stream_selector ::= <log_stream_fp_selector> <OWSP> *(<OWSP><log_pipeline>)
Expand Down Expand Up @@ -75,3 +75,5 @@ parameterized_unwrapped_expression_fn ::= <parameterized_unwrapped_registry>
parameterized_expression ::= <parameterized_expression_fn><OWSP>"("<OWSP><parameter_value><OWSP>","<OWSP>(<agg_statement>|<parameterized_unwrapped_expression>)<OWSP>")" [<OWSP> <compared_agg_statement_cmp>]
parameter_value ::= <NUMBER>
parameterized_expression_fn ::= <parameterized_aggregation_registry>

summary ::= "summary" <OWSP> "(" <OWSP> <log_stream_selector> <OWSP> ")"
93 changes: 89 additions & 4 deletions parser/transpiler.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const { QrynBadRequest } = require('../lib/handlers/errors')
const optimizations = require('./registry/smart_optimizations')
const clusterName = require('../common').clusterName
const dist = clusterName ? '_dist' : ''
const wasm = require('../wasm_parts/main')

/**
* @param joinLabels {boolean}
Expand Down Expand Up @@ -154,9 +155,9 @@ module.exports.transpile = (request) => {
}
const joinLabels = ['unwrap_function', 'log_range_aggregation', 'aggregation_operator',
'agg_statement', 'user_macro', 'parser_expression', 'label_filter_pipeline',
'line_format_expression', 'labels_format_expression'].some(t => token.Child(t))
'line_format_expression', 'labels_format_expression', 'summary'].some(t => token.Child(t))
let query = module.exports.initQuery(joinLabels)
const limit = request.limit ? request.limit : 2000
let limit = request.limit ? request.limit : 2000
const order = request.direction === 'forward' ? 'asc' : 'desc'
query.orderBy(...query.orderBy().map(o => [o[0], order]))
const readTable = samplesReadTableName(start)
Expand All @@ -167,7 +168,7 @@ module.exports.transpile = (request) => {
inline: !!clusterName
}
let duration = null
const matrixOp = [
let matrixOp = [
'aggregation_operator',
'unwrap_function',
'log_range_aggregation',
Expand All @@ -182,6 +183,7 @@ module.exports.transpile = (request) => {
end
}
}
matrixOp = matrixOp || (token.Child('summary') && 'summary')
switch (matrixOp) {
case 'aggregation_operator':
query = module.exports.transpileAggregationOperator(token, query)
Expand All @@ -195,6 +197,11 @@ module.exports.transpile = (request) => {
case 'parameterized_unwrapped_expression':
query = module.exports.transpileParameterizedUnwrappedExpression(token, query)
break
case 'summary':
query.ctx.matrix = false
query = module.exports.transpileSummary(token, query, request.limit || 2000)
setQueryParam(query, sharedParamNames.limit, undefined)
break
default:
// eslint-disable-next-line no-case-declarations
const _query = module.exports.transpileLogStreamSelector(token, query)
Expand Down Expand Up @@ -225,7 +232,7 @@ module.exports.transpile = (request) => {
setQueryParam(query, sharedParamNames.from, start + '000000')
setQueryParam(query, sharedParamNames.to, end + '000000')
setQueryParam(query, 'isMatrix', query.ctx.matrix)
logger.debug(query.toString())
console.log(query.toString())
return {
query: request.rawQuery ? query : query.toString(),
matrix: !!query.ctx.matrix,
Expand All @@ -234,6 +241,84 @@ module.exports.transpile = (request) => {
}
}

/**
*
* @param request {{
* query: string,
* limit: number,
* direction: string,
* start: string,
* end: string,
* step: string,
* stream?: (function(DataStream): DataStream)[],
* rawQuery: boolean
* }}
* @returns {{query: string, stream: (function (DataStream): DataStream)[], matrix: boolean, duration: number | undefined}}
*/
module.exports.transpileSummaryETL = (request) => {
const expression = compiler.ParseScript(request.query.trim())
const root = expression.rootToken
if (!root.Child('summary')) {
throw new QrynBadRequest('request should be a summary expression')
}
const selector = root.Child('log_stream_selector')
const _request = {
...request,
query: selector.value,
rawQuery: true
}
const byWithout = root.Child('by_without').value
const labels = "['" + root.Child('label_list').Children('label').map(l => l.value).join("','") + "']"
const exp = byWithout === 'by' ? '== 1' : '!= 1'

const query = module.exports.transpile(_request)
query.query = (new Sql.Select())
.select(
[new Sql.Raw(`arrayFilter(x -> has(${labels}, x.1) ${exp}, labels)`), 'labels'],
[new Sql.Raw('cityHash64(labels)'), 'fingerprint'],
'string',
'timestamp_ns')
.from(query.query)
return {
...query,
query: query.query.toString()
}
}

class Subquery extends Sql.Raw {
constructor (sel) {
super()
this.sel = sel
}

toString () {
return '(' + this.sel + ')'
}
}

module.exports.transpileSummary = (token, query, limit) => {
query = module.exports.transpileLogStreamSelector(token.Child('log_stream_selector'), query)
query.limit()
query.ctx = query.ctx || {}
query.ctx.stream = query.ctx.stream || []
const withQ = new Sql.With('sum_a', query)
const guessLevelCHExp = 'map(\'\', \'unknown\', \'debu\', \'debug\', \'info\', \'info\', \'warn\', \'warning\', \'erro\', \'error\', \'crit\', \'critical\', \'fata\', \'fatal\', \'I\', \'info\', \'W\', \'warning\', \'E\', \'error\', \'F\', \'fatal\')[arrayFirst(x -> notEmpty(x) , [lowerUTF8(regexpExtract(sum_a.string, \'(?i)(^|\\\\s|[\\]);|:,.])([\\[(<\\\']|Level=)?(debu|info|warn|erro|crit|fata)\', 3)), extract(sum_a.string, \'^([IWEF])[0-9]{4}(\\\\s|\\\\p{P})\')])]'
query = (new Sql.Select()).with(withQ).select(
[query.getParam(sharedParamNames.to), 'timestamp_ns'],
[new Sql.Raw('[(\'level\', _level)]::Array(Tuple(String,String))'), 'labels'],
[new Sql.Raw("format('{} ({}%): {}', toString(_c), toString(round(toFloat64(_c) / _overall * 100, 3)), min(sum_a.string))"), 'string'],
[new Sql.Raw('0'), 'value'],
'_c',
[new Subquery((new Sql.Select()).select(new Sql.Raw('count()')).from(new Sql.WithReference(withQ))), '_overall']
).from(new Sql.WithReference(withQ))
.groupBy(new Sql.Raw(
'(arrayReduce(\'sum\', arrayMap(x -> cityHash64(lowerUTF8(x[2])), extractAllGroupsVertical(sum_a.string, \'(^|\\\\p{P}|\\\\s)([a-zA-Z]+)(\\\\p{P}|$|\\\\s)\')) as a),' +
' arrayReduce(\'groupBitXor\', a), toUInt64(arrayProduct(arrayMap(x -> x*2+1, a))), ' + guessLevelCHExp + ' as _level)'))
.orderBy([new Sql.Raw('count() as _c'), 'DESC'])
.limit(limit || 2000)
return query
}

/**
*
* @param query {Select}
Expand Down
4 changes: 4 additions & 0 deletions test/transpiler.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -381,3 +381,7 @@ it('should transpile series', () => {
const res = transpiler.transpileSeries(['{test_id="123"}'])
expect(res).toMatchSnapshot()
})

it('should transpile summary', () => {
console.log(transpiler.transpile({query: 'summary({sender="logtest"})'}).query)
})
Loading