Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Filter Pushdown Rule #140

Merged
merged 46 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from 43 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
8f09e0e
Add recursive logic, and add filter/sort transpose logic
jurplel Feb 23, 2024
e84a576
change a couple notes
jurplel Feb 23, 2024
097029a
Modularize projection rewriting code from ProjectionPullUpJoin rule (…
jurplel Mar 12, 2024
b5e881f
Hardcoded filter/sort test
jurplel Mar 17, 2024
67e9632
push past projection w/ broken test
jurplel Mar 17, 2024
330cdac
Implement filter merge
jurplel Mar 17, 2024
64d5572
More prog on join
jurplel Mar 18, 2024
6020a31
merge
jurplel Mar 18, 2024
bc100ed
Publicize flattened_nested_logical
jurplel Mar 18, 2024
abbd6b7
Fix filter merge
jurplel Mar 18, 2024
f78f37b
Join progress
jurplel Mar 18, 2024
9c9c515
Add dummy cost model
jurplel Mar 21, 2024
f4a8506
dummmy catalog + dummy optimizer method
jurplel Mar 21, 2024
e7d4c35
Join test start
jurplel Mar 22, 2024
609233d
Filter pushdown is beginning to work
jurplel Mar 23, 2024
bb3d1a2
Working testing infra + all tests working!
jurplel Mar 25, 2024
9cb0894
Fix projection mistake
jurplel Mar 25, 2024
038db19
restructure predicate traversal
jurplel Mar 25, 2024
5023c2a
Push filter past agg
jurplel Mar 25, 2024
dacc205
Merge w/ main
jurplel Mar 25, 2024
924649b
Remove print
jurplel Mar 25, 2024
16650a3
two stage heuristic
AveryQi115 Mar 26, 2024
72e1484
Merge remote-tracking branch 'origin/main' into bowad/filter_pushdown
jurplel Mar 26, 2024
7d1768e
merge w/ two-stage
jurplel Mar 26, 2024
5d71f67
Revise testing infra and port filter pushdown to it
jurplel Mar 26, 2024
927717f
Make categorize_conds_helper more flexible
jurplel Mar 26, 2024
2e906c8
Fix crashes related to List
jurplel Mar 26, 2024
88397f4
Improve generation of singleton expressions
jurplel Mar 26, 2024
7f2f942
Update tpc-h sqlplannertest with filter pushdown run
jurplel Mar 26, 2024
52f9408
non-merging merge
jurplel Mar 26, 2024
c701b14
More polish + rewrite a join helper as rewrite_column_refs
jurplel Mar 27, 2024
73e4e23
rewrite one more thing as rewrite_column_refs
jurplel Mar 27, 2024
4b4322a
clippy fix
jurplel Mar 27, 2024
b407d8a
fmt fix
jurplel Mar 27, 2024
db56f27
Fmt fix again
jurplel Mar 27, 2024
e60e3eb
Additional documentation for rewrite_column_refs
jurplel Mar 27, 2024
5cae5b3
Remove old TODOs in filter_pushdown.rs
jurplel Mar 27, 2024
9fdaa25
Separate filter pushdown rules (includes helper modifications)
jurplel Mar 28, 2024
e84ffd9
merge for some reason
jurplel Mar 28, 2024
6119dbb
Get children from groups, now
jurplel Mar 28, 2024
53098e6
Change rules to cost-based
jurplel Mar 28, 2024
d3b1882
Missed a spot with group conversions
jurplel Mar 28, 2024
5e9b482
Address most comments
jurplel Mar 30, 2024
647614d
Finally address yilang412 comments
jurplel Apr 9, 2024
ca108ca
Merge remote-tracking branch 'origin/main' into bowad/filter_pushdown
jurplel Apr 9, 2024
d5bc7f9
Merge
jurplel Apr 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 8 additions & 33 deletions optd-datafusion-bridge/src/into_optd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,6 @@ use optd_datafusion_repr::properties::schema::Schema as OPTDSchema;

use crate::OptdPlanContext;

// flatten_nested_logical is a helper function to flatten nested logical operators with same op type
// eg. (a AND (b AND c)) => ExprList([a, b, c])
// (a OR (b OR c)) => ExprList([a, b, c])
// It assume the children of the input expr_list are already flattened
// and can only be used in bottom up manner
fn flatten_nested_logical(op: LogOpType, expr_list: ExprList) -> ExprList {
// conv_into_optd_expr is building the children bottom up so there is no need to
// call flatten_nested_logical recursively
let mut new_expr_list = Vec::new();
for child in expr_list.to_vec() {
if let OptRelNodeTyp::LogOp(child_op) = child.typ() {
if child_op == op {
let child_log_op_expr = LogOpExpr::from_rel_node(child.into_rel_node()).unwrap();
new_expr_list.extend(child_log_op_expr.children().to_vec());
continue;
}
}
new_expr_list.push(child.clone());
}
ExprList::new(new_expr_list)
}

impl OptdPlanContext<'_> {
fn conv_into_optd_table_scan(&mut self, node: &logical_plan::TableScan) -> Result<PlanNode> {
let table_name = node.table_name.to_string();
Expand Down Expand Up @@ -73,14 +51,16 @@ impl OptdPlanContext<'_> {
Operator::And => {
let op = LogOpType::And;
let expr_list = ExprList::new(vec![left, right]);
let expr_list = flatten_nested_logical(op, expr_list);
return Ok(LogOpExpr::new(op, expr_list).into_expr());
return Ok(
LogOpExpr::new_flattened_nested_logical(op, expr_list).into_expr()
);
}
Operator::Or => {
let op = LogOpType::Or;
let expr_list = ExprList::new(vec![left, right]);
let expr_list = flatten_nested_logical(op, expr_list);
return Ok(LogOpExpr::new(op, expr_list).into_expr());
return Ok(
LogOpExpr::new_flattened_nested_logical(op, expr_list).into_expr()
);
}
_ => {}
}
Expand Down Expand Up @@ -331,13 +311,8 @@ impl OptdPlanContext<'_> {
} else {
let expr_list = ExprList::new(log_ops);
// the expr from filter is already flattened in conv_into_optd_expr
let expr_list = flatten_nested_logical(LogOpType::And, expr_list);
Ok(LogicalJoin::new(
left,
right,
LogOpExpr::new(LogOpType::And, expr_list).into_expr(),
join_type,
))
let log_op = LogOpExpr::new_flattened_nested_logical(LogOpType::And, expr_list);
Ok(LogicalJoin::new(left, right, log_op.into_expr(), join_type))
}
}

