Skip to content

Commit

Permalink
frontend: add new query pruning
Browse files Browse the repository at this point in the history
The current method of excluding/including sub query results in PromQL
by comparing to -Inf or +Inf is no longer valid after
prometheus/prometheus#15245
because comparing native histograms to a float with < or > now results
in a warning, not an empty set.

To ease migrating to the correct version, I'm not removing the old
prune code yet, just adding the new method.

The new method uses the logical AND operation to intersect the sub query with
either a const vector() or an empty vector(). E.g.

subquery and on() (vector(1)==1)
subquery and on() (vector(-1)==1)

which become:

subquery
(vector(-1)==1)

Signed-off-by: György Krajcsovits <[email protected]>
  • Loading branch information
krajorama committed Nov 26, 2024
1 parent aaae0dd commit 98d8813
Show file tree
Hide file tree
Showing 7 changed files with 467 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
* [ENHANCEMENT] Distributor: Initialize ha_tracker cache before ha_tracker and distributor reach running state and begin serving writes. #9826 #9976
* [ENHANCEMENT] Ingester: `-ingest-storage.kafka.max-buffered-bytes` to limit the memory for buffered records when using concurrent fetching. #9892
* [ENHANCEMENT] Querier: improve performance and memory consumption of queries that select many series. #9914
* [ENHANCEMENT] Query-Frontend: prune `<subquery> and on() (vector(x)==y)` style queries. Triggered by https://github.com/prometheus/prometheus/pull/15245. #10026
* [BUGFIX] Fix issue where functions such as `rate()` over native histograms could return incorrect values if a float stale marker was present in the selected range. #9508
* [BUGFIX] Fix issue where negation of native histograms (eg. `-some_native_histogram_series`) did nothing. #9508
* [BUGFIX] Fix issue where `metric might not be a counter, name does not end in _total/_sum/_count/_bucket` annotation would be emitted even if `rate` or `increase` did not have enough samples to compute a result. #9508
Expand Down
119 changes: 119 additions & 0 deletions pkg/frontend/querymiddleware/astmapper/pruning_new.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// SPDX-License-Identifier: AGPL-3.0-only

package astmapper

import (
"context"

"github.com/go-kit/log"
"github.com/prometheus/prometheus/promql/parser"
)

// NewQueryPrunerV2 returns an ASTMapper that wraps a queryPrunerV2 built from
// the given context and logger. The pruner rewrites
// "<lhs> and on() <const comparison>" expressions; see queryPrunerV2.MapExpr.
func NewQueryPrunerV2(ctx context.Context, logger log.Logger) ASTMapper {
	return NewASTExprMapper(newQueryPrunerV2(ctx, logger))
}

// queryPrunerV2 is an expression mapper that prunes
// "<lhs> and on() <rhs>" expressions whose right hand side is a comparison
// between a constant vector() and a number literal.
type queryPrunerV2 struct {
	// ctx is checked for cancellation at the start of every MapExpr call.
	ctx context.Context
	// logger is stored at construction time; the methods in this file do not
	// log anything with it.
	logger log.Logger
}

// newQueryPrunerV2 allocates a queryPrunerV2 holding the given context and
// logger.
func newQueryPrunerV2(ctx context.Context, logger log.Logger) *queryPrunerV2 {
	p := new(queryPrunerV2)
	p.ctx, p.logger = ctx, logger
	return p
}

