Skip to content

Commit 9c40e49

Browse files
committed
fix tests
1 parent fef3c6e commit 9c40e49

File tree

3 files changed

+43
-21
lines changed

3 files changed

+43
-21
lines changed

datafusion/core/tests/physical_optimizer/enforce_sorting.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,7 @@ use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
3636

3737
use crate::assert_optimized;
3838
use crate::physical_optimizer::test_util::{
39-
csv_exec_ordered, csv_exec_sorted, parquet_exec, parquet_exec_sorted,
40-
stream_exec_ordered,
39+
csv_exec_ordered, csv_exec_sorted, parquet_exec, parquet_exec_sorted, stream_exec_ordered,
4140
};
4241
use datafusion::physical_optimizer::enforce_distribution::EnforceDistribution;
4342
use datafusion_physical_expr::Partitioning;

datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,14 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
3939
use arrow::record_batch::RecordBatch;
4040

4141
use datafusion_common::tree_node::{TransformedResult, TreeNode};
42-
use datafusion_common::Result;
43-
use datafusion_expr::{JoinType, Operator};
44-
use datafusion_physical_expr::expressions::{self, col, Column};
45-
use datafusion_physical_expr::PhysicalSortExpr;
46-
use datafusion_physical_optimizer::test_utils::check_integrity;
47-
use datafusion_physical_optimizer::enforce_sorting::replace_with_order_preserving_variants::{replace_with_order_preserving_variants, OrderPreservationContext};
42+
use datafusion_common::Result;
43+
use datafusion_expr::{JoinType, Operator};
44+
use datafusion_physical_expr::expressions::{self, col, Column};
45+
use datafusion_physical_expr::PhysicalSortExpr;
46+
use datafusion_physical_optimizer::test_utils::check_integrity;
47+
use datafusion_physical_optimizer::enforce_sorting::replace_with_order_preserving_variants::{replace_with_order_preserving_variants, OrderPreservationContext};
4848

49-
use crate::physical_optimizer::test_util::stream_exec_ordered;
49+
use crate::physical_optimizer::test_util::stream_exec_ordered_with_projection;
5050