Expand Down
25 changes: 23 additions & 2 deletions optd-datafusion-repr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ use properties::{
};
use rules::{
EliminateDuplicatedAggExprRule, EliminateDuplicatedSortExprRule, EliminateFilterRule,
EliminateJoinRule, EliminateLimitRule, HashJoinRule, JoinAssocRule, JoinCommuteRule,
PhysicalConversionRule, ProjectionPullUpJoin, SimplifyFilterRule, SimplifyJoinCondRule,
EliminateJoinRule, EliminateLimitRule, FilterAggTransposeRule, FilterCrossJoinTransposeRule,
FilterInnerJoinTransposeRule, FilterMergeRule, FilterProjectTransposeRule,
FilterSortTransposeRule, HashJoinRule, JoinAssocRule, JoinCommuteRule, PhysicalConversionRule,
ProjectionPullUpJoin, SimplifyFilterRule, SimplifyJoinCondRule,
};

pub use optd_core::rel_node::Value;
Expand All @@ -34,6 +36,8 @@ mod explain;
pub mod plan_nodes;
pub mod properties;
pub mod rules;
#[cfg(test)]
mod testing;

pub struct DatafusionOptimizer {
hueristic_optimizer: HeuristicsOptimizer<OptRelNodeTyp>,
Expand Down Expand Up @@ -92,6 +96,23 @@ impl DatafusionOptimizer {
for rule in rules {
rule_wrappers.push(RuleWrapper::new_cascades(rule));
}
// add all filter pushdown rules as heuristic rules
rule_wrappers.push(RuleWrapper::new_heuristic(Arc::new(
FilterProjectTransposeRule::new(),
)));
rule_wrappers.push(RuleWrapper::new_heuristic(Arc::new(FilterMergeRule::new())));
rule_wrappers.push(RuleWrapper::new_heuristic(Arc::new(
FilterCrossJoinTransposeRule::new(),
)));
rule_wrappers.push(RuleWrapper::new_heuristic(Arc::new(
FilterInnerJoinTransposeRule::new(),
)));
rule_wrappers.push(RuleWrapper::new_heuristic(Arc::new(
FilterSortTransposeRule::new(),
)));
rule_wrappers.push(RuleWrapper::new_heuristic(Arc::new(
FilterAggTransposeRule::new(),
)));
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(HashJoinRule::new()))); // 17
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(JoinCommuteRule::new()))); // 18
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(JoinAssocRule::new())));
Expand Down
51 changes: 51 additions & 0 deletions optd-datafusion-repr/src/plan_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::fmt::Debug;
use std::sync::Arc;

