diff --git a/CHANGELOG.md b/CHANGELOG.md index cb3c177074..0c4aa60fae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -69,6 +69,7 @@ * [ENHANCEMENT] Distributor: Initialize ha_tracker cache before ha_tracker and distributor reach running state and begin serving writes. #9826 #9976 * [ENHANCEMENT] Ingester: `-ingest-storage.kafka.max-buffered-bytes` to limit the memory for buffered records when using concurrent fetching. #9892 * [ENHANCEMENT] Querier: improve performance and memory consumption of queries that select many series. #9914 +* [ENHANCEMENT] Query-Frontend: prune ` and on() (vector(x)==y)` style queries. Triggered by https://github.com/prometheus/prometheus/pull/15245. #10026 * [BUGFIX] Fix issue where functions such as `rate()` over native histograms could return incorrect values if a float stale marker was present in the selected range. #9508 * [BUGFIX] Fix issue where negation of native histograms (eg. `-some_native_histogram_series`) did nothing. #9508 * [BUGFIX] Fix issue where `metric might not be a counter, name does not end in _total/_sum/_count/_bucket` annotation would be emitted even if `rate` or `increase` did not have enough samples to compute a result. #9508 diff --git a/pkg/frontend/querymiddleware/astmapper/pruning_new.go b/pkg/frontend/querymiddleware/astmapper/pruning_new.go new file mode 100644 index 0000000000..43ec7e6ccb --- /dev/null +++ b/pkg/frontend/querymiddleware/astmapper/pruning_new.go @@ -0,0 +1,119 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package astmapper + +import ( + "context" + + "github.com/go-kit/log" + "github.com/prometheus/prometheus/promql/parser" +) + +func NewQueryPrunerV2(ctx context.Context, logger log.Logger) ASTMapper { + pruner := newQueryPrunerV2(ctx, logger) + return NewASTExprMapper(pruner) +} + +type queryPrunerV2 struct { + ctx context.Context + logger log.Logger +} + +func newQueryPrunerV2(ctx context.Context, logger log.Logger) *queryPrunerV2 { + return &queryPrunerV2{ + ctx: ctx, + logger: logger, + } +} + +func (pruner *queryPrunerV2) MapExpr(expr parser.Expr) (mapped parser.Expr, finished bool, err error) { + if err := pruner.ctx.Err(); err != nil { + return nil, false, err + } + + e, ok := expr.(*parser.BinaryExpr) + if !ok { + return expr, false, nil + } + + if e.Op != parser.LAND || e.VectorMatching == nil || + !e.VectorMatching.On || len(e.VectorMatching.MatchingLabels) != 0 { + // Return if not " and on() " + return expr, false, nil + } + + isConst, isEmpty := pruner.isConst(e.RHS) + if !isConst { + return expr, false, nil + } + if isEmpty { + // The right hand side is empty, so the whole expression is empty due to + // "and on()", return the right hand side. + return e.RHS, false, nil + } + // The right hand side is const no empty, so the whole expression is just the + // left side. + return e.LHS, false, nil +} + +func (pruner *queryPrunerV2) isConst(expr parser.Expr) (isConst, isEmpty bool) { + var lhs, rhs parser.Expr + switch e := expr.(type) { + case *parser.ParenExpr: + return pruner.isConst(e.Expr) + case *parser.BinaryExpr: + if e.Op != parser.EQLC || e.ReturnBool { + return false, false + } + lhs = e.LHS + rhs = e.RHS + default: + return false, false + } + + lIsVector, lValue := pruner.isConstVector(lhs) + if lIsVector { + rIsConst, rValue := pruner.isNumber(rhs) + if rIsConst { + return true, rValue != lValue + } + return false, false + } + var lIsConst bool + lIsConst, lValue = pruner.isNumber(lhs) + if !lIsConst { + return false, false + } + rIsVector, rValue := pruner.isConstVector(rhs) + if !rIsVector { + return false, false + } + return true, lValue != rValue +} + +func (pruner *queryPrunerV2) isConstVector(expr parser.Expr) (isVector bool, value float64) { + switch e := expr.(type) { + case *parser.ParenExpr: + return pruner.isConstVector(e.Expr) + case *parser.Call: + if e.Func.Name != "vector" || len(e.Args) != 1 { + return false, 0 + } + lit, ok := e.Args[0].(*parser.NumberLiteral) + if !ok { + return false, 0 + } + return true, lit.Val + } + return false, 0 +} + +func (pruner *queryPrunerV2) isNumber(expr parser.Expr) (isNumber bool, value float64) { + switch e := expr.(type) { + case *parser.ParenExpr: + return pruner.isNumber(e.Expr) + case *parser.NumberLiteral: + return true, e.Val + } + return false, 0 +} diff --git a/pkg/frontend/querymiddleware/astmapper/pruning_new_test.go b/pkg/frontend/querymiddleware/astmapper/pruning_new_test.go new file mode 100644 index 0000000000..446ae97795 --- /dev/null +++ b/pkg/frontend/querymiddleware/astmapper/pruning_new_test.go @@ -0,0 +1,179 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package astmapper + +import ( + "context" + "testing" + + "github.com/go-kit/log" + "github.com/prometheus/prometheus/promql/parser" + "github.com/stretchr/testify/require" +) + +func TestQueryPrunerV2(t *testing.T) { + pruner := NewQueryPrunerV2(context.Background(), log.NewNopLogger()) + + for _, tt := range []struct { + in string + out string + }{ + // Non-prunable expressions. + { + `foo`, + `foo`, + }, + { + `foo[1m]`, + `foo[1m]`, + }, + { + `foo{bar="baz"}[1m]`, + `foo{bar="baz"}[1m]`, + }, + { + `foo{bar="baz"}`, + `foo{bar="baz"}`, + }, + { + `quantile(0.9,foo)`, + `quantile(0.9,foo)`, + }, + { + `histogram_quantile(0.5, rate(bar1{baz="blip"}[30s]))`, + `histogram_quantile(0.5, rate(bar1{baz="blip"}[30s]))`, + }, + { + `count(up)`, + `count(up)`, + }, + { + `avg(rate(foo[1m]))`, + `avg(rate(foo[1m]))`, + }, + { + `vector(0)`, + `vector(0)`, + }, + { + `up < -Inf`, + `up < -Inf`, + }, + { + `avg(rate(foo[1m])) < (-Inf)`, + `avg(rate(foo[1m])) < (-Inf)`, + }, + { + `Inf * -1`, + `Inf * -1`, + }, + { + `avg(rate(foo[1m])) < (-1 * +Inf)`, + `avg(rate(foo[1m])) < (-1 * +Inf)`, + }, + { + `(-1 * -Inf) < avg(rate(foo[1m]))`, + `(-1 * -Inf) < avg(rate(foo[1m]))`, + }, + { + `vector(0) < -Inf or avg(rate(foo[1m]))`, + `vector(0) < -Inf or avg(rate(foo[1m]))`, + }, + { + `avg(rate(foo[1m])) or vector(0) < -Inf`, + `avg(rate(foo[1m])) or vector(0) < -Inf`, + }, + { + `avg(rate(foo[1m])) or (vector(0) < -Inf)`, + `avg(rate(foo[1m])) or (vector(0) < -Inf)`, + }, + { + `((-1 * -Inf) < avg(rate(foo[2m]))) or avg(rate(foo[1m]))`, + `((-1 * -Inf) < avg(rate(foo[2m]))) or avg(rate(foo[1m]))`, + }, + { + `(avg(rate(foo[2m])) < (+1 * -Inf)) or avg(rate(foo[1m]))`, + `(avg(rate(foo[2m])) < (+1 * -Inf)) or avg(rate(foo[1m]))`, + }, + { + `((-1 * -Inf) == avg(rate(foo[2m]))) or avg(rate(foo[1m]))`, + `((-1 * -Inf) == avg(rate(foo[2m]))) or avg(rate(foo[1m]))`, + }, + { + `avg(rate(foo[1m])) unless ((-1 * -Inf) < avg(rate(foo[2m])))`, + `avg(rate(foo[1m])) unless ((-1 * -Inf) < avg(rate(foo[2m])))`, + }, + { + `((2 * +Inf) < avg(rate(foo[2m]))) or avg(rate(foo[1m]))`, + `((2 * +Inf) < avg(rate(foo[2m]))) or avg(rate(foo[1m]))`, + }, + { + `(((-1 * -Inf) < avg(rate(foo[3m]))) unless avg(rate(foo[2m]))) or avg(rate(foo[1m]))`, + `(((-1 * -Inf) < avg(rate(foo[3m]))) unless avg(rate(foo[2m]))) or avg(rate(foo[1m]))`, + }, + { + `((((-1 * -Inf) < avg(rate(foo[4m]))) unless avg(rate(foo[3m]))) and avg(rate(foo[2m]))) or avg(rate(foo[1m]))`, + `((((-1 * -Inf) < avg(rate(foo[4m]))) unless avg(rate(foo[3m]))) and avg(rate(foo[2m]))) or avg(rate(foo[1m]))`, + }, + { + `avg(rate(foo[1m])) or avg(rate(bar[1m]))`, + `avg(rate(foo[1m])) or avg(rate(bar[1m]))`, + }, + { // The const expression is on the wrong side. + `(vector(0) == 1) and on() (avg(rate(foo[1m])))`, + `(vector(0) == 1) and on() (avg(rate(foo[1m])))`, + }, + { // Matching on labels. + `(avg(rate(foo[1m]))) and on(id) (vector(0) == 1)`, + `(avg(rate(foo[1m]))) and on(id) (vector(0) == 1)`, + }, + { // Not "on" expression. + `(avg(rate(foo[1m]))) and ignoring() (vector(0) == 1)`, + `(avg(rate(foo[1m]))) and ignoring() (vector(0) == 1)`, + }, + // Pruned expressions. + { + `(avg(rate(foo[1m]))) and on() (vector(0) == 1)`, + `(vector(0) == 1)`, + }, + { + `(avg(rate(foo[1m]))) and on() (vector(1) == 1)`, + `(avg(rate(foo[1m])))`, + }, + { + `(avg(rate(foo[1m]))) and on() (vector(3) == 4.5)`, + `(vector(3) == 4.5)`, + }, + { + `(avg(rate(foo[1m]))) and on() (vector(5.5) == 5.5)`, + `(avg(rate(foo[1m])))`, + }, + { + // "and on()" is not on top level, "or" has lower precedence. + `(avg(rate(foo[1m]))) and on() (vector(0) == 1) or avg(rate(bar[1m]))`, + `(vector(0) == 1) or avg(rate(bar[1m]))`, + }, + { + // "and on()" is not on top level, due to left-right associativity. + `(avg(rate(foo[1m]))) and on() (vector(0) == 1) and avg(rate(bar[1m]))`, + `(vector(0) == 1) and avg(rate(bar[1m]))`, + }, + { + `(avg(rate(foo[1m]))) and on() (vector(0) == 1) or avg(rate(bar[1m])) and on() (vector(1) == 1)`, + `(vector(0) == 1) or avg(rate(bar[1m]))`, + }, + } { + tt := tt + + t.Run(tt.in, func(t *testing.T) { + expr, err := parser.ParseExpr(tt.in) + require.NoError(t, err) + out, err := parser.ParseExpr(tt.out) + require.NoError(t, err) + + mapped, err := pruner.Map(expr) + require.NoError(t, err) + require.Equal(t, out.String(), mapped.String()) + }) + } +} diff --git a/pkg/frontend/querymiddleware/prune_new.go b/pkg/frontend/querymiddleware/prune_new.go new file mode 100644 index 0000000000..7d1d0b8c87 --- /dev/null +++ b/pkg/frontend/querymiddleware/prune_new.go @@ -0,0 +1,74 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package querymiddleware + +import ( + "context" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/prometheus/promql/parser" + + apierror "github.com/grafana/mimir/pkg/api/error" + "github.com/grafana/mimir/pkg/frontend/querymiddleware/astmapper" + "github.com/grafana/mimir/pkg/util/spanlogger" +) + +type pruneMiddlewareV2 struct { + next MetricsQueryHandler + logger log.Logger +} + +// newPruneMiddleware creates a middleware that optimises queries by rewriting them to prune away +// unnecessary computations. +func newPruneMiddlewareV2(logger log.Logger) MetricsQueryMiddleware { + return MetricsQueryMiddlewareFunc(func(next MetricsQueryHandler) MetricsQueryHandler { + return &pruneMiddlewareV2{ + next: next, + logger: logger, + } + }) +} + +func (p *pruneMiddlewareV2) Do(ctx context.Context, r MetricsQueryRequest) (Response, error) { + log := spanlogger.FromContext(ctx, p.logger) + + prunedQuery, success, err := p.pruneQuery(ctx, r.GetQuery()) + if err != nil { + level.Warn(log).Log("msg", "failed to prune the input query, falling back to the original query", "query", r.GetQuery(), "err", err) + + return p.next.Do(ctx, r) + } + + if !success { + level.Debug(log).Log("msg", "query pruning had no effect", "query", r.GetQuery()) + return p.next.Do(ctx, r) + } + + level.Debug(log).Log("msg", "query has been rewritten by pruning", "original", r.GetQuery(), "rewritten", prunedQuery) + + updatedReq, err := r.WithQuery(prunedQuery) + if err != nil { + return nil, err + } + + return p.next.Do(ctx, updatedReq) +} + +func (p *pruneMiddlewareV2) pruneQuery(ctx context.Context, query string) (string, bool, error) { + // Parse the query. + expr, err := parser.ParseExpr(query) + if err != nil { + return "", false, apierror.New(apierror.TypeBadData, DecorateWithParamName(err, "query").Error()) + } + origQueryString := expr.String() + + mapper := astmapper.NewQueryPrunerV2(ctx, p.logger) + prunedQuery, err := mapper.Map(expr) + if err != nil { + return "", false, err + } + prunedQueryString := prunedQuery.String() + + return prunedQueryString, origQueryString != prunedQueryString, nil +} diff --git a/pkg/frontend/querymiddleware/prune_new_test.go b/pkg/frontend/querymiddleware/prune_new_test.go new file mode 100644 index 0000000000..e972053992 --- /dev/null +++ b/pkg/frontend/querymiddleware/prune_new_test.go @@ -0,0 +1,90 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package querymiddleware + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/grafana/dskit/user" + "github.com/prometheus/prometheus/promql/promqltest" + "github.com/stretchr/testify/require" +) + +func TestQueryPruningV2(t *testing.T) { + numSamples := 100 + seriesName := `test_float` + replacer := strings.NewReplacer("", seriesName, "", fmt.Sprintf("%d", numSamples)) + data := replacer.Replace(` + load 1m + {series="1"} 0+1x + {series="2"} 0+2x + {series="3"} 0+3x + {series="4"} 0+4x + {series="5"} 0+5x + `) + queryable := promqltest.LoadedStorage(t, data) + + const step = 20 * time.Second + + engine := newEngine() + pruningware := newPruneMiddlewareV2( + log.NewNopLogger(), + ) + downstream := &downstreamHandler{ + engine: engine, + queryable: queryable, + } + + type template struct { + query string + IsEmpty bool + } + + templates := []template{ + {`(avg(rate(%s[1m]))) and on() (vector(0) == 1)`, true}, + {`(avg(rate(%s[1m]))) and on() (vector(1) == 1)`, false}, + {`avg(rate(%s[1m])) or (avg(rate(test_float[1m]))) and on() (vector(0) == 1)`, false}, + } + for _, template := range templates { + t.Run(template.query, func(t *testing.T) { + query := fmt.Sprintf(template.query, seriesName) + req := &PrometheusRangeQueryRequest{ + path: "/query_range", + start: 0, + end: int64(numSamples) * time.Minute.Milliseconds(), + step: step.Milliseconds(), + queryExpr: parseQuery(t, query), + } + + injectedContext := user.InjectOrgID(context.Background(), "test") + + // Run the query without pruning. + expectedRes, err := downstream.Do(injectedContext, req) + require.Nil(t, err) + + if !template.IsEmpty { + // Ensure the query produces some results. + require.NotEmpty(t, expectedRes.(*PrometheusResponse).Data.Result) + requireValidSamples(t, expectedRes.(*PrometheusResponse).Data.Result) + } + + // Run the query with pruning. + prunedRes, err := pruningware.Wrap(downstream).Do(injectedContext, req) + require.Nil(t, err) + + if !template.IsEmpty { + // Ensure the query produces some results. + require.NotEmpty(t, prunedRes.(*PrometheusResponse).Data.Result) + requireValidSamples(t, prunedRes.(*PrometheusResponse).Data.Result) + } + + // Ensure the results are approximately equal. + approximatelyEqualsSamples(t, expectedRes.(*PrometheusResponse), prunedRes.(*PrometheusResponse)) + }) + } +} diff --git a/pkg/frontend/querymiddleware/roundtrip.go b/pkg/frontend/querymiddleware/roundtrip.go index 2df612e699..93367fdc5f 100644 --- a/pkg/frontend/querymiddleware/roundtrip.go +++ b/pkg/frontend/querymiddleware/roundtrip.go @@ -437,15 +437,18 @@ func newQueryMiddlewares( if cfg.PrunedQueries { pruneMiddleware := newPruneMiddleware(log) + pruneMiddlewareV2 := newPruneMiddlewareV2(log) queryRangeMiddleware = append( queryRangeMiddleware, newInstrumentMiddleware("pruning", metrics), pruneMiddleware, + pruneMiddlewareV2, ) queryInstantMiddleware = append( queryInstantMiddleware, newInstrumentMiddleware("pruning", metrics), pruneMiddleware, + pruneMiddlewareV2, ) } diff --git a/pkg/frontend/querymiddleware/roundtrip_test.go b/pkg/frontend/querymiddleware/roundtrip_test.go index 65f5216ffc..c504e8706a 100644 --- a/pkg/frontend/querymiddleware/roundtrip_test.go +++ b/pkg/frontend/querymiddleware/roundtrip_test.go @@ -587,6 +587,7 @@ func TestMiddlewaresConsistency(t *testing.T) { "splitInstantQueryByIntervalMiddleware", // Not applicable because specific to instant queries. "stepAlignMiddleware", // Not applicable because remote read requests don't take step in account when running in Mimir. "pruneMiddleware", // No query pruning support. + "pruneMiddlewareV2", // No query pruning support. "experimentalFunctionsMiddleware", // No blocking for PromQL experimental functions as it is executed remotely. }, },