Skip to content

Commit e3d65b0

Browse files
committed
[FLINK-35854][table] plans after literalAgg
1 parent d34a88b commit e3d65b0

File tree

6 files changed

+84
-94
lines changed

6 files changed

+84
-94
lines changed

flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashSemiAntiJoinTest.xml

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1826,29 +1826,27 @@ LogicalProject(d=[$0])
18261826
<Resource name="optimized exec plan">
18271827
<![CDATA[
18281828
Calc(select=[b])
1829-
+- MultipleInput(readOrder=[0,0,0,0,1,0], members=[\nHashJoin(joinType=[LeftAntiJoin], where=[((d IS NULL OR ($f3 = d)) AND (c = f))], select=[b, c, $f3], isBroadcast=[true], build=[right])\n:- Calc(select=[b, c, CASE(((c0 = 0) OR (i0 IS NULL AND (ck >= c0) AND a IS NOT NULL)), 1, ((c1 = 0) OR (i IS NULL AND (ck0 >= c1) AND a IS NOT NULL)), 2, 3) AS $f3])\n: +- HashJoin(joinType=[LeftOuterJoin], where=[(a = EXPR$0)], select=[a, b, c, c0, ck, i0, c1, ck0, EXPR$0, i], isBroadcast=[true], build=[right])\n: :- NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a, b, c, c0, ck, i0, c1, ck0], build=[right], singleRowJoin=[true])\n: : :- Calc(select=[a, b, c, c0, ck, i0])\n: : : +- HashJoin(joinType=[LeftOuterJoin], where=[(a = i)], select=[a, b, c, c0, ck, i, i0], isBroadcast=[true], build=[right])\n: : : :- NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a, b, c, c0, ck], build=[right], singleRowJoin=[true])\n: : : : :- [#5] TableSourceScan(table=[[default_catalog, default_database, l]], fields=[a, b, c])\n: : : : +- [#6] Exchange(distribution=[broadcast])\n: : : +- [#4] Exchange(distribution=[broadcast])\n: : +- [#3] Exchange(distribution=[broadcast])\n: +- [#2] Exchange(distribution=[broadcast])\n+- [#1] Exchange(distribution=[broadcast])\n])
1829+
+- MultipleInput(readOrder=[0,0,0,0,1,0], members=[\nHashJoin(joinType=[LeftAntiJoin], where=[((d IS NULL OR ($f3 = d)) AND (c = f))], select=[b, c, $f3], isBroadcast=[true], build=[right])\n:- Calc(select=[b, c, CASE(((c0 = 0) OR (i_0 IS NULL AND (ck >= c0) AND a IS NOT NULL)), 1, ((c1 = 0) OR (i IS NULL AND (ck0 >= c1) AND a IS NOT NULL)), 2, 3) AS $f3])\n: +- HashJoin(joinType=[LeftOuterJoin], where=[(a = EXPR$0)], select=[a, b, c, c0, ck, i_0, c1, ck0, EXPR$0, i], isBroadcast=[true], build=[right])\n: :- NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a, b, c, c0, ck, i_0, c1, ck0], build=[right], singleRowJoin=[true])\n: : :- Calc(select=[a, b, c, c0, ck, i_0])\n: : : +- HashJoin(joinType=[LeftOuterJoin], where=[(a = i)], select=[a, b, c, c0, ck, i, i_0], isBroadcast=[true], build=[right])\n: : : :- NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a, b, c, c0, ck], build=[right], singleRowJoin=[true])\n: : : : :- [#5] TableSourceScan(table=[[default_catalog, default_database, l]], fields=[a, b, c])\n: : : : +- [#6] Exchange(distribution=[broadcast])\n: : : +- [#4] Exchange(distribution=[broadcast])\n: : +- [#3] Exchange(distribution=[broadcast])\n: +- [#2] Exchange(distribution=[broadcast])\n+- [#1] Exchange(distribution=[broadcast])\n])
18301830
:- Exchange(distribution=[broadcast])
18311831
: +- Calc(select=[d, f])
18321832
: +- TableSourceScan(table=[[default_catalog, default_database, r]], fields=[d, e, f])
18331833
:- Exchange(distribution=[broadcast])
1834-
: +- Calc(select=[EXPR$0, true AS i])
1835-
: +- HashAggregate(isMerge=[true], groupBy=[EXPR$0], select=[EXPR$0])
1836-
: +- Exchange(distribution=[hash[EXPR$0]])
1837-
: +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0])
1838-
: +- Calc(select=[CAST(j AS INTEGER) AS EXPR$0])(reuse_id=[1])
1839-
: +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[i, j, k])(reuse_id=[2])
1834+
: +- HashAggregate(isMerge=[true], groupBy=[EXPR$0], select=[EXPR$0, Final_LITERAL_AGG(literalAgg$0) AS i])
1835+
: +- Exchange(distribution=[hash[EXPR$0]])
1836+
: +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0, Partial_LITERAL_AGG(*) AS literalAgg$0])
1837+
: +- Calc(select=[CAST(j AS INTEGER) AS EXPR$0])(reuse_id=[1])
1838+
: +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[i, j, k])(reuse_id=[2])
18401839
:- Exchange(distribution=[broadcast])
18411840
: +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck])
18421841
: +- Exchange(distribution=[single])
18431842
: +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0, Partial_COUNT(EXPR$0) AS count$1])
18441843
: +- Reused(reference_id=[1])
18451844
:- Exchange(distribution=[broadcast])
1846-
: +- Calc(select=[i, true AS i0])
1847-
: +- HashAggregate(isMerge=[true], groupBy=[i], select=[i])
1848-
: +- Exchange(distribution=[hash[i]])
1849-
: +- LocalHashAggregate(groupBy=[i], select=[i])
1850-
: +- Calc(select=[i])(reuse_id=[3])
1851-
: +- Reused(reference_id=[2])
1845+
: +- HashAggregate(isMerge=[true], groupBy=[i], select=[i, Final_LITERAL_AGG(literalAgg$0) AS i_0])
1846+
: +- Exchange(distribution=[hash[i]])
1847+
: +- LocalHashAggregate(groupBy=[i], select=[i, Partial_LITERAL_AGG(*) AS literalAgg$0])
1848+
: +- Calc(select=[i])(reuse_id=[3])
1849+
: +- Reused(reference_id=[2])
18521850
:- TableSourceScan(table=[[default_catalog, default_database, l]], fields=[a, b, c])
18531851
+- Exchange(distribution=[broadcast])
18541852
+- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck])

flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.xml

Lines changed: 26 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -669,10 +669,10 @@ Calc(select=[b])
669669
: +- Calc(select=[d, f])
670670
: +- TableSourceScan(table=[[default_catalog, default_database, r]], fields=[d, e, f])
671671
:- Exchange(distribution=[broadcast])
672-
: +- Calc(select=[true AS i])
673-
: +- HashAggregate(isMerge=[true], groupBy=[EXPR$0], select=[EXPR$0])
672+
: +- Calc(select=[i])
673+
: +- HashAggregate(isMerge=[true], groupBy=[EXPR$0], select=[EXPR$0, Final_LITERAL_AGG(literalAgg$0) AS i])
674674
: +- Exchange(distribution=[hash[EXPR$0]])
675-
: +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0])
675+
: +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0, Partial_LITERAL_AGG(*) AS literalAgg$0])
676676
: +- Calc(select=[1 AS EXPR$0])
677677
: +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[i, j, k])(reuse_id=[1])
678678
:- TableSourceScan(table=[[default_catalog, default_database, l]], fields=[a, b, c])
@@ -1190,10 +1190,10 @@ Calc(select=[b])
11901190
: +- Calc(select=[d])
11911191
: +- TableSourceScan(table=[[default_catalog, default_database, r]], fields=[d, e, f])
11921192
:- Exchange(distribution=[broadcast])
1193-
: +- Calc(select=[true AS i])
1194-
: +- HashAggregate(isMerge=[true], groupBy=[EXPR$0], select=[EXPR$0])
1193+
: +- Calc(select=[i])
1194+
: +- HashAggregate(isMerge=[true], groupBy=[EXPR$0], select=[EXPR$0, Final_LITERAL_AGG(literalAgg$0) AS i])
11951195
: +- Exchange(distribution=[hash[EXPR$0]])
1196-
: +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0])
1196+
: +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0, Partial_LITERAL_AGG(*) AS literalAgg$0])
11971197
: +- Calc(select=[1 AS EXPR$0])
11981198
: +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[i, j, k])(reuse_id=[1])
11991199
:- Calc(select=[a, b])
@@ -2212,29 +2212,27 @@ LogicalProject(d=[$0])
22122212
<Resource name="optimized exec plan">
22132213
<![CDATA[
22142214
Calc(select=[b])
2215-
+- MultipleInput(readOrder=[0,0,0,0,1,0], members=[\nNestedLoopJoin(joinType=[LeftAntiJoin], where=[((d IS NULL OR ($f3 = d)) AND (c = f))], select=[b, c, $f3], build=[right])\n:- Calc(select=[b, c, CASE(((c0 = 0) OR (i0 IS NULL AND (ck >= c0) AND a IS NOT NULL)), 1, ((c1 = 0) OR (i IS NULL AND (ck0 >= c1) AND a IS NOT NULL)), 2, 3) AS $f3])\n: +- NestedLoopJoin(joinType=[LeftOuterJoin], where=[(a = EXPR$0)], select=[a, b, c, c0, ck, i0, c1, ck0, EXPR$0, i], build=[right])\n: :- NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a, b, c, c0, ck, i0, c1, ck0], build=[right], singleRowJoin=[true])\n: : :- Calc(select=[a, b, c, c0, ck, i0])\n: : : +- NestedLoopJoin(joinType=[LeftOuterJoin], where=[(a = i)], select=[a, b, c, c0, ck, i, i0], build=[right])\n: : : :- NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a, b, c, c0, ck], build=[right], singleRowJoin=[true])\n: : : : :- [#5] TableSourceScan(table=[[default_catalog, default_database, l]], fields=[a, b, c])\n: : : : +- [#6] Exchange(distribution=[broadcast])\n: : : +- [#4] Exchange(distribution=[broadcast])\n: : +- [#3] Exchange(distribution=[broadcast])\n: +- [#2] Exchange(distribution=[broadcast])\n+- [#1] Exchange(distribution=[broadcast])\n])
2215+
+- MultipleInput(readOrder=[0,0,0,0,1,0], members=[\nNestedLoopJoin(joinType=[LeftAntiJoin], where=[((d IS NULL OR ($f3 = d)) AND (c = f))], select=[b, c, $f3], build=[right])\n:- Calc(select=[b, c, CASE(((c0 = 0) OR (i_0 IS NULL AND (ck >= c0) AND a IS NOT NULL)), 1, ((c1 = 0) OR (i IS NULL AND (ck0 >= c1) AND a IS NOT NULL)), 2, 3) AS $f3])\n: +- NestedLoopJoin(joinType=[LeftOuterJoin], where=[(a = EXPR$0)], select=[a, b, c, c0, ck, i_0, c1, ck0, EXPR$0, i], build=[right])\n: :- NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a, b, c, c0, ck, i_0, c1, ck0], build=[right], singleRowJoin=[true])\n: : :- Calc(select=[a, b, c, c0, ck, i_0])\n: : : +- NestedLoopJoin(joinType=[LeftOuterJoin], where=[(a = i)], select=[a, b, c, c0, ck, i, i_0], build=[right])\n: : : :- NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a, b, c, c0, ck], build=[right], singleRowJoin=[true])\n: : : : :- [#5] TableSourceScan(table=[[default_catalog, default_database, l]], fields=[a, b, c])\n: : : : +- [#6] Exchange(distribution=[broadcast])\n: : : +- [#4] Exchange(distribution=[broadcast])\n: : +- [#3] Exchange(distribution=[broadcast])\n: +- [#2] Exchange(distribution=[broadcast])\n+- [#1] Exchange(distribution=[broadcast])\n])
22162216
:- Exchange(distribution=[broadcast])
22172217
: +- Calc(select=[d, f])
22182218
: +- TableSourceScan(table=[[default_catalog, default_database, r]], fields=[d, e, f])
22192219
:- Exchange(distribution=[broadcast])
2220-
: +- Calc(select=[EXPR$0, true AS i])
2221-
: +- HashAggregate(isMerge=[true], groupBy=[EXPR$0], select=[EXPR$0])
2222-
: +- Exchange(distribution=[hash[EXPR$0]])
2223-
: +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0])
2224-
: +- Calc(select=[CAST(j AS INTEGER) AS EXPR$0])(reuse_id=[1])
2225-
: +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[i, j, k])(reuse_id=[2])
2220+
: +- HashAggregate(isMerge=[true], groupBy=[EXPR$0], select=[EXPR$0, Final_LITERAL_AGG(literalAgg$0) AS i])
2221+
: +- Exchange(distribution=[hash[EXPR$0]])
2222+
: +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0, Partial_LITERAL_AGG(*) AS literalAgg$0])
2223+
: +- Calc(select=[CAST(j AS INTEGER) AS EXPR$0])(reuse_id=[1])
2224+
: +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[i, j, k])(reuse_id=[2])
22262225
:- Exchange(distribution=[broadcast])
22272226
: +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck])
22282227
: +- Exchange(distribution=[single])
22292228
: +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0, Partial_COUNT(EXPR$0) AS count$1])
22302229
: +- Reused(reference_id=[1])
22312230
:- Exchange(distribution=[broadcast])
2232-
: +- Calc(select=[i, true AS i0])
2233-
: +- HashAggregate(isMerge=[true], groupBy=[i], select=[i])
2234-
: +- Exchange(distribution=[hash[i]])
2235-
: +- LocalHashAggregate(groupBy=[i], select=[i])
2236-
: +- Calc(select=[i])(reuse_id=[3])
2237-
: +- Reused(reference_id=[2])
2231+
: +- HashAggregate(isMerge=[true], groupBy=[i], select=[i, Final_LITERAL_AGG(literalAgg$0) AS i_0])
2232+
: +- Exchange(distribution=[hash[i]])
2233+
: +- LocalHashAggregate(groupBy=[i], select=[i, Partial_LITERAL_AGG(*) AS literalAgg$0])
2234+
: +- Calc(select=[i])(reuse_id=[3])
2235+
: +- Reused(reference_id=[2])
22382236
:- TableSourceScan(table=[[default_catalog, default_database, l]], fields=[a, b, c])
22392237
+- Exchange(distribution=[broadcast])
22402238
+- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck])
@@ -2442,27 +2440,25 @@ LogicalProject(e=[$1], d=[$0])
24422440
<Resource name="optimized exec plan">
24432441
<![CDATA[
24442442
Calc(select=[b])
2445-
+- MultipleInput(readOrder=[0,0,0,0,1,0], members=[\nNestedLoopJoin(joinType=[LeftAntiJoin], where=[((b IS NULL OR e IS NULL OR (b = e)) AND (d IS NULL OR ($f3 = d)))], select=[b, $f3], build=[right])\n:- Calc(select=[b, CASE(((c0 = 0) OR (i0 IS NULL AND (ck >= c0) AND a IS NOT NULL)), 1, ((c = 0) OR (i IS NULL AND (ck0 >= c) AND a IS NOT NULL)), 2, 3) AS $f3])\n: +- NestedLoopJoin(joinType=[LeftOuterJoin], where=[(a = j)], select=[a, b, c0, ck, i0, c, ck0, j, i], build=[right])\n: :- NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a, b, c0, ck, i0, c, ck0], build=[right], singleRowJoin=[true])\n: : :- Calc(select=[a, b, c AS c0, ck, i0])\n: : : +- NestedLoopJoin(joinType=[LeftOuterJoin], where=[(a = i)], select=[a, b, c, ck, i, i0], build=[right])\n: : : :- NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a, b, c, ck], build=[right], singleRowJoin=[true])\n: : : : :- [#5] Calc(select=[a, b])\n: : : : +- [#6] Exchange(distribution=[broadcast])\n: : : +- [#4] Exchange(distribution=[broadcast])\n: : +- [#3] Exchange(distribution=[broadcast])\n: +- [#2] Exchange(distribution=[broadcast])\n+- [#1] Exchange(distribution=[broadcast])\n])
2443+
+- MultipleInput(readOrder=[0,0,0,0,1,0], members=[\nNestedLoopJoin(joinType=[LeftAntiJoin], where=[((b IS NULL OR e IS NULL OR (b = e)) AND (d IS NULL OR ($f3 = d)))], select=[b, $f3], build=[right])\n:- Calc(select=[b, CASE(((c0 = 0) OR (i_0 IS NULL AND (ck >= c0) AND a IS NOT NULL)), 1, ((c = 0) OR (i IS NULL AND (ck0 >= c) AND a IS NOT NULL)), 2, 3) AS $f3])\n: +- NestedLoopJoin(joinType=[LeftOuterJoin], where=[(a = j)], select=[a, b, c0, ck, i_0, c, ck0, j, i], build=[right])\n: :- NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a, b, c0, ck, i_0, c, ck0], build=[right], singleRowJoin=[true])\n: : :- Calc(select=[a, b, c AS c0, ck, i_0])\n: : : +- NestedLoopJoin(joinType=[LeftOuterJoin], where=[(a = i)], select=[a, b, c, ck, i, i_0], build=[right])\n: : : :- NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a, b, c, ck], build=[right], singleRowJoin=[true])\n: : : : :- [#5] Calc(select=[a, b])\n: : : : +- [#6] Exchange(distribution=[broadcast])\n: : : +- [#4] Exchange(distribution=[broadcast])\n: : +- [#3] Exchange(distribution=[broadcast])\n: +- [#2] Exchange(distribution=[broadcast])\n+- [#1] Exchange(distribution=[broadcast])\n])
24462444
:- Exchange(distribution=[broadcast])
24472445
: +- Calc(select=[e, d])
24482446
: +- TableSourceScan(table=[[default_catalog, default_database, r]], fields=[d, e, f])
24492447
:- Exchange(distribution=[broadcast])
2450-
: +- Calc(select=[j, true AS i])
2451-
: +- HashAggregate(isMerge=[true], groupBy=[j], select=[j])
2452-
: +- Exchange(distribution=[hash[j]])
2453-
: +- LocalHashAggregate(groupBy=[j], select=[j])
2454-
: +- TableSourceScan(table=[[default_catalog, default_database, t2]], fields=[j])(reuse_id=[1])
2448+
: +- HashAggregate(isMerge=[true], groupBy=[j], select=[j, Final_LITERAL_AGG(literalAgg$0) AS i])
2449+
: +- Exchange(distribution=[hash[j]])
2450+
: +- LocalHashAggregate(groupBy=[j], select=[j, Partial_LITERAL_AGG(*) AS literalAgg$0])
2451+
: +- TableSourceScan(table=[[default_catalog, default_database, t2]], fields=[j])(reuse_id=[1])
24552452
:- Exchange(distribution=[broadcast])
24562453
: +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck])
24572454
: +- Exchange(distribution=[single])
24582455
: +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0, Partial_COUNT(j) AS count$1])
24592456
: +- Reused(reference_id=[1])
24602457
:- Exchange(distribution=[broadcast])
2461-
: +- Calc(select=[i, true AS i0])
2462-
: +- HashAggregate(isMerge=[true], groupBy=[i], select=[i])
2463-
: +- Exchange(distribution=[hash[i]])
2464-
: +- LocalHashAggregate(groupBy=[i], select=[i])
2465-
: +- TableSourceScan(table=[[default_catalog, default_database, t1]], fields=[i])(reuse_id=[2])
2458+
: +- HashAggregate(isMerge=[true], groupBy=[i], select=[i, Final_LITERAL_AGG(literalAgg$0) AS i_0])
2459+
: +- Exchange(distribution=[hash[i]])
2460+
: +- LocalHashAggregate(groupBy=[i], select=[i, Partial_LITERAL_AGG(*) AS literalAgg$0])
2461+
: +- TableSourceScan(table=[[default_catalog, default_database, t1]], fields=[i])(reuse_id=[2])
24662462
:- Calc(select=[a, b])
24672463
: +- TableSourceScan(table=[[default_catalog, default_database, l]], fields=[a, b, c])
24682464
+- Exchange(distribution=[broadcast])

0 commit comments

Comments
 (0)