5151
use object_store::memory::InMemory;
5252
use object_store::ObjectStore;
@@ -190,7 +190,7 @@ async fn test_replace_multiple_input_repartition_1(
190190
let schema = create_test_schema()?;
191191
let sort_exprs = vec![sort_expr("a", &schema)];
192192
let source = if source_unbounded {
193-
stream_exec_ordered(&schema, sort_exprs)
193+
stream_exec_ordered_with_projection(&schema, sort_exprs)
194194
} else {
195195
memory_exec_sorted(&schema, sort_exprs)
196196
};
@@ -259,7 +259,7 @@ async fn test_with_inter_children_change_only(
259259
let schema = create_test_schema()?;
260260
let sort_exprs = vec![sort_expr_default("a", &schema)];
261261
let source = if source_unbounded {
262-
stream_exec_ordered(&schema, sort_exprs)
262+
stream_exec_ordered_with_projection(&schema, sort_exprs)
263263
} else {
264264
memory_exec_sorted(&schema, sort_exprs)
265265
};
@@ -362,7 +362,7 @@ async fn test_replace_multiple_input_repartition_2(
362362
let schema = create_test_schema()?;
363363
let sort_exprs = vec![sort_expr("a", &schema)];
364364
let source = if source_unbounded {
365-
stream_exec_ordered(&schema, sort_exprs)
365+
stream_exec_ordered_with_projection(&schema, sort_exprs)
366366
} else {
367367
memory_exec_sorted(&schema, sort_exprs)
368368
};
@@ -438,7 +438,7 @@ async fn test_replace_multiple_input_repartition_with_extra_steps(
438438
let schema = create_test_schema()?;
439439
let sort_exprs = vec![sort_expr("a", &schema)];
440440
let source = if source_unbounded {
441-
stream_exec_ordered(&schema, sort_exprs)
441+
stream_exec_ordered_with_projection(&schema, sort_exprs)
442442
} else {
443443
memory_exec_sorted(&schema, sort_exprs)
444444
};
@@ -520,7 +520,7 @@ async fn test_replace_multiple_input_repartition_with_extra_steps_2(
520520
let schema = create_test_schema()?;
521521
let sort_exprs = vec![sort_expr("a", &schema)];
522522
let source = if source_unbounded {
523-
stream_exec_ordered(&schema, sort_exprs)
523+
stream_exec_ordered_with_projection(&schema, sort_exprs)
524524
} else {
525525
memory_exec_sorted(&schema, sort_exprs)
526526
};
@@ -608,7 +608,7 @@ async fn test_not_replacing_when_no_need_to_preserve_sorting(
608608
let schema = create_test_schema()?;
609609
let sort_exprs = vec![sort_expr("a", &schema)];
610610
let source = if source_unbounded {
611-
stream_exec_ordered(&schema, sort_exprs)
611+
stream_exec_ordered_with_projection(&schema, sort_exprs)
612612
} else {
613613
memory_exec_sorted(&schema, sort_exprs)
614614
};
@@ -681,7 +681,7 @@ async fn test_with_multiple_replacable_repartitions(
681681
let schema = create_test_schema()?;
682682
let sort_exprs = vec![sort_expr("a", &schema)];
683683
let source = if source_unbounded {
684-
stream_exec_ordered(&schema, sort_exprs)
684+
stream_exec_ordered_with_projection(&schema, sort_exprs)
685685
} else {
686686
memory_exec_sorted(&schema, sort_exprs)
687687
};
@@ -769,7 +769,7 @@ async fn test_not_replace_with_different_orderings(
769769
let schema = create_test_schema()?;
770770
let sort_exprs = vec![sort_expr("a", &schema)];
771771
let source = if source_unbounded {
772-
stream_exec_ordered(&schema, sort_exprs)
772+
stream_exec_ordered_with_projection(&schema, sort_exprs)
773773
} else {
774774
memory_exec_sorted(&schema, sort_exprs)
775775
};
@@ -841,7 +841,7 @@ async fn test_with_lost_ordering(
841841
let schema = create_test_schema()?;
842842
let sort_exprs = vec![sort_expr("a", &schema)];
843843
let source = if source_unbounded {
844-
stream_exec_ordered(&schema, sort_exprs)
844+
stream_exec_ordered_with_projection(&schema, sort_exprs)
845845
} else {
846846
memory_exec_sorted(&schema, sort_exprs)
847847
};
@@ -911,7 +911,7 @@ async fn test_with_lost_and_kept_ordering(
911911
let schema = create_test_schema()?;
912912
let sort_exprs = vec![sort_expr("a", &schema)];
913913
let source = if source_unbounded {
914-
stream_exec_ordered(&schema, sort_exprs)
914+
stream_exec_ordered_with_projection(&schema, sort_exprs)
915915
} else {
916916
memory_exec_sorted(&schema, sort_exprs)
917917
};
@@ -1017,7 +1017,7 @@ async fn test_with_multiple_child_trees(
10171017

10181018
let left_sort_exprs = vec![sort_expr("a", &schema)];
10191019
let left_source = if source_unbounded {
1020-
stream_exec_ordered(&schema, left_sort_exprs)
1020+
stream_exec_ordered_with_projection(&schema, left_sort_exprs)
10211021
} else {
10221022
memory_exec_sorted(&schema, left_sort_exprs)
10231023
};
@@ -1028,7 +1028,7 @@ async fn test_with_multiple_child_trees(
10281028

10291029
let right_sort_exprs = vec![sort_expr("a", &schema)];
10301030
let right_source = if source_unbounded {
1031-
stream_exec_ordered(&schema, right_sort_exprs)
1031+
stream_exec_ordered_with_projection(&schema, right_sort_exprs)
10321032
} else {
10331033
memory_exec_sorted(&schema, right_sort_exprs)
10341034
};

datafusion/core/tests/physical_optimizer/test_util.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,29 @@ pub fn stream_exec_ordered(
268268
)
269269
}
270270

271+
// Creates a stream exec source for the test purposes
272+
pub fn stream_exec_ordered_with_projection(
273+
schema: &SchemaRef,
274+
sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
275+
) -> Arc<dyn ExecutionPlan> {
276+
let sort_exprs = sort_exprs.into_iter().collect();
277+
let projection: Vec<usize> = vec![0, 2, 3];
278+
279+
Arc::new(
280+
StreamingTableExec::try_new(
281+
schema.clone(),
282+
vec![Arc::new(TestStreamPartition {
283+
schema: schema.clone(),
284+
}) as _],
285+
Some(&projection),
286+
vec![sort_exprs],
287+
true,
288+
None,
289+
)
290+
.unwrap(),
291+
)
292+
}
293+
271294
/// Create a csv exec for tests
272295
pub fn csv_exec_ordered(
273296
schema: &SchemaRef,

0 commit comments

Comments
 (0)