diff --git a/pkg/api/queryapi/query_api.go b/pkg/api/queryapi/query_api.go index 3e4f7bb49c..ef9ef4e280 100644 --- a/pkg/api/queryapi/query_api.go +++ b/pkg/api/queryapi/query_api.go @@ -16,9 +16,9 @@ import ( "github.com/prometheus/prometheus/util/annotations" "github.com/prometheus/prometheus/util/httputil" v1 "github.com/prometheus/prometheus/web/api/v1" - "github.com/thanos-io/promql-engine/logicalplan" "github.com/weaveworks/common/httpgrpc" + "github.com/cortexproject/cortex/pkg/distributed_execution" "github.com/cortexproject/cortex/pkg/engine" "github.com/cortexproject/cortex/pkg/querier" "github.com/cortexproject/cortex/pkg/util" @@ -110,7 +110,7 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) { byteLP := []byte(r.PostFormValue("plan")) if len(byteLP) != 0 { - logicalPlan, err := logicalplan.Unmarshal(byteLP) + logicalPlan, err := distributed_execution.Unmarshal(byteLP) if err != nil { return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("invalid logical plan: %v", err)}, nil, nil} } @@ -183,7 +183,7 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) { byteLP := []byte(r.PostFormValue("plan")) if len(byteLP) != 0 { - logicalPlan, err := logicalplan.Unmarshal(byteLP) + logicalPlan, err := distributed_execution.Unmarshal(byteLP) if err != nil { return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("invalid logical plan: %v", err)}, nil, nil} } diff --git a/pkg/distributed_execution/distributed_optimizer.go b/pkg/distributed_execution/distributed_optimizer.go new file mode 100644 index 0000000000..2cf6128818 --- /dev/null +++ b/pkg/distributed_execution/distributed_optimizer.go @@ -0,0 +1,40 @@ +package distributed_execution + +import ( + "fmt" + + "github.com/prometheus/prometheus/util/annotations" + "github.com/thanos-io/promql-engine/logicalplan" +) + +// This is a simplified implementation that only handles binary aggregation cases +// Future versions of the distributed optimizer are expected to: +// - Support more complex query patterns +// - Incorporate diverse optimization strategies +// - Extend support to node types beyond binary operations + +type DistributedOptimizer struct{} + +func (d *DistributedOptimizer) Optimize(root logicalplan.Node) (logicalplan.Node, annotations.Annotations, error) { + warns := annotations.New() + + if root == nil { + return nil, *warns, fmt.Errorf("nil root node") + } + + logicalplan.TraverseBottomUp(nil, &root, func(parent, current *logicalplan.Node) bool { + + if (*current).Type() == logicalplan.BinaryNode { + ch := (*current).Children() + + for _, child := range ch { + temp := (*child).Clone() + *child = NewRemoteNode() + *(*child).Children()[0] = temp + } + } + + return false + }) + return root, *warns, nil +} diff --git a/pkg/distributed_execution/distributed_optimizer_test.go b/pkg/distributed_execution/distributed_optimizer_test.go new file mode 100644 index 0000000000..2c955f4832 --- /dev/null +++ b/pkg/distributed_execution/distributed_optimizer_test.go @@ -0,0 +1,125 @@ +package distributed_execution + +import ( + "testing" + "time" + + "github.com/prometheus/prometheus/promql/parser" + "github.com/stretchr/testify/require" + "github.com/thanos-io/promql-engine/logicalplan" + "github.com/thanos-io/promql-engine/query" +) + +func TestDistributedOptimizer(t *testing.T) { + now := time.Now() + testCases := []struct { + name string + query string + start time.Time + end time.Time + step time.Duration + remoteExecCount int + }{ + { + name: "binary operation with aggregations", + query: "sum(rate(node_cpu_seconds_total{mode!=\"idle\"}[5m])) + sum(rate(node_memory_Active_bytes[5m]))", + start: now, + end: now, + step: time.Minute, + remoteExecCount: 2, + }, + { + name: "multiple binary operations with aggregations", + query: "sum(rate(http_requests_total{job=\"api\"}[5m])) + sum(rate(http_requests_total{job=\"web\"}[5m])) - sum(rate(http_requests_total{job=\"cache\"}[5m]))", + start: now, + end: now, + step: time.Minute, + remoteExecCount: 4, + }, + { + name: "subquery with aggregation", + query: "sum(rate(container_network_transmit_bytes_total[5m:1m]))", + start: now, + end: now, + step: time.Minute, + remoteExecCount: 0, + }, + { + name: "function applied on binary operation", + query: "rate(http_requests_total[5m]) + rate(http_errors_total[5m]) > bool 0", + start: now, + end: now, + step: time.Minute, + remoteExecCount: 4, + }, + { + name: "numerical binary query", + query: "(1 + 1) + (1 + 1)", + start: now, + end: now, + step: time.Minute, + remoteExecCount: 0, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + lp, _, err := CreateTestLogicalPlan(tc.query, tc.start, tc.end, tc.step) + require.NoError(t, err) + + d := DistributedOptimizer{} + newRoot, _, err := d.Optimize((*lp).Root()) + require.NoError(t, err) + + remoteNodeCount := 0 + logicalplan.TraverseBottomUp(nil, &newRoot, func(parent, current *logicalplan.Node) bool { + if RemoteNode == (*current).Type() { + remoteNodeCount++ + } + return false + }) + require.Equal(t, tc.remoteExecCount, remoteNodeCount) + }) + } +} + +func getStartAndEnd(start time.Time, end time.Time, step time.Duration) (time.Time, time.Time) { + if step == 0 { + return start, start + } + return start, end +} + +func CreateTestLogicalPlan(qs string, start time.Time, end time.Time, step time.Duration) (*logicalplan.Plan, query.Options, error) { + + start, end = getStartAndEnd(start, end, step) + + qOpts := query.Options{ + Start: start, + End: end, + Step: step, + StepsBatch: 10, + NoStepSubqueryIntervalFn: func(duration time.Duration) time.Duration { + return 0 + }, + LookbackDelta: 0, + EnablePerStepStats: false, + } + + expr, err := parser.NewParser(qs, parser.WithFunctions(parser.Functions)).ParseExpr() + if err != nil { + return nil, qOpts, err + } + + planOpts := logicalplan.PlanOptions{ + DisableDuplicateLabelCheck: false, + } + + logicalPlan, err := logicalplan.NewFromAST(expr, &qOpts, planOpts) + if err != nil { + return nil, qOpts, err + } + optimizedPlan, _ := logicalPlan.Optimize(logicalplan.DefaultOptimizers) + + return &optimizedPlan, qOpts, nil +} diff --git a/pkg/distributed_execution/fragment_key.go b/pkg/distributed_execution/fragment_key.go new file mode 100644 index 0000000000..f41133fda1 --- /dev/null +++ b/pkg/distributed_execution/fragment_key.go @@ -0,0 +1,21 @@ +package distributed_execution + +type FragmentKey struct { + queryID uint64 + fragmentID uint64 +} + +func MakeFragmentKey(queryID uint64, fragmentID uint64) *FragmentKey { + return &FragmentKey{ + queryID: queryID, + fragmentID: fragmentID, + } +} + +func (f FragmentKey) GetQueryID() uint64 { + return f.queryID +} + +func (f FragmentKey) GetFragmentID() uint64 { + return f.fragmentID +} diff --git a/pkg/distributed_execution/remote_node.go b/pkg/distributed_execution/remote_node.go new file mode 100644 index 0000000000..fb862044c1 --- /dev/null +++ b/pkg/distributed_execution/remote_node.go @@ -0,0 +1,72 @@ +package distributed_execution + +import ( + "encoding/json" + "fmt" + + "github.com/prometheus/prometheus/promql/parser" + "github.com/thanos-io/promql-engine/logicalplan" +) + +type NodeType = logicalplan.NodeType +type Node = logicalplan.Node + +const ( + RemoteNode = "RemoteNode" +) + +// (to verify interface implementations) +var _ logicalplan.Node = (*Remote)(nil) + +type Remote struct { + Op parser.ItemType + Expr Node `json:"-"` + + FragmentKey FragmentKey + FragmentAddr string +} + +func NewRemoteNode() Node { + return &Remote{ + // initialize the fragment key pointer first + FragmentKey: FragmentKey{}, + } +} +func (r *Remote) Clone() Node { + return &Remote{Op: r.Op, Expr: r.Expr.Clone(), FragmentKey: r.FragmentKey, FragmentAddr: r.FragmentAddr} +} +func (r *Remote) Children() []*Node { + return []*Node{&r.Expr} +} +func (r *Remote) String() string { + return fmt.Sprintf("%s%s", r.Op.String(), r.Expr.String()) +} +func (r *Remote) ReturnType() parser.ValueType { + return r.Expr.ReturnType() +} +func (r *Remote) Type() NodeType { return RemoteNode } + +type remote struct { + QueryID uint64 + FragmentID uint64 + FragmentAddr string +} + +func (r *Remote) MarshalJSON() ([]byte, error) { + return json.Marshal(remote{ + QueryID: r.FragmentKey.queryID, + FragmentID: r.FragmentKey.fragmentID, + FragmentAddr: r.FragmentAddr, + }) +} + +func (r *Remote) UnmarshalJSON(data []byte) error { + re := remote{} + if err := json.Unmarshal(data, &re); err != nil { + return err + } + + r.FragmentKey = *MakeFragmentKey(re.QueryID, re.FragmentID) + r.FragmentAddr = re.FragmentAddr + return nil +} diff --git a/pkg/distributed_execution/remote_node_test.go b/pkg/distributed_execution/remote_node_test.go new file mode 100644 index 0000000000..a09694215e --- /dev/null +++ b/pkg/distributed_execution/remote_node_test.go @@ -0,0 +1,76 @@ +package distributed_execution + +import ( + "encoding/json" + "testing" + + "github.com/prometheus/prometheus/promql/parser" + "github.com/stretchr/testify/require" + "github.com/thanos-io/promql-engine/logicalplan" +) + +func TestRemoteNode(t *testing.T) { + t.Run("NewRemoteNode creates valid node", func(t *testing.T) { + node := NewRemoteNode() + require.NotNil(t, node) + require.IsType(t, &Remote{}, node) + require.Equal(t, (&Remote{}).Type(), node.Type()) + }) + + t.Run("Clone creates correct copy", func(t *testing.T) { + original := &Remote{ + Op: parser.ADD, + FragmentKey: FragmentKey{queryID: 1, fragmentID: 2}, + FragmentAddr: "[IP_ADDRESS]:9090", + Expr: &logicalplan.NumberLiteral{Val: 42}, + } + + cloned := original.Clone() + require.NotNil(t, cloned) + + remote, ok := cloned.(*Remote) + require.True(t, ok) + require.Equal(t, original.Op, remote.Op) + require.Equal(t, original.FragmentKey, remote.FragmentKey) + require.Equal(t, original.FragmentAddr, remote.FragmentAddr) + require.Equal(t, original.Expr.String(), remote.Expr.String()) + }) + + t.Run("JSON marshaling/unmarshaling", func(t *testing.T) { + original := &Remote{ + FragmentKey: *MakeFragmentKey(1, 2), + FragmentAddr: "[IP_ADDRESS]:9090", + } + + data, err := json.Marshal(original) + require.NoError(t, err) + + var unmarshaled Remote + err = json.Unmarshal(data, &unmarshaled) + require.NoError(t, err) + + require.Equal(t, original.FragmentKey.queryID, unmarshaled.FragmentKey.queryID) + require.Equal(t, original.FragmentKey.fragmentID, unmarshaled.FragmentKey.fragmentID) + require.Equal(t, original.FragmentAddr, unmarshaled.FragmentAddr) + }) + + t.Run("Children returns correct nodes", func(t *testing.T) { + expr := &logicalplan.NumberLiteral{Val: 42} + node := &Remote{ + Expr: expr, + } + + children := node.Children() + require.Len(t, children, 1) + require.Equal(t, expr, *children[0]) + }) + + t.Run("ReturnType matches expression type", func(t *testing.T) { + expr := &logicalplan.NumberLiteral{Val: 42} + node := &Remote{ + Expr: expr, + } + + require.Equal(t, expr.ReturnType(), node.ReturnType()) + }) +} diff --git a/pkg/distributed_execution/unmarshal.go b/pkg/distributed_execution/unmarshal.go new file mode 100644 index 0000000000..0fdf4c2741 --- /dev/null +++ b/pkg/distributed_execution/unmarshal.go @@ -0,0 +1,192 @@ +package distributed_execution + +import ( + "bytes" + "encoding/json" + "math" + + "github.com/prometheus/prometheus/model/labels" + "github.com/thanos-io/promql-engine/logicalplan" +) + +type jsonNode struct { + Type logicalplan.NodeType `json:"type"` + Data json.RawMessage `json:"data"` + Children []json.RawMessage `json:"children,omitempty"` +} + +const ( + nanVal = `"NaN"` + infVal = `"+Inf"` + negInfVal = `"-Inf"` +) + +func Unmarshal(data []byte) (logicalplan.Node, error) { + return unmarshalNode(data) +} + +func unmarshalNode(data []byte) (logicalplan.Node, error) { + t := jsonNode{} + if err := json.Unmarshal(data, &t); err != nil { + return nil, err + } + + switch t.Type { + case logicalplan.VectorSelectorNode: + v := &logicalplan.VectorSelector{} + if err := json.Unmarshal(t.Data, v); err != nil { + return nil, err + } + var err error + for i, m := range v.LabelMatchers { + v.LabelMatchers[i], err = labels.NewMatcher(m.Type, m.Name, m.Value) + if err != nil { + return nil, err + } + } + return v, nil + case logicalplan.MatrixSelectorNode: + m := &logicalplan.MatrixSelector{} + if err := json.Unmarshal(t.Data, m); err != nil { + return nil, err + } + vs, err := unmarshalNode(t.Children[0]) + if err != nil { + return nil, err + } + m.VectorSelector = vs.(*logicalplan.VectorSelector) + return m, nil + case logicalplan.AggregationNode: + a := &logicalplan.Aggregation{} + if err := json.Unmarshal(t.Data, a); err != nil { + return nil, err + } + var err error + a.Expr, err = unmarshalNode(t.Children[0]) + if err != nil { + return nil, err + } + if len(t.Children) > 1 { + a.Param, err = unmarshalNode(t.Children[1]) + if err != nil { + return nil, err + } + } + return a, nil + case logicalplan.BinaryNode: + b := &logicalplan.Binary{} + if err := json.Unmarshal(t.Data, b); err != nil { + return nil, err + } + var err error + b.LHS, err = unmarshalNode(t.Children[0]) + if err != nil { + return nil, err + } + b.RHS, err = unmarshalNode(t.Children[1]) + if err != nil { + return nil, err + } + return b, nil + case logicalplan.FunctionNode: + f := &logicalplan.FunctionCall{} + if err := json.Unmarshal(t.Data, f); err != nil { + return nil, err + } + for _, c := range t.Children { + child, err := unmarshalNode(c) + if err != nil { + return nil, err + } + f.Args = append(f.Args, child) + } + return f, nil + case logicalplan.NumberLiteralNode: + n := &logicalplan.NumberLiteral{} + if bytes.Equal(t.Data, []byte(infVal)) { + n.Val = math.Inf(1) + } else if bytes.Equal(t.Data, []byte(negInfVal)) { + n.Val = math.Inf(-1) + } else if bytes.Equal(t.Data, []byte(nanVal)) { + n.Val = math.NaN() + } else { + if err := json.Unmarshal(t.Data, n); err != nil { + return nil, err + } + } + return n, nil + case logicalplan.StringLiteralNode: + s := &logicalplan.StringLiteral{} + if err := json.Unmarshal(t.Data, s); err != nil { + return nil, err + } + return s, nil + case logicalplan.SubqueryNode: + s := &logicalplan.Subquery{} + if err := json.Unmarshal(t.Data, s); err != nil { + return nil, err + } + var err error + s.Expr, err = unmarshalNode(t.Children[0]) + if err != nil { + return nil, err + } + return s, nil + case logicalplan.CheckDuplicateNode: + c := &logicalplan.CheckDuplicateLabels{} + if err := json.Unmarshal(t.Data, c); err != nil { + return nil, err + } + var err error + c.Expr, err = unmarshalNode(t.Children[0]) + if err != nil { + return nil, err + } + return c, nil + case logicalplan.StepInvariantNode: + s := &logicalplan.StepInvariantExpr{} + if err := json.Unmarshal(t.Data, s); err != nil { + return nil, err + } + var err error + s.Expr, err = unmarshalNode(t.Children[0]) + if err != nil { + return nil, err + } + return s, nil + case logicalplan.ParensNode: + p := &logicalplan.Parens{} + if err := json.Unmarshal(t.Data, p); err != nil { + return nil, err + } + var err error + p.Expr, err = unmarshalNode(t.Children[0]) + if err != nil { + return nil, err + } + return p, nil + case logicalplan.UnaryNode: + u := &logicalplan.Unary{} + if err := json.Unmarshal(t.Data, u); err != nil { + return nil, err + } + var err error + u.Expr, err = unmarshalNode(t.Children[0]) + if err != nil { + return nil, err + } + return u, nil + case RemoteNode: + r := &Remote{} + if err := json.Unmarshal(t.Data, r); err != nil { + return nil, err + } + var err error + r.Expr, err = unmarshalNode(t.Children[0]) + if err != nil { + return nil, err + } + return r, nil + } + return nil, nil +} diff --git a/pkg/distributed_execution/unmarshal_test.go b/pkg/distributed_execution/unmarshal_test.go new file mode 100644 index 0000000000..1f294f7bc7 --- /dev/null +++ b/pkg/distributed_execution/unmarshal_test.go @@ -0,0 +1,70 @@ +package distributed_execution + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/thanos-io/promql-engine/logicalplan" +) + +func TestUnmarshalWithLogicalPlan(t *testing.T) { + t.Run("unmarshal complex query plan", func(t *testing.T) { + start := time.Now() + end := start.Add(1 * time.Hour) + step := 15 * time.Second + + testCases := []struct { + name string + query string + }{ + { + name: "binary operation", + query: "http_requests_total + rate(node_cpu_seconds_total[5m])", + }, + { + name: "aggregation", + query: "sum(rate(http_requests_total[5m])) by (job)", + }, + { + name: "complex query", + query: "sum(rate(http_requests_total{job='prometheus'}[5m])) by (job) / sum(rate(node_cpu_seconds_total[5m])) by (job)", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + plan, _, err := CreateTestLogicalPlan(tc.query, start, end, step) + require.NoError(t, err) + require.NotNil(t, plan) + + data, err := logicalplan.Marshal((*plan).Root()) + require.NoError(t, err) + + node, err := Unmarshal(data) + require.NoError(t, err) + require.NotNil(t, node) + + // the logical plan node before and after marshal/unmarshal should be the same + verifyNodeStructure(t, (*plan).Root(), node) + }) + } + }) +} + +func verifyNodeStructure(t *testing.T, expected logicalplan.Node, actual logicalplan.Node) { + require.Equal(t, expected.Type(), actual.Type()) + require.Equal(t, expected.String(), actual.String()) + require.Equal(t, expected.ReturnType(), actual.ReturnType()) + + expectedChildren := expected.Children() + actualChildren := actual.Children() + + require.Equal(t, len(expectedChildren), len(actualChildren)) + + for i := 0; i < len(expectedChildren); i++ { + if expectedChildren[i] != nil && actualChildren[i] != nil { + verifyNodeStructure(t, *expectedChildren[i], *actualChildren[i]) + } + } +} diff --git a/pkg/querier/tripperware/distributed_query.go b/pkg/querier/tripperware/distributed_query.go index 5439a3dc69..bb6ac4ccc5 100644 --- a/pkg/querier/tripperware/distributed_query.go +++ b/pkg/querier/tripperware/distributed_query.go @@ -9,6 +9,8 @@ import ( "github.com/thanos-io/promql-engine/logicalplan" "github.com/thanos-io/promql-engine/query" "github.com/weaveworks/common/httpgrpc" + + "github.com/cortexproject/cortex/pkg/distributed_execution" ) const ( @@ -70,7 +72,14 @@ func (d distributedQueryMiddleware) newLogicalPlan(qs string, start time.Time, e } optimizedPlan, _ := logicalPlan.Optimize(logicalplan.DefaultOptimizers) - return &optimizedPlan, nil + dOptimizer := distributed_execution.DistributedOptimizer{} + dOptimizedPlanNode, _, err := dOptimizer.Optimize(optimizedPlan.Root()) + if err != nil { + return nil, err + } + lp := logicalplan.New(dOptimizedPlanNode, &qOpts, planOpts) + + return &lp, nil } func (d distributedQueryMiddleware) Do(ctx context.Context, r Request) (Response, error) {