use arrow_schema::DataType;
use itertools::Itertools;
use optd_core::{
cascades::{CascadesOptimizer, GroupId},
rel_node::{RelNode, RelNodeMeta, RelNodeMetaMap, RelNodeRef, RelNodeTyp},
Expand Down Expand Up @@ -282,6 +283,56 @@ impl Expr {
pub fn child(&self, idx: usize) -> OptRelNodeRef {
self.0.child(idx)
}

/// Recursively rewrite all column references in the expression.using a provided
/// function that replaces a column index.
/// The provided function will, given a ColumnRefExpr's index,
/// return either Some(usize) or None.
/// - If it is Some, the column index can be rewritten with the value.
/// - If any of the columns is None, we will return None all the way up
/// the call stack, and no expression will be returned.
pub fn rewrite_column_refs(
&self,
rewrite_fn: &impl Fn(usize) -> Option<usize>,
) -> Option<Self> {
assert!(self.typ().is_expression());
if let OptRelNodeTyp::ColumnRef = self.typ() {
let col_ref = ColumnRefExpr::from_rel_node(self.0.clone()).unwrap();
let rewritten = rewrite_fn(col_ref.index());
return if let Some(rewritten_idx) = rewritten {
let new_col_ref = ColumnRefExpr::new(rewritten_idx);
Some(Self(new_col_ref.into_rel_node()))
} else {
None
};
}

let children = self.0.children.clone();
let children = children
.into_iter()
.map(|child| {
if child.typ == OptRelNodeTyp::List {
// TODO: What should we do with List?
return Some(child);
}
Expr::from_rel_node(child.clone())
.unwrap()
.rewrite_column_refs(rewrite_fn)
.map(|x| x.into_rel_node())
})
.collect::<Option<Vec<_>>>()?;
Some(
Expr::from_rel_node(
RelNode {
typ: self.0.typ.clone(),
children: children.into_iter().collect_vec(),
data: self.0.data.clone(),
}
.into(),
)
.unwrap(),
)
}
}

impl OptRelNode for Expr {
Expand Down
23 changes: 23 additions & 0 deletions optd-datafusion-repr/src/plan_nodes/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,29 @@ impl LogOpExpr {
))
}

/// flatten_nested_logical is a helper function to flatten nested logical operators with same op type
/// eg. (a AND (b AND c)) => ExprList([a, b, c])
/// (a OR (b OR c)) => ExprList([a, b, c])
/// It assume the children of the input expr_list are already flattened
/// and can only be used in bottom up manner
pub fn new_flattened_nested_logical(op: LogOpType, expr_list: ExprList) -> Self {
// Since we assume that we are building the children bottom up,
// there is no need to call flatten_nested_logical recursively
let mut new_expr_list = Vec::new();
for child in expr_list.to_vec() {
if let OptRelNodeTyp::LogOp(child_op) = child.typ() {
if child_op == op {
let child_log_op_expr =
LogOpExpr::from_rel_node(child.into_rel_node()).unwrap();
new_expr_list.extend(child_log_op_expr.children().to_vec());
continue;
}
}
new_expr_list.push(child.clone());
}
LogOpExpr::new(op, ExprList::new(new_expr_list))
}

pub fn children(&self) -> Vec<Expr> {
self.0
.0
Expand Down
17 changes: 17 additions & 0 deletions optd-datafusion-repr/src/plan_nodes/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,20 @@ define_plan_node!(
{ 3, right_keys: ExprList }
], { join_type: JoinType }
);

