Skip to content

Commit 47d8edc

Browse files
committed
push down filter: consider already pushed filters
If optimizer runs several times it can try to push filters into table provider several times. But, `supports_filters_pushdown` may be context-dependent. For example, it may support any filter like `column=value` but not the both at the same time. Consider the following optimizer run: 1. Try to push `a = 1, b = 1`. `supports_filters_pushdown` returns [Exact, Inexact] Ok, will remember that `a=1` is pushed and make a filter node for `b=1`. ... Another optimization iteration. 2. Try to push `b = 1`. `supports_filters_pushdown` returns [Exact]. Of course, the table provider can't remember all pushed filters. So, there is nothing to left but to answer: "Exact". Now, the optimizer thinks that conjunction `a = 1 AND b = 1` is supported exactly, but it's not. To prevent this problem, this patch adds already pushed filters to the `supports_filters_pushdown` call.
1 parent 4abed57 commit 47d8edc

File tree

2 files changed

+11
-10
lines changed

2 files changed

+11
-10
lines changed

datafusion/optimizer/src/push_down_filter.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -872,7 +872,13 @@ impl OptimizerRule for PushDownFilter {
872872
convert_to_cross_join_if_beneficial(plan.data)
873873
}
874874
LogicalPlan::TableScan(scan) => {
875-
let filter_predicates = split_conjunction(&filter.predicate);
875+
let filter_predicates: Vec<_> = split_conjunction(&filter.predicate)
876+
.into_iter()
877+
// Add already pushed filters.
878+
.chain(scan.filters.iter())
879+
.unique()
880+
.collect();
881+
876882
let results = scan
877883
.source
878884
.supports_filters_pushdown(filter_predicates.as_slice())?;
@@ -889,13 +895,8 @@ impl OptimizerRule for PushDownFilter {
889895
.clone()
890896
.filter(|(_, res)| res != &TableProviderFilterPushDown::Unsupported)
891897
.map(|(pred, _)| pred);
892-
let new_scan_filters: Vec<Expr> = scan
893-
.filters
894-
.iter()
895-
.chain(new_scan_filters)
896-
.unique()
897-
.cloned()
898-
.collect();
898+
let new_scan_filters: Vec<Expr> =
899+
new_scan_filters.unique().cloned().collect();
899900
let new_predicate: Vec<Expr> = zip
900901
.filter(|(_, res)| res != &TableProviderFilterPushDown::Exact)
901902
.map(|(pred, _)| pred.clone())

datafusion/sqllogictest/test_files/predicates.slt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ CREATE EXTERNAL TABLE aggregate_test_100 (
3939
c13 VARCHAR NOT NULL
4040
)
4141
STORED AS CSV
42-
LOCATION '../../testing/data/csv/aggregate_test_100.csv'
42+
LOCATION '../../testing/data/csv/aggregate_test_100.csv'
4343
OPTIONS ('format.has_header' 'true');
4444

4545
statement ok
@@ -666,7 +666,7 @@ logical_plan
666666
03)----Filter: lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= Decimal128(Some(1100),15,2) OR lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= Decimal128(Some(2000),15,2) OR lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= Decimal128(Some(3000),15,2)
667667
04)------TableScan: lineitem projection=[l_partkey, l_quantity], partial_filters=[lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= Decimal128(Some(1100),15,2) OR lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= Decimal128(Some(2000),15,2) OR lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= Decimal128(Some(3000),15,2)]
668668
05)----Filter: (part.p_brand = Utf8("Brand#12") AND part.p_size <= Int32(5) OR part.p_brand = Utf8("Brand#23") AND part.p_size <= Int32(10) OR part.p_brand = Utf8("Brand#34") AND part.p_size <= Int32(15)) AND part.p_size >= Int32(1)
669-
06)------TableScan: part projection=[p_partkey, p_brand, p_size], partial_filters=[part.p_size >= Int32(1), part.p_brand = Utf8("Brand#12") AND part.p_size <= Int32(5) OR part.p_brand = Utf8("Brand#23") AND part.p_size <= Int32(10) OR part.p_brand = Utf8("Brand#34") AND part.p_size <= Int32(15)]
669+
06)------TableScan: part projection=[p_partkey, p_brand, p_size], partial_filters=[part.p_brand = Utf8("Brand#12") AND part.p_size <= Int32(5) OR part.p_brand = Utf8("Brand#23") AND part.p_size <= Int32(10) OR part.p_brand = Utf8("Brand#34") AND part.p_size <= Int32(15), part.p_size >= Int32(1)]
670670
physical_plan
671671
01)CoalesceBatchesExec: target_batch_size=8192
672672
02)--HashJoinExec: mode=Partitioned, join_type=Inner, on=[(l_partkey@0, p_partkey@0)], filter=p_brand@1 = Brand#12 AND l_quantity@0 >= Some(100),15,2 AND l_quantity@0 <= Some(1100),15,2 AND p_size@2 <= 5 OR p_brand@1 = Brand#23 AND l_quantity@0 >= Some(1000),15,2 AND l_quantity@0 <= Some(2000),15,2 AND p_size@2 <= 10 OR p_brand@1 = Brand#34 AND l_quantity@0 >= Some(2000),15,2 AND l_quantity@0 <= Some(3000),15,2 AND p_size@2 <= 15, projection=[l_partkey@0]

0 commit comments

Comments
 (0)