POC Adaptive predicate push down based read plan #7524

Open
wants to merge 6 commits into
base: main

zhuqi-lucas
Contributor

@zhuqi-lucas zhuqi-lucas commented May 17, 2025

Which issue does this PR close?

This PR implements an adaptive, predicate-pushdown-based read plan.

Rationale for this change

Improve the performance of the default Parquet predicate pushdown by making it adaptive.

What changes are included in this PR?

Are there any user-facing changes?

@github-actions github-actions bot added the parquet Changes to the parquet crate label May 17, 2025
@zhuqi-lucas
Contributor Author

Here are the benchmark results compared with the original read plan, without this improvement:

critcmp  original_read_plan adaptive_predicate_push_down
group                                adaptive_predicate_push_down           original_read_plan
-----                                ----------------------------           ------------------
arrow_reader_clickbench/async/Q1     1.02  1554.5±18.19µs        ? ?/sec    1.00  1520.4±57.21µs        ? ?/sec
arrow_reader_clickbench/async/Q10    1.00      8.2±0.12ms        ? ?/sec    1.02      8.4±0.34ms        ? ?/sec
arrow_reader_clickbench/async/Q11    1.00      9.7±0.41ms        ? ?/sec    1.02      9.9±0.43ms        ? ?/sec
arrow_reader_clickbench/async/Q12    1.00     23.0±0.89ms        ? ?/sec    1.06     24.4±0.24ms        ? ?/sec
arrow_reader_clickbench/async/Q13    1.00     25.8±0.73ms        ? ?/sec    1.33     34.3±0.62ms        ? ?/sec
arrow_reader_clickbench/async/Q14    1.00     24.9±0.75ms        ? ?/sec    1.36     33.9±0.88ms        ? ?/sec
arrow_reader_clickbench/async/Q19    1.00      2.5±0.14ms        ? ?/sec    1.00      2.5±0.15ms        ? ?/sec
arrow_reader_clickbench/async/Q20    1.00     88.1±1.99ms        ? ?/sec    1.01     89.0±1.61ms        ? ?/sec
arrow_reader_clickbench/async/Q21    1.00    113.9±1.96ms        ? ?/sec    1.01    114.7±1.76ms        ? ?/sec
arrow_reader_clickbench/async/Q22    1.00    220.2±2.70ms        ? ?/sec    1.01    223.3±3.88ms        ? ?/sec
arrow_reader_clickbench/async/Q23    1.02    216.6±8.94ms        ? ?/sec    1.00    212.8±3.62ms        ? ?/sec
arrow_reader_clickbench/async/Q24    1.00     29.1±1.71ms        ? ?/sec    1.22     35.4±0.94ms        ? ?/sec
arrow_reader_clickbench/async/Q27    1.00     91.7±2.11ms        ? ?/sec    1.00     91.8±2.62ms        ? ?/sec
arrow_reader_clickbench/async/Q28    1.02     93.7±3.26ms        ? ?/sec    1.00     92.2±1.68ms        ? ?/sec
arrow_reader_clickbench/async/Q30    1.00     23.4±1.04ms        ? ?/sec    1.91     44.7±1.08ms        ? ?/sec
arrow_reader_clickbench/async/Q36    1.00     87.8±4.27ms        ? ?/sec    1.08     95.0±1.67ms        ? ?/sec
arrow_reader_clickbench/async/Q37    1.00     51.8±2.34ms        ? ?/sec    1.12     57.8±0.43ms        ? ?/sec
arrow_reader_clickbench/async/Q38    1.00     19.5±0.72ms        ? ?/sec    1.03     20.1±0.34ms        ? ?/sec
arrow_reader_clickbench/async/Q39    1.02     23.4±0.58ms        ? ?/sec    1.00     23.1±0.18ms        ? ?/sec
arrow_reader_clickbench/async/Q40    1.00     17.1±0.76ms        ? ?/sec    1.98     33.9±0.91ms        ? ?/sec
arrow_reader_clickbench/async/Q41    1.00     13.6±0.28ms        ? ?/sec    1.80     24.4±0.17ms        ? ?/sec
arrow_reader_clickbench/async/Q42    1.00      7.2±0.11ms        ? ?/sec    1.19      8.7±0.09ms        ? ?/sec
arrow_reader_clickbench/sync/Q1      1.11  1700.8±43.33µs        ? ?/sec    1.00  1525.9±15.20µs        ? ?/sec
arrow_reader_clickbench/sync/Q10     1.04      8.5±0.11ms        ? ?/sec    1.00      8.2±0.09ms        ? ?/sec
arrow_reader_clickbench/sync/Q11     1.05     10.3±0.14ms        ? ?/sec    1.00      9.8±0.14ms        ? ?/sec
arrow_reader_clickbench/sync/Q12     1.00     23.1±0.17ms        ? ?/sec    1.03     23.9±0.63ms        ? ?/sec
arrow_reader_clickbench/sync/Q13     1.00     26.8±0.70ms        ? ?/sec    1.24     33.1±0.85ms        ? ?/sec
arrow_reader_clickbench/sync/Q14     1.00     25.7±0.37ms        ? ?/sec    1.28     32.8±0.38ms        ? ?/sec
arrow_reader_clickbench/sync/Q19     1.04      2.5±0.02ms        ? ?/sec    1.00      2.4±0.02ms        ? ?/sec
arrow_reader_clickbench/sync/Q20     1.02     90.2±2.81ms        ? ?/sec    1.00     88.7±2.72ms        ? ?/sec
arrow_reader_clickbench/sync/Q21     1.02    118.9±2.91ms        ? ?/sec    1.00    116.5±2.18ms        ? ?/sec
arrow_reader_clickbench/sync/Q22     1.00    230.4±3.60ms        ? ?/sec    1.00    229.5±3.06ms        ? ?/sec
arrow_reader_clickbench/sync/Q23     1.01    215.8±7.50ms        ? ?/sec    1.00    213.3±5.45ms        ? ?/sec
arrow_reader_clickbench/sync/Q24     1.00     28.2±0.36ms        ? ?/sec    1.26     35.6±0.29ms        ? ?/sec
arrow_reader_clickbench/sync/Q27     1.01     92.8±1.90ms        ? ?/sec    1.00     91.8±0.80ms        ? ?/sec
arrow_reader_clickbench/sync/Q28     1.00     91.9±2.08ms        ? ?/sec    1.00     92.3±0.57ms        ? ?/sec
arrow_reader_clickbench/sync/Q30     1.00     23.9±0.48ms        ? ?/sec    1.88     44.9±0.28ms        ? ?/sec
arrow_reader_clickbench/sync/Q36     1.00     86.7±1.89ms        ? ?/sec    1.11     96.2±0.63ms        ? ?/sec
arrow_reader_clickbench/sync/Q37     1.00     50.1±0.36ms        ? ?/sec    1.19     59.7±1.30ms        ? ?/sec
arrow_reader_clickbench/sync/Q38     1.00     19.9±0.35ms        ? ?/sec    1.03     20.4±0.59ms        ? ?/sec
arrow_reader_clickbench/sync/Q39     1.00     21.0±0.55ms        ? ?/sec    1.01     21.3±0.20ms        ? ?/sec
arrow_reader_clickbench/sync/Q40     1.00     16.6±0.12ms        ? ?/sec    2.09     34.6±0.24ms        ? ?/sec
arrow_reader_clickbench/sync/Q41     1.00     13.7±0.13ms        ? ?/sec    1.87     25.6±0.70ms        ? ?/sec
arrow_reader_clickbench/sync/Q42     1.00      7.8±0.24ms        ? ?/sec    1.19      9.3±0.06ms        ? ?/sec

if skipped != front.row_count {
return Err(general_err!(
if total < 10 * select_count {
let mut bitmap_builder = BooleanBufferBuilder::new(total);
Contributor

Since many of these read selections come from a BooleanBuffer originally (the result of evaluating an ArrowPredicate), I wonder what you think about potentially avoiding creating RowSelections in the ReadPlan.

Something like

use std::collections::VecDeque;

enum DecodeRows {
  /// Decode exactly the rows according to the row selection
  RowSelection(RowSelection),
  /// Decode all rows, and then apply a filter to keep only the ones that matter
  Bitmask(BooleanBuffer),
}

struct ReadPlan {
  // instead of Selection, have DecodeRows
  decode_plan: VecDeque<DecodeRows>,
}

And then the trick would be some sort of heuristic / adaptive approach to turn the result of the ArrowPredicate into a DecodeRows plan

let raw = RowSelection::from_filters(&filters);
Ok(match input_selection {
    Some(selection) => selection.and_then(&raw),
    None => raw,
})

🤔
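
To make the suggestion above concrete, here is a minimal sketch of one possible heuristic for turning a predicate result into a decode plan. It is only an illustration under stated assumptions, not the parquet crate's API: `Run` is a simplified stand-in for `RowSelector`, a `Vec<bool>` stands in for `BooleanBuffer`, and `mask_to_runs`, `plan_from_mask`, and the `min_avg_run` parameter are hypothetical names.

```rust
/// One run of consecutive rows that are all read or all skipped.
/// Simplified stand-in for the parquet crate's `RowSelector` (assumption).
#[derive(Debug, Clone, PartialEq)]
pub struct Run {
    pub row_count: usize,
    pub skip: bool,
}

/// Hypothetical decode plan for one chunk of rows.
#[derive(Debug, PartialEq)]
pub enum DecodeRows {
    /// Decode exactly the selected runs.
    Runs(Vec<Run>),
    /// Decode every row, then filter with the mask.
    /// `Vec<bool>` stands in for `BooleanBuffer` (assumption).
    Bitmask(Vec<bool>),
}

/// Collapse a predicate mask into alternating read/skip runs.
pub fn mask_to_runs(mask: &[bool]) -> Vec<Run> {
    let mut runs: Vec<Run> = Vec::new();
    for &keep in mask {
        let skip = !keep;
        if let Some(last) = runs.last_mut() {
            if last.skip == skip {
                // Same kind of row as the previous one: extend the current run.
                last.row_count += 1;
                continue;
            }
        }
        runs.push(Run { row_count: 1, skip });
    }
    runs
}

/// Assumed heuristic: keep the mask when the average run is shorter
/// than `min_avg_run` rows, otherwise use the run-based selection.
pub fn plan_from_mask(mask: Vec<bool>, min_avg_run: usize) -> DecodeRows {
    let runs = mask_to_runs(&mask);
    if mask.len() < min_avg_run * runs.len() {
        DecodeRows::Bitmask(mask)
    } else {
        DecodeRows::Runs(runs)
    }
}
```

In this sketch the runs are still materialized in order to measure their average length; counting the transitions in the mask directly would avoid even that allocation.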

Contributor Author

@zhuqi-lucas zhuqi-lucas May 19, 2025

Thank you @alamb for this suggestion. I had been thinking about doing it this way.

But I don't use it in this PR, because here the adaptation happens at the batch level. For example:

With a batch size of 8192, we make the adaptive decision once per batch, and the default is the range selector, so the choice is made more precisely for each small window:

8192 (2000 read, 2000 skip, 4192 read) => keep range
8192 (200 read, 200 skip, ..., 200 read, ...) avg = 200 => keep range
8192 (5 read, 10 skip, 10 skip, 5 read, 5 read, ...) => avg < 10, so we need to change this 8192-row window to a bitmap
....

So we only switch a small window from the default range selector to a bitmap, which is not expensive in most cases.

The adaptive window size is the batch size, not the total row group length.
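
The per-batch decision described above could be sketched roughly as follows. This is an illustrative reconstruction, not the PR's code: `Selector` is a simplified stand-in for the parquet crate's `RowSelector`, `Strategy`, `choose_strategy`, and `repeat_pattern` are hypothetical names, and the threshold assumes that `select_count` in the `total < 10 * select_count` check from the diff counts the selectors in the window.

```rust
/// Simplified stand-in for the parquet crate's `RowSelector` (assumption).
#[derive(Debug, Clone, Copy)]
pub struct Selector {
    pub row_count: usize,
    pub skip: bool,
}

/// How one batch-sized window is decoded (hypothetical name).
#[derive(Debug, PartialEq, Eq)]
pub enum Strategy {
    Range,
    Bitmap,
}

/// Assumed threshold, mirroring `total < 10 * select_count` in the diff.
pub const MIN_AVG_RUN: usize = 10;

/// Pick a strategy for the selectors covering one window:
/// an average run shorter than MIN_AVG_RUN rows switches to a bitmap.
pub fn choose_strategy(window: &[Selector]) -> Strategy {
    let total: usize = window.iter().map(|s| s.row_count).sum();
    if total < MIN_AVG_RUN * window.len() {
        Strategy::Bitmap
    } else {
        Strategy::Range
    }
}

/// Build a window of `total` rows that alternates `read` rows read
/// and `skip` rows skipped, truncating the final run if needed.
pub fn repeat_pattern(read: usize, skip: usize, total: usize) -> Vec<Selector> {
    assert!(read > 0 && skip > 0, "run lengths must be non-zero");
    let mut out = Vec::new();
    let mut remaining = total;
    let mut is_skip = false;
    while remaining > 0 {
        let wanted = if is_skip { skip } else { read };
        let row_count = wanted.min(remaining);
        out.push(Selector { row_count, skip: is_skip });
        remaining -= row_count;
        is_skip = !is_skip;
    }
    out
}
```

Under these assumptions, `choose_strategy(&repeat_pattern(200, 200, 8192))` keeps the range selector, while `choose_strategy(&repeat_pattern(5, 10, 8192))` switches to the bitmap, since the average run there is about 7.5 rows.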

Contributor

8192 (5 read, 10 skip, 10 skip, 5 read, 5 read, ...) => avg < 10, so we need to change this 8192-row window to a bitmap

Yeah, what I am thinking is to somehow avoid creating the pattern of 5 read, 10 skip, 5 read, 10 skip ... in the first place.

I haven't thought through exactly how to do that.

Contributor Author

Thank you @alamb for checking. I also added docs in the latest PR.

2 participants