Skip to content

Commit

Permalink
correct processing of {a=""} and {a!=""} cases
Browse files Browse the repository at this point in the history
  • Loading branch information
akvlad committed Oct 3, 2024
1 parent 8ddaf1e commit a400919
Showing 1 changed file with 78 additions and 41 deletions.
119 changes: 78 additions & 41 deletions promql/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ module.exports.series = async (query, fromMs, toMs) => {
const fromS = Math.floor(fromMs / 1000)
const toS = Math.floor(toMs / 1000)
const matchers = prometheus.pqlMatchers(query)
const conds = getMatchersIdxCond(matchers[0])
const idx = getIdxSubquery(conds, fromMs, toMs)
const idx = getIdxSubqueryV2(matchers[0], fromMs, toMs)
const withIdx = new Sql.With('idx', idx, !!clusterName)
const req = (new Sql.Select())
.with(withIdx)
Expand All @@ -70,51 +69,90 @@ module.exports.series = async (query, fromMs, toMs) => {
}
}

/**
*
* @param matcher {[string]}
*/
const getMatcherIdxCond = (matcher) => {
const res = [
Sql.Eq('key', matcher[0])
]
switch (matcher[1]) {
case '=':
res.push(Sql.Eq('val', matcher[2]))
break
case '!=':
res.push(Sql.Ne('val', matcher[2]))
break
case '=~':
res.push(Sql.Eq(new Sql.Raw(`match(val, ${Sql.quoteVal(matcher[2])})`), 1))
break
case '!~':
res.push(Sql.Ne(new Sql.Raw(`match(val, ${Sql.quoteVal(matcher[2])})`), 1))
}
return res
}

/**
*
* @param matchers {[[string]]}
*/
const getMatchersIdxCond = (matchers) => {
const matchesCond = []
for (const matcher of matchers) {
const _matcher = [
Sql.Eq('key', matcher[0])
]
switch (matcher[1]) {
case '=':
_matcher.push(Sql.Eq('val', matcher[2]))
break
case '!=':
_matcher.push(Sql.Ne('val', matcher[2]))
break
case '=~':
_matcher.push(Sql.Eq(new Sql.Raw(`match(val, ${Sql.quoteVal(matcher[2])})`), 1))
break
case '!~':
_matcher.push(Sql.Ne(new Sql.Raw(`match(val, ${Sql.quoteVal(matcher[2])})`), 1))
}
matchesCond.push(Sql.And(..._matcher))
}
return matchesCond
return matchers.map(matcher => Sql.And(...getMatcherIdxCond(matcher)))
}

const getIdxSubquery = (conds, fromMs, toMs) => {
const getIdxSubqueryV2 = (matchers, fromMs, toMs) => {
const fromS = Math.floor(fromMs / 1000)
const toS = Math.floor(toMs / 1000)
return (new Sql.Select())
.select('fingerprint')
.from([DATABASE_NAME() + '.time_series_gin', 'time_series_gin'])
.where(Sql.And(
Sql.Or(...conds),
Sql.Gte('date', new Sql.Raw(`toDate(fromUnixTimestamp(${fromS}))`)),
Sql.Lte('date', new Sql.Raw(`toDate(fromUnixTimestamp(${toS}))`)),
new Sql.In('type', 'in', [bothType, metricType])))
.having(
Sql.Eq(
new Sql.Raw('groupBitOr(' + conds.map(
(m, i) => new Sql.Raw(`bitShiftLeft((${m})::UInt64, ${i})`)
).join('+') + ')'), (1 << conds.length) - 1)
).groupBy('fingerprint')
const nonEmptyMatchers = matchers.filter(m => m[2] !== '')
const emptyMatchers = matchers.filter(m => m[2] === '' && ['=', '!='].includes(m[1]))
let req = null
if (nonEmptyMatchers.length) {
const nonEmptyConds = getMatchersIdxCond(nonEmptyMatchers)
req = (new Sql.Select())
.select('fingerprint')
.from([DATABASE_NAME() + '.time_series_gin', 'time_series_gin'])
.where(Sql.And(
Sql.Or(...nonEmptyConds),
Sql.Gte('date', new Sql.Raw(`toDate(fromUnixTimestamp(${fromS}))`)),
Sql.Lte('date', new Sql.Raw(`toDate(fromUnixTimestamp(${toS}))`)),
new Sql.In('type', 'in', [bothType, metricType])))
.having(
Sql.Eq(
new Sql.Raw('groupBitOr(' + nonEmptyConds.map(
(m, i) => new Sql.Raw(`bitShiftLeft((${m})::UInt64, ${i})`)
).join('+') + ')'), (1 << nonEmptyConds.length) - 1)
).groupBy('fingerprint')
}
if (emptyMatchers.length) {
const emptyConds = emptyMatchers.map(m => {
const visitParamHas = new Sql.Raw('')
visitParamHas.toString = function () {
return `visitParamHas(labels, ${Sql.quoteVal(m[0])})`
}
switch (m[1]) {
case '=':
return Sql.Eq(visitParamHas, new Sql.Raw('0'))
case '!=':
return Sql.Ne(visitParamHas, new Sql.Raw('1'))
default:
return null
}
}).filter(m => !!m)
const emptyReq = (new Sql.Select())
.select('fingerprint')
.from(`time_series${_dist}`)
.where(Sql.And(...emptyConds))
if (nonEmptyMatchers.length) {
const withNonEmptyIdx = new Sql.With('nonEmptyIdx', req, !!clusterName)
emptyReq.with(withNonEmptyIdx)
.where(
new Sql.In('fingerprint', 'in', new Sql.WithReference(withNonEmptyIdx))
)
}
req = emptyReq
}
return req
}

module.exports.getData = async (matchers, fromMs, toMs, subqueries) => {
Expand All @@ -126,8 +164,7 @@ module.exports.getData = async (matchers, fromMs, toMs, subqueries) => {
null, db, { responseType: 'arraybuffer' })
return new Uint8Array(data.data)
}
const matches = getMatchersIdxCond(matchers)
const idx = getIdxSubquery(matches, fromMs, toMs)
const idx = getIdxSubqueryV2(matchers, fromMs, toMs)
const withIdx = new Sql.With('idx', idx, !!clusterName)
const timeSeries = (new Sql.Select())
.select(
Expand All @@ -136,7 +173,7 @@ module.exports.getData = async (matchers, fromMs, toMs, subqueries) => {
).from(DATABASE_NAME() + '.time_series')
.where(Sql.And(
new Sql.In('fingerprint', 'in', new Sql.WithReference(withIdx)),
new Sql.In('type', 'in', [bothType,metricType])))
new Sql.In('type', 'in', [bothType, metricType])))
const withTimeSeries = new Sql.With('timeSeries', timeSeries, !!clusterName)
const raw = (new Sql.Select())
.with(withIdx)
Expand Down

0 comments on commit a400919

Please sign in to comment.