// MapExpr prunes expressions of the form "<lhs> and on() <rhs>" where <rhs> is
// a constant comparison recognised by isConst:
//
//   - if <rhs> can never match (e.g. vector(0) == 1) the whole expression is
//     empty, so it is replaced by <rhs>;
//   - if <rhs> always matches (e.g. vector(1) == 1) the intersection is a
//     no-op, so the expression is replaced by <lhs>.
//
// Any other expression is returned unchanged. finished is always false. An
// error is returned only when the pruner's context has been cancelled or has
// expired.
func (pruner *queryPrunerV2) MapExpr(expr parser.Expr) (mapped parser.Expr, finished bool, err error) {
	if ctxErr := pruner.ctx.Err(); ctxErr != nil {
		return nil, false, ctxErr
	}

	// Only "and" binary expressions are candidates for pruning.
	bin, isBinary := expr.(*parser.BinaryExpr)
	if !isBinary || bin.Op != parser.LAND {
		return expr, false, nil
	}

	// The matching must be exactly "on()": an explicit on modifier with an
	// empty label list.
	matching := bin.VectorMatching
	if matching == nil || !matching.On || len(matching.MatchingLabels) != 0 {
		return expr, false, nil
	}

	constant, empty := pruner.isConst(bin.RHS)
	switch {
	case !constant:
		return expr, false, nil
	case empty:
		// Intersecting with an empty set yields an empty set; the right hand
		// side already is that empty set.
		return bin.RHS, false, nil
	default:
		// The right hand side is constant and not empty, so "and on()" keeps
		// every series of the left hand side.
		return bin.LHS, false, nil
	}
}

// isConst reports whether expr (ignoring surrounding parentheses) is an "=="
// comparison without the bool modifier between a constant vector(<number>)
// and a number literal, in either operand order.
//
// isConst is true when the expression has that shape. isEmpty is then true
// when the two numbers differ, i.e. the comparison filters out the only
// sample and the result is an empty vector.
func (pruner *queryPrunerV2) isConst(expr parser.Expr) (isConst, isEmpty bool) {
	if paren, ok := expr.(*parser.ParenExpr); ok {
		return pruner.isConst(paren.Expr)
	}

	cmp, ok := expr.(*parser.BinaryExpr)
	if !ok || cmp.Op != parser.EQLC || cmp.ReturnBool {
		return false, false
	}

	// Shape: vector(<a>) == <b>
	if isVec, vecVal := pruner.isConstVector(cmp.LHS); isVec {
		isNum, numVal := pruner.isNumber(cmp.RHS)
		if !isNum {
			return false, false
		}
		return true, numVal != vecVal
	}

	// Shape: <a> == vector(<b>)
	isNum, numVal := pruner.isNumber(cmp.LHS)
	if !isNum {
		return false, false
	}
	isVec, vecVal := pruner.isConstVector(cmp.RHS)
	if !isVec {
		return false, false
	}
	return true, numVal != vecVal
}

// isConstVector reports whether expr (ignoring surrounding parentheses) is a
// call to vector() with exactly one argument that is a number literal, and if
// so returns that number. A parenthesised argument such as vector((1)) is not
// recognised.
func (pruner *queryPrunerV2) isConstVector(expr parser.Expr) (isVector bool, value float64) {
	if paren, ok := expr.(*parser.ParenExpr); ok {
		return pruner.isConstVector(paren.Expr)
	}

	call, ok := expr.(*parser.Call)
	if !ok || call.Func.Name != "vector" || len(call.Args) != 1 {
		return false, 0
	}

	if num, ok := call.Args[0].(*parser.NumberLiteral); ok {
		return true, num.Val
	}
	return false, 0
}

// isNumber reports whether expr is a number literal, optionally wrapped in any
// number of parentheses, and if so returns its value.
func (pruner *queryPrunerV2) isNumber(expr parser.Expr) (isNumber bool, value float64) {
	for {
		switch e := expr.(type) {
		case *parser.ParenExpr:
			// Peel one layer of parentheses and look again.
			expr = e.Expr
		case *parser.NumberLiteral:
			return true, e.Val
		default:
			return false, 0
		}
	}
}
179 changes: 179 additions & 0 deletions pkg/frontend/querymiddleware/astmapper/pruning_new_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
// SPDX-License-Identifier: AGPL-3.0-only

package astmapper

import (
"context"
"testing"

"github.com/go-kit/log"
"github.com/prometheus/prometheus/promql/parser"
"github.com/stretchr/testify/require"
)

