diff --git a/parser/logql.bnf b/parser/logql.bnf index dd09bcf1..7e169c81 100644 --- a/parser/logql.bnf +++ b/parser/logql.bnf @@ -1,4 +1,4 @@ - ::= | | + ::= | | | log_stream_fp_selector ::= "{" *( "," ) "}" log_stream_selector ::= *() @@ -75,3 +75,5 @@ parameterized_unwrapped_expression_fn ::= parameterized_expression ::= "("","(|)")" [ ] parameter_value ::= parameterized_expression_fn ::= + +summary ::= "summary" "(" ")" diff --git a/parser/transpiler.js b/parser/transpiler.js index c6043217..a9fd7037 100644 --- a/parser/transpiler.js +++ b/parser/transpiler.js @@ -28,6 +28,7 @@ const { QrynBadRequest } = require('../lib/handlers/errors') const optimizations = require('./registry/smart_optimizations') const clusterName = require('../common').clusterName const dist = clusterName ? '_dist' : '' +const wasm = require('../wasm_parts/main') /** * @param joinLabels {boolean} @@ -154,9 +155,9 @@ module.exports.transpile = (request) => { } const joinLabels = ['unwrap_function', 'log_range_aggregation', 'aggregation_operator', 'agg_statement', 'user_macro', 'parser_expression', 'label_filter_pipeline', - 'line_format_expression', 'labels_format_expression'].some(t => token.Child(t)) + 'line_format_expression', 'labels_format_expression', 'summary'].some(t => token.Child(t)) let query = module.exports.initQuery(joinLabels) - const limit = request.limit ? request.limit : 2000 + let limit = request.limit ? request.limit : 2000 const order = request.direction === 'forward' ? 'asc' : 'desc' query.orderBy(...query.orderBy().map(o => [o[0], order])) const readTable = samplesReadTableName(start) @@ -167,7 +168,7 @@ module.exports.transpile = (request) => { inline: !!clusterName } let duration = null - const matrixOp = [ + let matrixOp = [ 'aggregation_operator', 'unwrap_function', 'log_range_aggregation', @@ -182,6 +183,7 @@ module.exports.transpile = (request) => { end } } + matrixOp = matrixOp || (token.Child('summary') && 'summary') switch (matrixOp) { case 'aggregation_operator': query = module.exports.transpileAggregationOperator(token, query) @@ -195,6 +197,11 @@ module.exports.transpile = (request) => { case 'parameterized_unwrapped_expression': query = module.exports.transpileParameterizedUnwrappedExpression(token, query) break + case 'summary': + query.ctx.matrix = false + query = module.exports.transpileSummary(token, query, request.limit || 2000) + setQueryParam(query, sharedParamNames.limit, undefined) + break default: // eslint-disable-next-line no-case-declarations const _query = module.exports.transpileLogStreamSelector(token, query) @@ -225,7 +232,7 @@ module.exports.transpile = (request) => { setQueryParam(query, sharedParamNames.from, start + '000000') setQueryParam(query, sharedParamNames.to, end + '000000') setQueryParam(query, 'isMatrix', query.ctx.matrix) - logger.debug(query.toString()) + console.log(query.toString()) return { query: request.rawQuery ? query : query.toString(), matrix: !!query.ctx.matrix, @@ -234,6 +241,84 @@ module.exports.transpile = (request) => { } } +/** + * + * @param request {{ + * query: string, + * limit: number, + * direction: string, + * start: string, + * end: string, + * step: string, + * stream?: (function(DataStream): DataStream)[], + * rawQuery: boolean + * }} + * @returns {{query: string, stream: (function (DataStream): DataStream)[], matrix: boolean, duration: number | undefined}} + */ +module.exports.transpileSummaryETL = (request) => { + const expression = compiler.ParseScript(request.query.trim()) + const root = expression.rootToken + if (!root.Child('summary')) { + throw new QrynBadRequest('request should be a summary expression') + } + const selector = root.Child('log_stream_selector') + const _request = { + ...request, + query: selector.value, + rawQuery: true + } + const byWithout = root.Child('by_without').value + const labels = "['" + root.Child('label_list').Children('label').map(l => l.value).join("','") + "']" + const exp = byWithout === 'by' ? '== 1' : '!= 1' + + const query = module.exports.transpile(_request) + query.query = (new Sql.Select()) + .select( + [new Sql.Raw(`arrayFilter(x -> has(${labels}, x.1) ${exp}, labels)`), 'labels'], + [new Sql.Raw('cityHash64(labels)'), 'fingerprint'], + 'string', + 'timestamp_ns') + .from(query.query) + return { + ...query, + query: query.query.toString() + } +} + +class Subquery extends Sql.Raw { + constructor (sel) { + super() + this.sel = sel + } + + toString () { + return '(' + this.sel + ')' + } +} + +module.exports.transpileSummary = (token, query, limit) => { + query = module.exports.transpileLogStreamSelector(token.Child('log_stream_selector'), query) + query.limit() + query.ctx = query.ctx || {} + query.ctx.stream = query.ctx.stream || [] + const withQ = new Sql.With('sum_a', query) + const guessLevelCHExp = 'map(\'\', \'unknown\', \'debu\', \'debug\', \'info\', \'info\', \'warn\', \'warning\', \'erro\', \'error\', \'crit\', \'critical\', \'fata\', \'fatal\', \'I\', \'info\', \'W\', \'warning\', \'E\', \'error\', \'F\', \'fatal\')[arrayFirst(x -> notEmpty(x) , [lowerUTF8(regexpExtract(sum_a.string, \'(?i)(^|\\\\s|[\\]);|:,.])([\\[(<\\\']|Level=)?(debu|info|warn|erro|crit|fata)\', 3)), extract(sum_a.string, \'^([IWEF])[0-9]{4}(\\\\s|\\\\p{P})\')])]' + query = (new Sql.Select()).with(withQ).select( + [query.getParam(sharedParamNames.to), 'timestamp_ns'], + [new Sql.Raw('[(\'level\', _level)]::Array(Tuple(String,String))'), 'labels'], + [new Sql.Raw("format('{} ({}%): {}', toString(_c), toString(round(toFloat64(_c) / _overall * 100, 3)), min(sum_a.string))"), 'string'], + [new Sql.Raw('0'), 'value'], + '_c', + [new Subquery((new Sql.Select()).select(new Sql.Raw('count()')).from(new Sql.WithReference(withQ))), '_overall'] + ).from(new Sql.WithReference(withQ)) + .groupBy(new Sql.Raw( + '(arrayReduce(\'sum\', arrayMap(x -> cityHash64(lowerUTF8(x[2])), extractAllGroupsVertical(sum_a.string, \'(^|\\\\p{P}|\\\\s)([a-zA-Z]+)(\\\\p{P}|$|\\\\s)\')) as a),' + + ' arrayReduce(\'groupBitXor\', a), toUInt64(arrayProduct(arrayMap(x -> x*2+1, a))), ' + guessLevelCHExp + ' as _level)')) + .orderBy([new Sql.Raw('count() as _c'), 'DESC']) + .limit(limit || 2000) + return query +} + /** * * @param query {Select} diff --git a/test/transpiler.test.js b/test/transpiler.test.js index 9838ac53..fc451999 100644 --- a/test/transpiler.test.js +++ b/test/transpiler.test.js @@ -381,3 +381,7 @@ it('should transpile series', () => { const res = transpiler.transpileSeries(['{test_id="123"}']) expect(res).toMatchSnapshot() }) + +it('should transpile summary', () => { + console.log(transpiler.transpile({query: 'summary({sender="logtest"})'}).query) +})