
chore(engine): Wireframe basic prototype of query executor #16935

Draft
wants to merge 2 commits into base: main

Conversation

@rfratto (Member) commented Mar 27, 2025

This PR introduces an extremely basic wireframe for the query executor. The query executor operates on a stream of Apache Arrow records.

Currently, it only supports the Limit plan node.

Additionally, the PR includes an exploration of how to write tests for plan nodes that accept Arrow streams and transform them.

To make the code easier to review in GitHub, the first commit only contains updates to vendor; the second commit contains the prototype.
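
To give a feel for the shape this takes, here is a minimal, self-contained sketch of the pattern: each plan node becomes an `iter.Seq[Result]` that wraps the sequences of its children. The `scan` and `limit` names are illustrative rather than taken from the PR, and a plain string stands in for `arrow.Record` so the example runs on its own.

```go
package main

import (
	"fmt"
	"iter"
)

// Result carries either a value or an error, mirroring the shape of the
// executor's Result type (with a string standing in for arrow.Record).
type Result struct {
	val string
	err error
}

// scan is a stand-in leaf node that produces a fixed set of rows.
func scan(rows ...string) iter.Seq[Result] {
	return func(yield func(Result) bool) {
		for _, r := range rows {
			if !yield(Result{val: r}) {
				return
			}
		}
	}
}

// limit forwards at most n results from its single input; returning early
// stops the upstream producer because its yield call then reports false.
func limit(n int, input iter.Seq[Result]) iter.Seq[Result] {
	return func(yield func(Result) bool) {
		count := 0
		for r := range input {
			if count >= n || !yield(r) {
				return
			}
			count++
		}
	}
}

func main() {
	for r := range limit(2, scan("a", "b", "c")) {
		if r.err != nil {
			fmt.Println("error:", r.err)
			continue
		}
		fmt.Println("row:", r.val)
	}
}
```

Assuming the producers are lazy, back-pressure falls out of the iterator protocol: once `limit` stops consuming, the scan's loop exits on its own.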

rfratto added 2 commits March 27, 2025 10:44
This commit introduces a basic prototypal wireframe for the new engine's
query executor, with initial support for the Limit plan node.

The wireframe includes work on a general methodology for how to write
tests for plan nodes that transform a stream of Arrow records.
Comment on lines +32 to +37
// Result denotes a single result from the executor. A result can either be an
// [arrow.Record], or an error.
type Result struct {
	val arrow.Record
	err error
}
Contributor:
How do you feel about calling this Batch or ResultBatch?

Comment on lines +37 to +40
iters := make([]iter.Seq[Result], len(children))
for i, child := range children {
	iters[i] = e.processNode(ctx, child)
}
Contributor:

From my experience with the old engine, this is a little tricky because it's not clear when nodes are evaluated and results are created. Is it lazy or not? E.g. the underlying method could aggregate all results into a slice and return that, or it could create a result on each yield call.
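
To make that distinction concrete, here is a small hypothetical comparison (not from the PR) of a lazy and an eager `iter.Seq` producer; the interleaving of the "producing"/"consuming" prints shows which one actually streams.

```go
package main

import (
	"fmt"
	"iter"
)

// lazyInts yields one value per yield call; nothing is computed until the
// caller actually ranges over the sequence.
func lazyInts(n int) iter.Seq[int] {
	return func(yield func(int) bool) {
		for i := 0; i < n; i++ {
			fmt.Println("producing", i) // interleaved with the consumer
			if !yield(i) {
				return
			}
		}
	}
}

// eagerInts does all the work up front and only then yields the pre-built
// slice, which defeats the streaming behaviour.
func eagerInts(n int) iter.Seq[int] {
	results := make([]int, 0, n)
	for i := 0; i < n; i++ {
		fmt.Println("producing", i) // runs before the consumer sees anything
		results = append(results, i)
	}
	return func(yield func(int) bool) {
		for _, v := range results {
			if !yield(v) {
				return
			}
		}
	}
}

func main() {
	for v := range lazyInts(3) {
		fmt.Println("consuming", v)
	}
	for v := range eagerInts(3) {
		fmt.Println("consuming", v)
	}
}
```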

Comment on lines +43 to +48
switch n := n.(type) {
case *physical.DataObjScan:
	e.processDataObjScan(n, iters)(yield)
case *physical.Limit:
	e.processLimit(n, iters)(yield)
default:
Contributor:

How do you feel about using a visitor pattern like in logql/syntax/visit.go instead?

Originally I thought pattern matching would do, as with the walk implementation. However, there were a few issues with that approach.

  1. The Go compiler does not check for exhaustiveness in pattern matches. This led to bugs.
  2. The walk order would be somewhat random.
  3. The visitor gives a structure for keeping intermediate results.

I have to admit that I'm not totally convinced the visitor is a great choice, especially when one is only interested in specific node types. It did work well for the clone method, though.
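
For reference, roughly what the visitor alternative looks like, sketched with made-up node types rather than the actual physical plan nodes:

```go
package main

import "fmt"

type Node interface{ Accept(Visitor) }

// Visitor has one method per concrete node type, so adding a node type forces
// every visitor to be updated — the exhaustiveness the type switch lacks.
type Visitor interface {
	VisitLimit(*Limit)
	VisitScan(*Scan)
}

type Limit struct {
	Child Node
	N     int
}

func (l *Limit) Accept(v Visitor) { v.VisitLimit(l) }

type Scan struct{ Table string }

func (s *Scan) Accept(v Visitor) { v.VisitScan(s) }

// printer is a visitor that keeps intermediate state (the indentation depth).
type printer struct{ depth int }

func (p *printer) VisitLimit(l *Limit) {
	fmt.Printf("%*slimit %d\n", p.depth*2, "", l.N)
	p.depth++
	l.Child.Accept(p)
	p.depth--
}

func (p *printer) VisitScan(s *Scan) {
	fmt.Printf("%*sscan %s\n", p.depth*2, "", s.Table)
}

func main() {
	plan := &Limit{N: 100, Child: &Scan{Table: "dataobj"}}
	plan.Accept(&printer{})
}
```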

Contributor:

It just crossed my mind: how do you feel about stack-based iteration to remove the recursion?
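
A rough sketch of what that could look like, using an explicit stack over illustrative node types (not the PR's physical plan):

```go
package main

import "fmt"

type node struct {
	name     string
	children []*node
}

// walk visits the tree depth-first without recursion by keeping an explicit
// stack of nodes still to be processed.
func walk(root *node, visit func(*node)) {
	stack := []*node{root}
	for len(stack) > 0 {
		n := stack[len(stack)-1]
		stack = stack[:len(stack)-1]
		visit(n)
		// Push children in reverse so the left-most child is visited first.
		for i := len(n.children) - 1; i >= 0; i-- {
			stack = append(stack, n.children[i])
		}
	}
}

func main() {
	plan := &node{name: "limit", children: []*node{
		{name: "filter", children: []*node{{name: "scan"}}},
	}}
	walk(plan, func(n *node) { fmt.Println(n.name) })
}
```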

limit = int64(n.Limit)
)

for r := range input[0] {
Contributor:

Should this iterate over all inputs, or maybe over flattened inputs?
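
If consuming every input turned out to be the right behaviour, one hypothetical option would be a small flatten helper over the input sequences (purely a sketch, not something the PR provides):

```go
package main

import (
	"fmt"
	"iter"
)

// seqOf builds a sequence from a fixed set of values (test helper).
func seqOf[T any](vals ...T) iter.Seq[T] {
	return func(yield func(T) bool) {
		for _, v := range vals {
			if !yield(v) {
				return
			}
		}
	}
}

// flatten yields every element of every input sequence, in order.
func flatten[T any](inputs []iter.Seq[T]) iter.Seq[T] {
	return func(yield func(T) bool) {
		for _, in := range inputs {
			for v := range in {
				if !yield(v) {
					return
				}
			}
		}
	}
}

func main() {
	inputs := []iter.Seq[int]{seqOf(1, 2), seqOf(3)}
	for v := range flatten(inputs) {
		fmt.Println(v) // 1, 2, 3
	}
}
```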

for r := range input[0] {
	rec, err := r.Value()
	if err != nil {
		yield(errorResult(fmt.Errorf("error reading record: %w", err)))
Contributor:

What does the call stack look like? Isn't it inside out, or is Go reversing it?
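
As far as I understand range-over-func, the call stack really is "inside out": the loop body is compiled into a function that the producer invokes through yield, so the producer's frame sits between the caller and the body. A tiny experiment (illustrative, not from the PR) makes that visible:

```go
package main

import (
	"fmt"
	"iter"
	"runtime"
)

// produce is a push iterator: it calls yield, which is the consumer's loop body.
func produce() iter.Seq[int] {
	return func(yield func(int) bool) {
		yield(1)
	}
}

func main() {
	for v := range produce() {
		// The stack trace printed here shows the producer's closure frame
		// beneath this loop body: main -> produce's func -> loop body.
		fmt.Println("value:", v)
		buf := make([]byte, 2048)
		n := runtime.Stack(buf, false)
		fmt.Println(string(buf[:n]))
	}
}
```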

"fmt"
"iter"

"github.com/apache/arrow-go/v18/arrow/memory"
Contributor:

I've mentioned this to @chaudum in a 1:1: I'm a little critical of Apache Arrow and its Go implementation. We did a hackathon with it and came to two conclusions:

  1. The implementation, and especially its SIMD-based compute kernels, wasn't ready (that was end of 2023).
  2. Loki's main bottlenecks are string processing and allocations. The numeric vectorization that Arrow and similar libraries bring has little impact. I do not know how well Arrow handles strings and allocations, though.

That said, I do not want to impose, and I'm pretty sure you've considered more options than I have. I just wanted to share my insight 🙂

	iters[i] = e.processNode(ctx, child)
}

return func(yield func(Result) bool) {
Contributor:

I personally find it very hard to reason about how iterators work :/

Comment on lines +68 to +71
if len(input) != 1 {
	yield(errorResult(errors.New("limit nodes must have exactly one input")))
	return
}
Contributor:

I find it non-intuitive that the validation happens only when the iterator is actually iterated over.
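
One hypothetical way to surface that earlier (sketched with placeholder types, and not what the PR currently does) is to return the validation error from the constructor instead of from the first yield:

```go
package main

import (
	"errors"
	"fmt"
	"iter"
)

type result struct{ val int }

// newLimit validates its inputs up front and returns the error immediately,
// so callers never receive an iterator whose only behaviour is to fail.
func newLimit(limit int, inputs []iter.Seq[result]) (iter.Seq[result], error) {
	if len(inputs) != 1 {
		return nil, errors.New("limit nodes must have exactly one input")
	}
	return func(yield func(result) bool) {
		n := 0
		for r := range inputs[0] {
			if n >= limit || !yield(r) {
				return
			}
			n++
		}
	}, nil
}

func main() {
	// The invalid plan is rejected before anything is iterated.
	if _, err := newLimit(10, nil); err != nil {
		fmt.Println("rejected at construction time:", err)
	}

	// A valid plan streams as usual.
	input := iter.Seq[result](func(yield func(result) bool) {
		_ = yield(result{val: 1}) && yield(result{val: 2}) // stop if yield reports false
	})
	seq, err := newLimit(1, []iter.Seq[result]{input})
	if err != nil {
		panic(err)
	}
	for r := range seq {
		fmt.Println("row:", r.val)
	}
}
```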