impl LogicalJoin {
/// Takes in left/right schema sizes, and maps a column index to be as if it
/// were pushed down to the left or right side of a join accordingly.
pub fn map_through_join(
col_idx: usize,
left_schema_size: usize,
right_schema_size: usize,
) -> usize {
assert!(col_idx < left_schema_size + right_schema_size);
if col_idx < left_schema_size {
col_idx
} else {
col_idx - left_schema_size
}
}
}
73 changes: 72 additions & 1 deletion optd-datafusion-repr/src/plan_nodes/projection.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::expr::ExprList;
use super::macros::define_plan_node;

use super::{OptRelNode, OptRelNodeRef, OptRelNodeTyp, PlanNode};
use super::{ColumnRefExpr, Expr, OptRelNode, OptRelNodeRef, OptRelNodeTyp, PlanNode};

#[derive(Clone, Debug)]
pub struct LogicalProjection(pub PlanNode);
Expand All @@ -26,3 +26,74 @@ define_plan_node!(
{ 1, exprs: ExprList }
]
);

/// This struct holds the mapping from original columns to projected columns.
///
/// # Example
/// With the following plan:
/// | Filter (#0 < 5)
/// |
/// |-| Projection [#2, #3]
/// |- Scan [#0, #1, #2, #3]
///
/// The computed projection mapping is:
/// #2 -> #0
/// #3 -> #1
pub struct ProjectionMapping {
forward: Vec<usize>,
_backward: Vec<Option<usize>>,
}

impl ProjectionMapping {
jurplel marked this conversation as resolved.
Show resolved Hide resolved
pub fn build(mapping: Vec<usize>) -> Option<Self> {
let mut backward = vec![];
for (i, &x) in mapping.iter().enumerate() {
if x >= backward.len() {
backward.resize(x + 1, None);
}
backward[x] = Some(i);
}
Some(Self {
forward: mapping,
_backward: backward,
})
}

pub fn projection_col_refers_to(&self, col: usize) -> usize {
self.forward[col]
}

pub fn _original_col_maps_to(&self, col: usize) -> Option<usize> {
self._backward[col]
}

/// Recursively rewrites all ColumnRefs in an Expr to *undo* the projection
/// condition. You might want to do this if you are pushing something
/// through a projection, or pulling a projection up.
///
/// # Example
/// If we have a projection node, mapping column A to column B (A -> B)
/// All B's in `cond` will be rewritten as A.
pub fn rewrite_condition(&self, cond: Expr, child_schema_len: usize) -> Expr {
let proj_schema_size = self.forward.len();
cond.rewrite_column_refs(&|idx| {
Some(if idx < proj_schema_size {
self.projection_col_refers_to(idx)
} else {
idx - proj_schema_size + child_schema_len
})
})
.unwrap()
}
}

impl LogicalProjection {
pub fn compute_column_mapping(exprs: &ExprList) -> Option<ProjectionMapping> {
let mut mapping = vec![];
for expr in exprs.to_vec() {
let col_expr = ColumnRefExpr::from_rel_node(expr.into_rel_node())?;
mapping.push(col_expr.index());
}
ProjectionMapping::build(mapping)
}
}
5 changes: 5 additions & 0 deletions optd-datafusion-repr/src/rules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
mod eliminate_duplicated_expr;
mod eliminate_limit;
mod filter;
mod filter_pushdown;
mod joins;
mod macros;
mod physical;
Expand All @@ -12,6 +13,10 @@ pub use eliminate_duplicated_expr::{
};
pub use eliminate_limit::EliminateLimitRule;
pub use filter::{EliminateFilterRule, SimplifyFilterRule, SimplifyJoinCondRule};
pub use filter_pushdown::{
FilterAggTransposeRule, FilterCrossJoinTransposeRule, FilterInnerJoinTransposeRule,
FilterMergeRule, FilterProjectTransposeRule, FilterSortTransposeRule,
};
pub use joins::{
EliminateJoinRule, HashJoinRule, JoinAssocRule, JoinCommuteRule, ProjectionPullUpJoin,
};
Expand Down
Loading
Loading