// TestQueryPrunerV2 checks that the V2 pruner rewrites only
// "<lhs> and on() (vector(x) == y)" style expressions and leaves everything
// else, including the old "-Inf"/"+Inf" comparison style, untouched.
func TestQueryPrunerV2(t *testing.T) {
	type testCase struct {
		in  string
		out string
	}

	// Expressions that must come out of the pruner unchanged.
	unchanged := []string{
		`foo`,
		`foo[1m]`,
		`foo{bar="baz"}[1m]`,
		`foo{bar="baz"}`,
		`quantile(0.9,foo)`,
		`histogram_quantile(0.5, rate(bar1{baz="blip"}[30s]))`,
		`count(up)`,
		`avg(rate(foo[1m]))`,
		`vector(0)`,
		`up < -Inf`,
		`avg(rate(foo[1m])) < (-Inf)`,
		`Inf * -1`,
		`avg(rate(foo[1m])) < (-1 * +Inf)`,
		`(-1 * -Inf) < avg(rate(foo[1m]))`,
		`vector(0) < -Inf or avg(rate(foo[1m]))`,
		`avg(rate(foo[1m])) or vector(0) < -Inf`,
		`avg(rate(foo[1m])) or (vector(0) < -Inf)`,
		`((-1 * -Inf) < avg(rate(foo[2m]))) or avg(rate(foo[1m]))`,
		`(avg(rate(foo[2m])) < (+1 * -Inf)) or avg(rate(foo[1m]))`,
		`((-1 * -Inf) == avg(rate(foo[2m]))) or avg(rate(foo[1m]))`,
		`avg(rate(foo[1m])) unless ((-1 * -Inf) < avg(rate(foo[2m])))`,
		`((2 * +Inf) < avg(rate(foo[2m]))) or avg(rate(foo[1m]))`,
		`(((-1 * -Inf) < avg(rate(foo[3m]))) unless avg(rate(foo[2m]))) or avg(rate(foo[1m]))`,
		`((((-1 * -Inf) < avg(rate(foo[4m]))) unless avg(rate(foo[3m]))) and avg(rate(foo[2m]))) or avg(rate(foo[1m]))`,
		`avg(rate(foo[1m])) or avg(rate(bar[1m]))`,
		// The const expression is on the wrong side.
		`(vector(0) == 1) and on() (avg(rate(foo[1m])))`,
		// Matching on labels.
		`(avg(rate(foo[1m]))) and on(id) (vector(0) == 1)`,
		// Not "on" expression.
		`(avg(rate(foo[1m]))) and ignoring() (vector(0) == 1)`,
	}

	// Expressions that the pruner must rewrite.
	pruned := []testCase{
		{
			in:  `(avg(rate(foo[1m]))) and on() (vector(0) == 1)`,
			out: `(vector(0) == 1)`,
		},
		{
			in:  `(avg(rate(foo[1m]))) and on() (vector(1) == 1)`,
			out: `(avg(rate(foo[1m])))`,
		},
		{
			in:  `(avg(rate(foo[1m]))) and on() (vector(3) == 4.5)`,
			out: `(vector(3) == 4.5)`,
		},
		{
			in:  `(avg(rate(foo[1m]))) and on() (vector(5.5) == 5.5)`,
			out: `(avg(rate(foo[1m])))`,
		},
		{
			// "and on()" is not on top level, "or" has lower precedence.
			in:  `(avg(rate(foo[1m]))) and on() (vector(0) == 1) or avg(rate(bar[1m]))`,
			out: `(vector(0) == 1) or avg(rate(bar[1m]))`,
		},
		{
			// "and on()" is not on top level, due to left-right associativity.
			in:  `(avg(rate(foo[1m]))) and on() (vector(0) == 1) and avg(rate(bar[1m]))`,
			out: `(vector(0) == 1) and avg(rate(bar[1m]))`,
		},
		{
			in:  `(avg(rate(foo[1m]))) and on() (vector(0) == 1) or avg(rate(bar[1m])) and on() (vector(1) == 1)`,
			out: `(vector(0) == 1) or avg(rate(bar[1m]))`,
		},
	}

	// Identity cases first, then the rewrites.
	cases := make([]testCase, 0, len(unchanged)+len(pruned))
	for _, query := range unchanged {
		cases = append(cases, testCase{in: query, out: query})
	}
	cases = append(cases, pruned...)

	mapper := NewQueryPrunerV2(context.Background(), log.NewNopLogger())

	for _, tc := range cases {
		tc := tc

		t.Run(tc.in, func(t *testing.T) {
			input, err := parser.ParseExpr(tc.in)
			require.NoError(t, err)
			want, err := parser.ParseExpr(tc.out)
			require.NoError(t, err)

			got, err := mapper.Map(input)
			require.NoError(t, err)
			// Compare canonical renderings so whitespace differences in the
			// table entries do not matter.
			require.Equal(t, want.String(), got.String())
		})
	}
}
74 changes: 74 additions & 0 deletions pkg/frontend/querymiddleware/prune_new.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// SPDX-License-Identifier: AGPL-3.0-only

package querymiddleware

import (
"context"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/prometheus/promql/parser"

apierror "github.com/grafana/mimir/pkg/api/error"
"github.com/grafana/mimir/pkg/frontend/querymiddleware/astmapper"
"github.com/grafana/mimir/pkg/util/spanlogger"
)

// pruneMiddlewareV2 is a metrics query handler that tries to prune the
// request's query before passing the request on to the next handler.
type pruneMiddlewareV2 struct {
	// next is the downstream handler that receives the (possibly rewritten)
	// request.
	next MetricsQueryHandler
	// logger is the base logger used to derive the per-request span logger and
	// is also handed to the query pruner.
	logger log.Logger
}

// newPruneMiddlewareV2 creates a middleware that optimises queries by rewriting
// them to prune away unnecessary computations. Each handler it wraps gets its
// own pruneMiddlewareV2 sharing the given logger.
func newPruneMiddlewareV2(logger log.Logger) MetricsQueryMiddleware {
	wrap := func(next MetricsQueryHandler) MetricsQueryHandler {
		return &pruneMiddlewareV2{logger: logger, next: next}
	}
	return MetricsQueryMiddlewareFunc(wrap)
}

// Do attempts to prune the request's query and forwards the request to the
// next handler.
//
// If pruning fails or leaves the query unchanged, the original request is
// forwarded as is. Otherwise a copy of the request carrying the rewritten
// query is forwarded. An error from building that copy is returned to the
// caller without calling the next handler.
func (p *pruneMiddlewareV2) Do(ctx context.Context, r MetricsQueryRequest) (Response, error) {
	spanLog := spanlogger.FromContext(ctx, p.logger)

	rewritten, changed, err := p.pruneQuery(ctx, r.GetQuery())
	switch {
	case err != nil:
		// Pruning is only an optimisation, so a failure must not fail the
		// request.
		level.Warn(spanLog).Log("msg", "failed to prune the input query, falling back to the original query", "query", r.GetQuery(), "err", err)

		return p.next.Do(ctx, r)
	case !changed:
		level.Debug(spanLog).Log("msg", "query pruning had no effect", "query", r.GetQuery())
		return p.next.Do(ctx, r)
	}

	level.Debug(spanLog).Log("msg", "query has been rewritten by pruning", "original", r.GetQuery(), "rewritten", rewritten)

	prunedReq, err := r.WithQuery(rewritten)
	if err != nil {
		return nil, err
	}

	return p.next.Do(ctx, prunedReq)
}

// pruneQuery parses query, runs it through the V2 query pruner and returns the
// string form of the result.
//
// The boolean result reports whether the pruned query's string form differs
// from the string form of the parsed original. A parse failure is returned as
// a bad-data API error decorated with the "query" parameter name. An error
// from the mapper is returned unchanged.
func (p *pruneMiddlewareV2) pruneQuery(ctx context.Context, query string) (string, bool, error) {
	parsed, err := parser.ParseExpr(query)
	if err != nil {
		decorated := DecorateWithParamName(err, "query")
		return "", false, apierror.New(apierror.TypeBadData, decorated.Error())
	}

	// Render the original before mapping so the comparison below does not
	// depend on whether the mapper modifies the parsed tree in place.
	before := parsed.String()

	pruned, err := astmapper.NewQueryPrunerV2(ctx, p.logger).Map(parsed)
	if err != nil {
		return "", false, err
	}

	after := pruned.String()
	return after, before != after, nil
}
Loading

0 comments on commit 98d8813

Please sign in to comment.