diff --git a/datafusion-examples/examples/custom_file_format.rs b/datafusion-examples/examples/custom_file_format.rs index 5493fa21968e..23dba8112ad2 100644 --- a/datafusion-examples/examples/custom_file_format.rs +++ b/datafusion-examples/examples/custom_file_format.rs @@ -21,10 +21,13 @@ use arrow::{ array::{AsArray, RecordBatch, StringArray, UInt8Array}, datatypes::{DataType, Field, Schema, SchemaRef, UInt64Type}, }; -use datafusion::common::{GetExt, Statistics}; use datafusion::execution::session_state::SessionStateBuilder; use datafusion::physical_expr::LexRequirement; use datafusion::physical_expr::PhysicalExpr; +use datafusion::{ + catalog::Session, + common::{GetExt, Statistics}, +}; use datafusion::{ datasource::{ file_format::{ @@ -35,7 +38,6 @@ use datafusion::{ MemTable, }, error::Result, - execution::context::SessionState, physical_plan::ExecutionPlan, prelude::SessionContext, }; @@ -83,7 +85,7 @@ impl FileFormat for TSVFileFormat { async fn infer_schema( &self, - state: &SessionState, + state: &dyn Session, store: &Arc, objects: &[ObjectMeta], ) -> Result { @@ -94,7 +96,7 @@ impl FileFormat for TSVFileFormat { async fn infer_stats( &self, - state: &SessionState, + state: &dyn Session, store: &Arc, table_schema: SchemaRef, object: &ObjectMeta, @@ -106,7 +108,7 @@ impl FileFormat for TSVFileFormat { async fn create_physical_plan( &self, - state: &SessionState, + state: &dyn Session, conf: FileScanConfig, filters: Option<&Arc>, ) -> Result> { @@ -118,7 +120,7 @@ impl FileFormat for TSVFileFormat { async fn create_writer_physical_plan( &self, input: Arc, - state: &SessionState, + state: &dyn Session, conf: FileSinkConfig, order_requirements: Option, ) -> Result> { @@ -148,7 +150,7 @@ impl TSVFileFactory { impl FileFormatFactory for TSVFileFactory { fn create( &self, - state: &SessionState, + state: &dyn Session, format_options: &std::collections::HashMap, ) -> Result> { let mut new_options = format_options.clone(); diff --git a/datafusion/catalog-listing/Cargo.toml b/datafusion/catalog-listing/Cargo.toml index 03132e7b7bb5..72402df8a2ce 100644 --- a/datafusion/catalog-listing/Cargo.toml +++ b/datafusion/catalog-listing/Cargo.toml @@ -28,6 +28,7 @@ rust-version.workspace = true version.workspace = true [dependencies] +apache-avro = { version = "0.17", optional = true } arrow = { workspace = true } arrow-schema = { workspace = true } async-compression = { version = "0.4.0", features = [ @@ -49,9 +50,14 @@ futures = { workspace = true } glob = "0.3.0" itertools = { workspace = true } log = { workspace = true } +num-traits = { version = "0.2", optional = true } object_store = { workspace = true } url = { workspace = true } +[features] +# Used to enable the avro format +avro = ["apache-avro", "num-traits", "datafusion-common/avro"] + [dev-dependencies] async-trait = { workspace = true } tempfile = { workspace = true } diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/catalog-listing/src/file_scan_config.rs similarity index 76% rename from datafusion/core/src/datasource/physical_plan/file_scan_config.rs rename to datafusion/catalog-listing/src/file_scan_config.rs index 5a38886bb16f..f42268502b82 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/catalog-listing/src/file_scan_config.rs @@ -19,26 +19,273 @@ //! file sources. 
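The custom_file_format.rs hunks above change every `FileFormat` / `FileFormatFactory` method to take `&dyn Session` instead of the concrete `SessionState`. Because `SessionState` implements the `Session` trait, existing callers keep working through an unsized coercion; a minimal sketch (the `infer_with_state` helper is hypothetical, not part of this patch, and assumes `datafusion` and `object_store` as direct dependencies):

```rust
// Hypothetical sketch: a concrete `SessionState` still works at call sites
// that are now typed as `&dyn Session`.
use std::sync::Arc;

use datafusion::datasource::file_format::FileFormat;
use datafusion::error::Result;
use datafusion::execution::session_state::SessionStateBuilder;
use object_store::{memory::InMemory, ObjectStore};

async fn infer_with_state(format: &dyn FileFormat) -> Result<()> {
    let state = SessionStateBuilder::new().with_default_features().build();
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    // `&SessionState` coerces to `&dyn Session`, so no call-site changes are needed.
    let _schema = format.infer_schema(&state, &store, &[]).await?;
    Ok(())
}
```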
use std::{ - borrow::Cow, collections::HashMap, fmt::Debug, marker::PhantomData, mem::size_of, - sync::Arc, vec, + borrow::Cow, + collections::HashMap, + fmt::{Debug, Formatter, Result as FmtResult}, + marker::PhantomData, + mem::size_of, + sync::Arc, + vec, }; -use super::{get_projected_output_ordering, statistics::MinMaxStatistics}; -use crate::datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl}; -use crate::{error::Result, scalar::ScalarValue}; +use crate::minmax_statistics::MinMaxStatistics; +use crate::PartitionedFile; +use datafusion_common::error::Result; +use datafusion_common::scalar::ScalarValue; +use datafusion_execution::object_store::ObjectStoreUrl; use arrow::array::{ArrayData, BufferBuilder}; +use arrow::array::{ArrayRef, DictionaryArray, RecordBatch, RecordBatchOptions}; use arrow::buffer::Buffer; use arrow::datatypes::{ArrowNativeType, UInt16Type}; -use arrow_array::{ArrayRef, DictionaryArray, RecordBatch, RecordBatchOptions}; use arrow_schema::{DataType, Field, Schema, SchemaRef}; use datafusion_common::stats::Precision; use datafusion_common::{ exec_err, ColumnStatistics, Constraints, DataFusionError, Statistics, }; -use datafusion_physical_expr::LexOrdering; +use datafusion_physical_expr::expressions::Column; +use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr}; -use log::warn; +use datafusion_physical_plan::display::display_orderings; +use datafusion_physical_plan::display::ProjectSchemaDisplay; +use datafusion_physical_plan::DisplayAs; +use datafusion_physical_plan::DisplayFormatType; +use log::{debug, warn}; + +/// A wrapper to customize partitioned file display +/// +/// Prints in the format: +/// ```text +/// {NUM_GROUPS groups: [[file1, file2,...], [fileN, fileM, ...], ...]} +/// ``` +#[derive(Debug)] +pub struct FileGroupsDisplay<'a>(pub &'a [Vec]); + +impl DisplayAs for FileGroupsDisplay<'_> { + fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult { + let n_groups = self.0.len(); + let groups = if n_groups == 1 { "group" } else { "groups" }; + write!(f, "{{{n_groups} {groups}: [")?; + match t { + DisplayFormatType::Default => { + // To avoid showing too many partitions + let max_groups = 5; + fmt_up_to_n_elements(self.0, max_groups, f, |group, f| { + FileGroupDisplay(group).fmt_as(t, f) + })?; + } + DisplayFormatType::Verbose => { + fmt_elements_split_by_commas(self.0.iter(), f, |group, f| { + FileGroupDisplay(group).fmt_as(t, f) + })? + } + } + write!(f, "]}}") + } +} + +/// A wrapper to customize partitioned group of files display +/// +/// Prints in the format: +/// ```text +/// [file1, file2,...] +/// ``` +#[derive(Debug)] +pub struct FileGroupDisplay<'a>(pub &'a [PartitionedFile]); + +impl DisplayAs for FileGroupDisplay<'_> { + fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult { + write!(f, "[")?; + match t { + DisplayFormatType::Default => { + // To avoid showing too many files + let max_files = 5; + fmt_up_to_n_elements(self.0, max_files, f, |pf, f| { + write!(f, "{}", pf.object_meta.location.as_ref())?; + if let Some(range) = pf.range.as_ref() { + write!(f, ":{}..{}", range.start, range.end)?; + } + Ok(()) + })? + } + DisplayFormatType::Verbose => { + fmt_elements_split_by_commas(self.0.iter(), f, |pf, f| { + write!(f, "{}", pf.object_meta.location.as_ref())?; + if let Some(range) = pf.range.as_ref() { + write!(f, ":{}..{}", range.start, range.end)?; + } + Ok(()) + })? 
+ } + } + write!(f, "]") + } +} + +/// helper to format an array of up to N elements +fn fmt_up_to_n_elements( + elements: &[E], + n: usize, + f: &mut Formatter, + format_element: F, +) -> FmtResult +where + F: Fn(&E, &mut Formatter) -> FmtResult, +{ + let len = elements.len(); + fmt_elements_split_by_commas(elements.iter().take(n), f, |element, f| { + format_element(element, f) + })?; + // Remaining elements are showed as `...` (to indicate there is more) + if len > n { + write!(f, ", ...")?; + } + Ok(()) +} + +/// helper formatting array elements with a comma and a space between them +fn fmt_elements_split_by_commas( + iter: I, + f: &mut Formatter, + format_element: F, +) -> FmtResult +where + I: Iterator, + F: Fn(E, &mut Formatter) -> FmtResult, +{ + for (idx, element) in iter.enumerate() { + if idx > 0 { + write!(f, ", ")?; + } + format_element(element, f)?; + } + Ok(()) +} + +/// The various listing tables does not attempt to read all files +/// concurrently, instead they will read files in sequence within a +/// partition. This is an important property as it allows plans to +/// run against 1000s of files and not try to open them all +/// concurrently. +/// +/// However, it means if we assign more than one file to a partition +/// the output sort order will not be preserved as illustrated in the +/// following diagrams: +/// +/// When only 1 file is assigned to each partition, each partition is +/// correctly sorted on `(A, B, C)` +/// +/// ```text +///┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┓ +/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ┐ +///┃ ┌───────────────┐ ┌──────────────┐ │ ┌──────────────┐ │ ┌─────────────┐ ┃ +/// │ │ 1.parquet │ │ │ │ 2.parquet │ │ │ 3.parquet │ │ │ 4.parquet │ │ +///┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ │Sort: A, B, C │ │ │Sort: A, B, C│ ┃ +/// │ └───────────────┘ │ │ └──────────────┘ │ └──────────────┘ │ └─────────────┘ │ +///┃ │ │ ┃ +/// │ │ │ │ │ │ +///┃ │ │ ┃ +/// │ │ │ │ │ │ +///┃ │ │ ┃ +/// │ │ │ │ │ │ +///┃ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃ +/// DataFusion DataFusion DataFusion DataFusion +///┃ Partition 1 Partition 2 Partition 3 Partition 4 ┃ +/// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ +/// +/// ParquetExec +///``` +/// +/// However, when more than 1 file is assigned to each partition, each +/// partition is NOT correctly sorted on `(A, B, C)`. 
Once the second +/// file is scanned, the same values for A, B and C can be repeated in +/// the same sorted stream +/// +///```text +///┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ +/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃ +///┃ ┌───────────────┐ ┌──────────────┐ │ +/// │ │ 1.parquet │ │ │ │ 2.parquet │ ┃ +///┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ +/// │ └───────────────┘ │ │ └──────────────┘ ┃ +///┃ ┌───────────────┐ ┌──────────────┐ │ +/// │ │ 3.parquet │ │ │ │ 4.parquet │ ┃ +///┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ +/// │ └───────────────┘ │ │ └──────────────┘ ┃ +///┃ │ +/// │ │ │ ┃ +///┃ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ +/// DataFusion DataFusion ┃ +///┃ Partition 1 Partition 2 +/// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┛ +/// +/// ParquetExec +///``` +fn get_projected_output_ordering( + base_config: &FileScanConfig, + projected_schema: &SchemaRef, +) -> Vec { + let mut all_orderings = vec![]; + for output_ordering in &base_config.output_ordering { + let mut new_ordering = LexOrdering::default(); + for PhysicalSortExpr { expr, options } in output_ordering.iter() { + if let Some(col) = expr.as_any().downcast_ref::() { + let name = col.name(); + if let Some((idx, _)) = projected_schema.column_with_name(name) { + // Compute the new sort expression (with correct index) after projection: + new_ordering.push(PhysicalSortExpr { + expr: Arc::new(Column::new(name, idx)), + options: *options, + }); + continue; + } + } + // Cannot find expression in the projected_schema, stop iterating + // since rest of the orderings are violated + break; + } + + // do not push empty entries + // otherwise we may have `Some(vec![])` at the output ordering. + if new_ordering.is_empty() { + continue; + } + + // Check if any file groups are not sorted + if base_config.file_groups.iter().any(|group| { + if group.len() <= 1 { + // File groups with <= 1 files are always sorted + return false; + } + + let statistics = match MinMaxStatistics::new_from_files( + &new_ordering, + projected_schema, + base_config.projection.as_deref(), + group, + ) { + Ok(statistics) => statistics, + Err(e) => { + log::trace!("Error fetching statistics for file group: {e}"); + // we can't prove that it's ordered, so we have to reject it + return true; + } + }; + + !statistics.is_sorted() + }) { + debug!( + "Skipping specified output ordering {:?}. \ + Some file groups couldn't be determined to be sorted: {:?}", + base_config.output_ordering[0], base_config.file_groups + ); + continue; + } + + all_orderings.push(new_ordering); + } + all_orderings +} /// Convert type to a type suitable for use as a [`ListingTable`] /// partition column. Returns `Dictionary(UInt16, val_type)`, which is @@ -51,7 +298,7 @@ use log::warn; /// /// Use [`wrap_partition_value_in_dict`] to wrap a [`ScalarValue`] in the same say. 
/// -/// [`ListingTable`]: crate::datasource::listing::ListingTable +/// [`ListingTable`]: https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/listing/table.rs#L707 pub fn wrap_partition_type_in_dict(val_type: DataType) -> DataType { DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type)) } @@ -70,8 +317,8 @@ pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue { /// ``` /// # use std::sync::Arc; /// # use arrow_schema::Schema; -/// use datafusion::datasource::listing::PartitionedFile; -/// # use datafusion::datasource::physical_plan::FileScanConfig; +/// # use datafusion_catalog_listing::PartitionedFile; +/// # use datafusion_catalog_listing::file_scan_config::FileScanConfig; /// # use datafusion_execution::object_store::ObjectStoreUrl; /// # let file_schema = Arc::new(Schema::empty()); /// // create FileScan config for reading data from file:// @@ -283,7 +530,7 @@ impl FileScanConfig { } #[cfg_attr(not(feature = "avro"), allow(unused))] // Only used by avro - pub(crate) fn projected_file_column_names(&self) -> Option> { + pub fn projected_file_column_names(&self) -> Option> { self.projection.as_ref().map(|p| { p.iter() .filter(|col_idx| **col_idx < self.file_schema.fields().len()) @@ -294,7 +541,7 @@ impl FileScanConfig { } /// Projects only file schema, ignoring partition columns - pub(crate) fn projected_file_schema(&self) -> SchemaRef { + pub fn projected_file_schema(&self) -> SchemaRef { let fields = self.file_column_projection_indices().map(|indices| { indices .iter() @@ -314,7 +561,7 @@ impl FileScanConfig { ) } - pub(crate) fn file_column_projection_indices(&self) -> Option> { + pub fn file_column_projection_indices(&self) -> Option> { self.projection.as_ref().map(|p| { p.iter() .filter(|col_idx| **col_idx < self.file_schema.fields().len()) @@ -389,6 +636,41 @@ impl FileScanConfig { } } +impl Debug for FileScanConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + write!(f, "object_store_url={:?}, ", self.object_store_url)?; + + write!(f, "statistics={:?}, ", self.statistics)?; + + DisplayAs::fmt_as(self, DisplayFormatType::Verbose, f) + } +} + +impl DisplayAs for FileScanConfig { + fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult { + let (schema, _, _, orderings) = self.project(); + + write!(f, "file_groups=")?; + FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?; + + if !schema.fields().is_empty() { + write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?; + } + + if let Some(limit) = self.limit { + write!(f, ", limit={limit}")?; + } + + display_orderings(f, &orderings)?; + + if !self.constraints.is_empty() { + write!(f, ", {}", self.constraints)?; + } + + Ok(()) + } +} + /// A helper that projects partition columns into the file record batches. 
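The partition-column helpers above wrap both the column type and the partition value in a `UInt16`-keyed dictionary so repeated partition values stay cheap. A small sketch of what they produce (assuming the two `wrap_partition_*` functions above are in scope, e.g. from within this module):

```rust
// Sketch: output of the partition-column wrappers defined above.
use arrow_schema::DataType;
use datafusion_common::ScalarValue;

fn partition_dict_example() {
    // The column type becomes Dictionary(UInt16, Utf8)
    let expected = DataType::Dictionary(
        Box::new(DataType::UInt16),
        Box::new(DataType::Utf8),
    );
    assert_eq!(wrap_partition_type_in_dict(DataType::Utf8), expected);

    // A concrete partition value is wrapped the same way
    let wrapped =
        wrap_partition_value_in_dict(ScalarValue::Utf8(Some("2021".into())));
    assert!(matches!(wrapped, ScalarValue::Dictionary(_, _)));
}
```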
/// /// One interesting trick is the usage of a cache for the key buffers of the partition column @@ -639,10 +921,41 @@ fn create_output_array( #[cfg(test)] mod tests { - use arrow_array::Int32Array; + use arrow::array::Int32Array; + use arrow_schema::SortOptions; + use datafusion_common::assert_batches_eq; + use datafusion_expr::SortExpr; + use datafusion_physical_expr::create_physical_expr; use super::*; - use crate::{test::columns, test_util::aggr_test_schema}; + + /// Get the schema for the aggregate_test_* csv files + fn aggr_test_schema() -> SchemaRef { + let mut f1 = Field::new("c1", DataType::Utf8, false); + f1.set_metadata(HashMap::from_iter(vec![("testing".into(), "test".into())])); + let schema = Schema::new(vec![ + f1, + Field::new("c2", DataType::UInt32, false), + Field::new("c3", DataType::Int8, false), + Field::new("c4", DataType::Int16, false), + Field::new("c5", DataType::Int32, false), + Field::new("c6", DataType::Int64, false), + Field::new("c7", DataType::UInt8, false), + Field::new("c8", DataType::UInt16, false), + Field::new("c9", DataType::UInt32, false), + Field::new("c10", DataType::UInt64, false), + Field::new("c11", DataType::Float32, false), + Field::new("c12", DataType::Float64, false), + Field::new("c13", DataType::Utf8, false), + ]); + + Arc::new(schema) + } + + /// Returns the column names on the schema + fn columns(schema: &Schema) -> Vec { + schema.fields().iter().map(|f| f.name().clone()).collect() + } #[test] fn physical_plan_config_no_projection() { @@ -814,7 +1127,7 @@ mod tests { "| 2 | 0 | 12 | 2021 | 26 |", "+---+----+----+------+-----+", ]; - crate::assert_batches_eq!(expected, &[projected_batch]); + assert_batches_eq!(expected, &[projected_batch]); // project another batch that is larger than the previous one let file_batch = build_table_i32( @@ -844,7 +1157,7 @@ mod tests { "| 9 | -6 | 16 | 2021 | 27 |", "+---+-----+----+------+-----+", ]; - crate::assert_batches_eq!(expected, &[projected_batch]); + assert_batches_eq!(expected, &[projected_batch]); // project another batch that is smaller than the previous one let file_batch = build_table_i32( @@ -872,7 +1185,7 @@ mod tests { "| 3 | 4 | 6 | 2021 | 28 |", "+---+---+---+------+-----+", ]; - crate::assert_batches_eq!(expected, &[projected_batch]); + assert_batches_eq!(expected, &[projected_batch]); // forgot to dictionary-wrap the scalar value let file_batch = build_table_i32( @@ -900,7 +1213,7 @@ mod tests { "| 2 | 0 | 12 | 2021 | 26 |", "+---+----+----+------+-----+", ]; - crate::assert_batches_eq!(expected, &[projected_batch]); + assert_batches_eq!(expected, &[projected_batch]); } #[test] @@ -993,7 +1306,7 @@ mod tests { name: &'static str, file_schema: Schema, files: Vec, - sort: Vec, + sort: Vec, expected_result: Result>, &'static str>, } @@ -1134,11 +1447,32 @@ mod tests { )))) .collect::>(), )); + + // This is a copy of datafusion::physical_planner::create_physical_sort_expr + // Create a physical sort expression from a logical expression + fn create_physical_sort_expr( + e: &SortExpr, + input_dfschema: &DFSchema, + execution_props: &ExecutionProps, + ) -> Result { + let SortExpr { + expr, + asc, + nulls_first, + } = e; + Ok(PhysicalSortExpr { + expr: create_physical_expr(expr, input_dfschema, execution_props)?, + options: SortOptions { + descending: !asc, + nulls_first: *nulls_first, + }, + }) + } let sort_order = LexOrdering::from( case.sort .into_iter() .map(|expr| { - crate::physical_planner::create_physical_sort_expr( + create_physical_sort_expr( &expr, 
&DFSchema::try_from(table_schema.as_ref().clone())?, &ExecutionProps::default(), diff --git a/datafusion/core/src/datasource/physical_plan/statistics.rs b/datafusion/catalog-listing/src/minmax_statistics.rs similarity index 99% rename from datafusion/core/src/datasource/physical_plan/statistics.rs rename to datafusion/catalog-listing/src/minmax_statistics.rs index b4a8f377d256..71b2e551fd73 100644 --- a/datafusion/core/src/datasource/physical_plan/statistics.rs +++ b/datafusion/catalog-listing/src/minmax_statistics.rs @@ -26,13 +26,13 @@ use std::sync::Arc; -use crate::datasource::listing::PartitionedFile; +use crate::PartitionedFile; +use arrow::array::RecordBatch; use arrow::{ compute::SortColumn, row::{Row, Rows}, }; -use arrow_array::RecordBatch; use arrow_schema::SchemaRef; use datafusion_common::{plan_err, DataFusionError, Result}; use datafusion_physical_expr::{expressions::Column, PhysicalSortExpr}; diff --git a/datafusion/catalog-listing/src/mod.rs b/datafusion/catalog-listing/src/mod.rs index e952e39fd479..0cbdf5fc8376 100644 --- a/datafusion/catalog-listing/src/mod.rs +++ b/datafusion/catalog-listing/src/mod.rs @@ -18,7 +18,9 @@ //! A table that uses the `ObjectStore` listing capability //! to get the list of files to process. +pub mod file_scan_config; pub mod helpers; +mod minmax_statistics; pub mod url; use chrono::TimeZone; diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs index 1d9827ae0ab5..d9c990d6937b 100644 --- a/datafusion/core/src/datasource/file_format/arrow.rs +++ b/datafusion/core/src/datasource/file_format/arrow.rs @@ -35,7 +35,6 @@ use crate::datasource::physical_plan::{ ArrowExec, FileGroupDisplay, FileScanConfig, FileSink, FileSinkConfig, }; use crate::error::Result; -use crate::execution::context::SessionState; use crate::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; use arrow::ipc::convert::fb_to_schema; @@ -43,6 +42,7 @@ use arrow::ipc::reader::FileReader; use arrow::ipc::writer::IpcWriteOptions; use arrow::ipc::{root_as_message, CompressionType}; use arrow_schema::{ArrowError, Schema, SchemaRef}; +use datafusion_catalog::Session; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::{ not_impl_err, DataFusionError, GetExt, Statistics, DEFAULT_ARROW_EXTENSION, @@ -83,7 +83,7 @@ impl ArrowFormatFactory { impl FileFormatFactory for ArrowFormatFactory { fn create( &self, - _state: &SessionState, + _state: &dyn Session, _format_options: &HashMap, ) -> Result> { Ok(Arc::new(ArrowFormat)) @@ -134,7 +134,7 @@ impl FileFormat for ArrowFormat { async fn infer_schema( &self, - _state: &SessionState, + _state: &dyn Session, store: &Arc, objects: &[ObjectMeta], ) -> Result { @@ -158,7 +158,7 @@ impl FileFormat for ArrowFormat { async fn infer_stats( &self, - _state: &SessionState, + _state: &dyn Session, _store: &Arc, table_schema: SchemaRef, _object: &ObjectMeta, @@ -168,7 +168,7 @@ impl FileFormat for ArrowFormat { async fn create_physical_plan( &self, - _state: &SessionState, + _state: &dyn Session, conf: FileScanConfig, _filters: Option<&Arc>, ) -> Result> { @@ -179,7 +179,7 @@ impl FileFormat for ArrowFormat { async fn create_writer_physical_plan( &self, input: Arc, - _state: &SessionState, + _state: &dyn Session, conf: FileSinkConfig, order_requirements: Option, ) -> Result> { diff --git a/datafusion/core/src/datasource/file_format/avro.rs b/datafusion/core/src/datasource/file_format/avro.rs index f854b9506a64..2bc25ca417f0 100644 --- 
a/datafusion/core/src/datasource/file_format/avro.rs +++ b/datafusion/core/src/datasource/file_format/avro.rs @@ -25,6 +25,7 @@ use std::sync::Arc; use arrow::datatypes::Schema; use arrow::datatypes::SchemaRef; use async_trait::async_trait; +use datafusion_catalog::Session; use datafusion_common::internal_err; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::GetExt; @@ -38,7 +39,6 @@ use super::FileFormatFactory; use crate::datasource::avro_to_arrow::read_avro_schema_from_reader; use crate::datasource::physical_plan::{AvroExec, FileScanConfig}; use crate::error::Result; -use crate::execution::context::SessionState; use crate::physical_plan::ExecutionPlan; use crate::physical_plan::Statistics; @@ -56,7 +56,7 @@ impl AvroFormatFactory { impl FileFormatFactory for AvroFormatFactory { fn create( &self, - _state: &SessionState, + _state: &dyn Session, _format_options: &HashMap, ) -> Result> { Ok(Arc::new(AvroFormat)) @@ -111,7 +111,7 @@ impl FileFormat for AvroFormat { async fn infer_schema( &self, - _state: &SessionState, + _state: &dyn Session, store: &Arc, objects: &[ObjectMeta], ) -> Result { @@ -136,7 +136,7 @@ impl FileFormat for AvroFormat { async fn infer_stats( &self, - _state: &SessionState, + _state: &dyn Session, _store: &Arc, table_schema: SchemaRef, _object: &ObjectMeta, @@ -146,7 +146,7 @@ impl FileFormat for AvroFormat { async fn create_physical_plan( &self, - _state: &SessionState, + _state: &dyn Session, conf: FileScanConfig, _filters: Option<&Arc>, ) -> Result> { @@ -160,6 +160,7 @@ impl FileFormat for AvroFormat { mod tests { use super::*; use crate::datasource::file_format::test_util::scan_format; + use crate::execution::SessionState; use crate::physical_plan::collect; use crate::prelude::{SessionConfig, SessionContext}; use arrow::array::{as_string_array, Array}; diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index edf757e539a9..80b1e858c1c0 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -44,6 +44,7 @@ use arrow::array::RecordBatch; use arrow::csv::WriterBuilder; use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef}; use arrow_schema::ArrowError; +use datafusion_catalog::Session; use datafusion_common::config::{ConfigField, ConfigFileType, CsvOptions}; use datafusion_common::file_options::csv_writer::CsvWriterOptions; use datafusion_common::{ @@ -94,9 +95,10 @@ impl Debug for CsvFormatFactory { impl FileFormatFactory for CsvFormatFactory { fn create( &self, - state: &SessionState, + state: &dyn Session, format_options: &HashMap, ) -> Result> { + let state = state.as_any().downcast_ref::().unwrap(); let csv_options = match &self.options { None => { let mut table_options = state.default_table_options(); @@ -364,7 +366,7 @@ impl FileFormat for CsvFormat { async fn infer_schema( &self, - state: &SessionState, + state: &dyn Session, store: &Arc, objects: &[ObjectMeta], ) -> Result { @@ -399,7 +401,7 @@ impl FileFormat for CsvFormat { async fn infer_stats( &self, - _state: &SessionState, + _state: &dyn Session, _store: &Arc, table_schema: SchemaRef, _object: &ObjectMeta, @@ -409,7 +411,7 @@ impl FileFormat for CsvFormat { async fn create_physical_plan( &self, - state: &SessionState, + state: &dyn Session, conf: FileScanConfig, _filters: Option<&Arc>, ) -> Result> { @@ -439,7 +441,7 @@ impl FileFormat for CsvFormat { async fn create_writer_physical_plan( &self, input: Arc, - state: &SessionState, 
+ state: &dyn Session, conf: FileSinkConfig, order_requirements: Option, ) -> Result> { @@ -480,7 +482,7 @@ impl CsvFormat { /// number of lines that were read async fn infer_schema_from_stream( &self, - state: &SessionState, + state: &dyn Session, mut records_to_read: usize, stream: impl Stream>, ) -> Result<(Schema, usize)> { diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index 5bffb7e582c1..25adbb66c402 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -47,6 +47,7 @@ use arrow::json; use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter}; use arrow_array::RecordBatch; use arrow_schema::ArrowError; +use datafusion_catalog::Session; use datafusion_common::config::{ConfigField, ConfigFileType, JsonOptions}; use datafusion_common::file_options::json_writer::JsonWriterOptions; use datafusion_common::{not_impl_err, GetExt, DEFAULT_JSON_EXTENSION}; @@ -85,9 +86,10 @@ impl JsonFormatFactory { impl FileFormatFactory for JsonFormatFactory { fn create( &self, - state: &SessionState, + state: &dyn Session, format_options: &HashMap, ) -> Result> { + let state = state.as_any().downcast_ref::().unwrap(); let json_options = match &self.options { None => { let mut table_options = state.default_table_options(); @@ -187,7 +189,7 @@ impl FileFormat for JsonFormat { async fn infer_schema( &self, - _state: &SessionState, + _state: &dyn Session, store: &Arc, objects: &[ObjectMeta], ) -> Result { @@ -235,7 +237,7 @@ impl FileFormat for JsonFormat { async fn infer_stats( &self, - _state: &SessionState, + _state: &dyn Session, _store: &Arc, table_schema: SchemaRef, _object: &ObjectMeta, @@ -245,7 +247,7 @@ impl FileFormat for JsonFormat { async fn create_physical_plan( &self, - _state: &SessionState, + _state: &dyn Session, conf: FileScanConfig, _filters: Option<&Arc>, ) -> Result> { @@ -257,7 +259,7 @@ impl FileFormat for JsonFormat { async fn create_writer_physical_plan( &self, input: Arc, - _state: &SessionState, + _state: &dyn Session, conf: FileSinkConfig, order_requirements: Option, ) -> Result> { diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index 2e2e6dba1c0e..ce94098501c0 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -39,12 +39,12 @@ use std::task::Poll; use crate::arrow::datatypes::SchemaRef; use crate::datasource::physical_plan::{FileScanConfig, FileSinkConfig}; -use crate::error::Result; -use crate::execution::context::SessionState; -use crate::physical_plan::{ExecutionPlan, Statistics}; +use datafusion_common::error::Result; +use datafusion_physical_plan::{ExecutionPlan, Statistics}; use arrow_array::RecordBatch; use arrow_schema::{ArrowError, DataType, Field, FieldRef, Schema}; +use datafusion_catalog::Session; use datafusion_common::file_options::file_type::FileType; use datafusion_common::{internal_err, not_impl_err, GetExt}; use datafusion_expr::Expr; @@ -65,7 +65,7 @@ pub trait FileFormatFactory: Sync + Send + GetExt + Debug { /// Initialize a [FileFormat] and configure based on session and command level options fn create( &self, - state: &SessionState, + state: &dyn Session, format_options: &HashMap, ) -> Result>; @@ -103,7 +103,7 @@ pub trait FileFormat: Send + Sync + Debug { /// the files have schemas that cannot be merged. 
async fn infer_schema( &self, - state: &SessionState, + state: &dyn Session, store: &Arc, objects: &[ObjectMeta], ) -> Result; @@ -117,7 +117,7 @@ pub trait FileFormat: Send + Sync + Debug { /// TODO: should the file source return statistics for only columns referred to in the table schema? async fn infer_stats( &self, - state: &SessionState, + state: &dyn Session, store: &Arc, table_schema: SchemaRef, object: &ObjectMeta, @@ -127,7 +127,7 @@ pub trait FileFormat: Send + Sync + Debug { /// according to this file format. async fn create_physical_plan( &self, - state: &SessionState, + state: &dyn Session, conf: FileScanConfig, filters: Option<&Arc>, ) -> Result>; @@ -137,7 +137,7 @@ pub trait FileFormat: Send + Sync + Debug { async fn create_writer_physical_plan( &self, _input: Arc, - _state: &SessionState, + _state: &dyn Session, _conf: FileSinkConfig, _order_requirements: Option, ) -> Result> { @@ -565,7 +565,7 @@ pub(crate) mod test_util { }; pub async fn scan_format( - state: &SessionState, + state: &dyn Session, format: &dyn FileFormat, store_root: &str, file_name: &str, diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 4c7169764a76..a1978d6699d0 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -48,6 +48,7 @@ use crate::physical_plan::{ }; use arrow::compute::sum; +use datafusion_catalog::Session; use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions}; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::stats::Precision; @@ -121,9 +122,10 @@ impl ParquetFormatFactory { impl FileFormatFactory for ParquetFormatFactory { fn create( &self, - state: &SessionState, + state: &dyn Session, format_options: &std::collections::HashMap, ) -> Result> { + let state = state.as_any().downcast_ref::().unwrap(); let parquet_options = match &self.options { None => { let mut table_options = state.default_table_options(); @@ -325,7 +327,7 @@ impl FileFormat for ParquetFormat { async fn infer_schema( &self, - state: &SessionState, + state: &dyn Session, store: &Arc, objects: &[ObjectMeta], ) -> Result { @@ -378,7 +380,7 @@ impl FileFormat for ParquetFormat { async fn infer_stats( &self, - _state: &SessionState, + _state: &dyn Session, store: &Arc, table_schema: SchemaRef, object: &ObjectMeta, @@ -395,7 +397,7 @@ impl FileFormat for ParquetFormat { async fn create_physical_plan( &self, - _state: &SessionState, + _state: &dyn Session, conf: FileScanConfig, filters: Option<&Arc>, ) -> Result> { @@ -420,7 +422,7 @@ impl FileFormat for ParquetFormat { async fn create_writer_physical_plan( &self, input: Arc, - _state: &SessionState, + _state: &dyn Session, conf: FileSinkConfig, order_requirements: Option, ) -> Result> { @@ -2216,13 +2218,13 @@ mod tests { } async fn get_exec( - state: &SessionState, + state: &dyn Session, file_name: &str, projection: Option>, limit: Option, ) -> Result> { let testdata = crate::test_util::parquet_test_data(); - + let state = state.as_any().downcast_ref::().unwrap(); let format = state .get_file_format_factory("parquet") .map(|factory| factory.create(state, &Default::default()).unwrap()) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 9b9bcd22c464..0d011b29acc3 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -32,6 +32,7 @@ use 
crate::datasource::{ physical_plan::{FileScanConfig, FileSinkConfig}, }; use crate::execution::context::SessionState; + use datafusion_catalog::TableProvider; use datafusion_common::{config_err, DataFusionError, Result}; use datafusion_expr::dml::InsertOp; @@ -191,7 +192,7 @@ impl ListingTableConfig { } /// Infer the [`SchemaRef`] based on `table_path` suffix. Requires `self.options` to be set prior to using. - pub async fn infer_schema(self, state: &SessionState) -> Result { + pub async fn infer_schema(self, state: &dyn Session) -> Result { match self.options { Some(options) => { let schema = if let Some(url) = self.table_paths.first() { @@ -216,7 +217,7 @@ impl ListingTableConfig { } /// Infer the partition columns from the path. Requires `self.options` to be set prior to using. - pub async fn infer_partitions_from_path(self, state: &SessionState) -> Result { + pub async fn infer_partitions_from_path(self, state: &dyn Session) -> Result { match self.options { Some(options) => { let Some(url) = self.table_paths.first() else { @@ -484,7 +485,7 @@ impl ListingOptions { /// locally or ask a remote service to do it (e.g a scheduler). pub async fn infer_schema<'a>( &'a self, - state: &SessionState, + state: &dyn Session, table_path: &'a ListingTableUrl, ) -> Result { let store = state.runtime_env().object_store(table_path)?; @@ -509,7 +510,7 @@ impl ListingOptions { /// Allows specifying partial partitions. pub async fn validate_partitions( &self, - state: &SessionState, + state: &dyn Session, table_path: &ListingTableUrl, ) -> Result<()> { if self.table_partition_cols.is_empty() { @@ -563,7 +564,7 @@ impl ListingOptions { /// and therefore may fail to detect invalid partitioning. pub(crate) async fn infer_partitions( &self, - state: &SessionState, + state: &dyn Session, table_path: &ListingTableUrl, ) -> Result> { let store = state.runtime_env().object_store(table_path)?; @@ -1091,7 +1092,7 @@ impl ListingTable { /// be distributed to different threads / executors. async fn list_files_for_scan<'a>( &'a self, - ctx: &'a SessionState, + ctx: &'a dyn Session, filters: &'a [Expr], limit: Option, ) -> Result<(Vec>, Statistics)> { @@ -1152,7 +1153,7 @@ impl ListingTable { /// If they are not, it infers the statistics from the file and stores them in the cache. 
async fn do_collect_statistics( &self, - ctx: &SessionState, + ctx: &dyn Session, store: &Arc, part_file: &PartitionedFile, ) -> Result> { diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs b/datafusion/core/src/datasource/physical_plan/file_stream.rs index 18cda4524ab2..7f95b909ee89 100644 --- a/datafusion/core/src/datasource/physical_plan/file_stream.rs +++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs @@ -28,13 +28,13 @@ use std::sync::Arc; use std::task::{Context, Poll}; use crate::datasource::listing::PartitionedFile; -use crate::datasource::physical_plan::file_scan_config::PartitionColumnProjector; use crate::datasource::physical_plan::{FileMeta, FileScanConfig}; use crate::error::Result; use crate::physical_plan::metrics::{ BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time, }; use crate::physical_plan::RecordBatchStream; +use datafusion_catalog_listing::file_scan_config::PartitionColumnProjector; use arrow::datatypes::SchemaRef; use arrow::error::ArrowError; diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 5bb7da8376a2..90e0b5bc6e4a 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -21,12 +21,10 @@ mod arrow_file; mod avro; mod csv; mod file_groups; -mod file_scan_config; mod file_stream; mod json; #[cfg(feature = "parquet")] pub mod parquet; -mod statistics; pub(crate) use self::csv::plan_to_csv; pub(crate) use self::json::plan_to_json; @@ -36,44 +34,30 @@ pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactor pub use arrow_file::ArrowExec; pub use avro::AvroExec; pub use csv::{CsvConfig, CsvExec, CsvExecBuilder, CsvOpener}; +pub use datafusion_catalog_listing::file_scan_config::*; use datafusion_expr::dml::InsertOp; pub use file_groups::FileGroupPartitioner; -pub use file_scan_config::{ - wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig, -}; pub use file_stream::{FileOpenFuture, FileOpener, FileStream, OnError}; pub use json::{JsonOpener, NdJsonExec}; -use std::{ - fmt::{Debug, Formatter, Result as FmtResult}, - ops::Range, - sync::Arc, - vec, -}; +use std::{ops::Range, sync::Arc}; use super::{file_format::write::demux::start_demuxer_task, listing::ListingTableUrl}; use crate::datasource::file_format::write::demux::DemuxedStreamReceiver; -use crate::error::Result; -use crate::physical_plan::{DisplayAs, DisplayFormatType}; -use crate::{ - datasource::{ - listing::{FileRange, PartitionedFile}, - object_store::ObjectStoreUrl, - }, - physical_plan::display::{display_orderings, ProjectSchemaDisplay}, +use crate::datasource::{ + listing::{FileRange, PartitionedFile}, + object_store::ObjectStoreUrl, }; +use crate::error::Result; +use crate::physical_plan::DisplayAs; use arrow::datatypes::{DataType, SchemaRef}; use datafusion_common_runtime::SpawnedTask; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; -use datafusion_physical_expr::expressions::Column; -use datafusion_physical_expr::PhysicalSortExpr; -use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::insert::DataSink; use async_trait::async_trait; use futures::StreamExt; -use log::debug; use object_store::{path::Path, GetOptions, GetRange, ObjectMeta, ObjectStore}; /// General behaviors for files that do `DataSink` operations @@ -161,151 +145,6 @@ impl FileSinkConfig { } } -impl Debug for FileScanConfig { - fn fmt(&self, f: 
&mut Formatter<'_>) -> FmtResult { - write!(f, "object_store_url={:?}, ", self.object_store_url)?; - - write!(f, "statistics={:?}, ", self.statistics)?; - - DisplayAs::fmt_as(self, DisplayFormatType::Verbose, f) - } -} - -impl DisplayAs for FileScanConfig { - fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult { - let (schema, _, _, orderings) = self.project(); - - write!(f, "file_groups=")?; - FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?; - - if !schema.fields().is_empty() { - write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?; - } - - if let Some(limit) = self.limit { - write!(f, ", limit={limit}")?; - } - - display_orderings(f, &orderings)?; - - if !self.constraints.is_empty() { - write!(f, ", {}", self.constraints)?; - } - - Ok(()) - } -} - -/// A wrapper to customize partitioned file display -/// -/// Prints in the format: -/// ```text -/// {NUM_GROUPS groups: [[file1, file2,...], [fileN, fileM, ...], ...]} -/// ``` -#[derive(Debug)] -struct FileGroupsDisplay<'a>(&'a [Vec]); - -impl DisplayAs for FileGroupsDisplay<'_> { - fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult { - let n_groups = self.0.len(); - let groups = if n_groups == 1 { "group" } else { "groups" }; - write!(f, "{{{n_groups} {groups}: [")?; - match t { - DisplayFormatType::Default => { - // To avoid showing too many partitions - let max_groups = 5; - fmt_up_to_n_elements(self.0, max_groups, f, |group, f| { - FileGroupDisplay(group).fmt_as(t, f) - })?; - } - DisplayFormatType::Verbose => { - fmt_elements_split_by_commas(self.0.iter(), f, |group, f| { - FileGroupDisplay(group).fmt_as(t, f) - })? - } - } - write!(f, "]}}") - } -} - -/// A wrapper to customize partitioned group of files display -/// -/// Prints in the format: -/// ```text -/// [file1, file2,...] -/// ``` -#[derive(Debug)] -pub(crate) struct FileGroupDisplay<'a>(pub &'a [PartitionedFile]); - -impl DisplayAs for FileGroupDisplay<'_> { - fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult { - write!(f, "[")?; - match t { - DisplayFormatType::Default => { - // To avoid showing too many files - let max_files = 5; - fmt_up_to_n_elements(self.0, max_files, f, |pf, f| { - write!(f, "{}", pf.object_meta.location.as_ref())?; - if let Some(range) = pf.range.as_ref() { - write!(f, ":{}..{}", range.start, range.end)?; - } - Ok(()) - })? - } - DisplayFormatType::Verbose => { - fmt_elements_split_by_commas(self.0.iter(), f, |pf, f| { - write!(f, "{}", pf.object_meta.location.as_ref())?; - if let Some(range) = pf.range.as_ref() { - write!(f, ":{}..{}", range.start, range.end)?; - } - Ok(()) - })? 
- } - } - write!(f, "]") - } -} - -/// helper to format an array of up to N elements -fn fmt_up_to_n_elements( - elements: &[E], - n: usize, - f: &mut Formatter, - format_element: F, -) -> FmtResult -where - F: Fn(&E, &mut Formatter) -> FmtResult, -{ - let len = elements.len(); - fmt_elements_split_by_commas(elements.iter().take(n), f, |element, f| { - format_element(element, f) - })?; - // Remaining elements are showed as `...` (to indicate there is more) - if len > n { - write!(f, ", ...")?; - } - Ok(()) -} - -/// helper formatting array elements with a comma and a space between them -fn fmt_elements_split_by_commas( - iter: I, - f: &mut Formatter, - format_element: F, -) -> FmtResult -where - I: Iterator, - F: Fn(E, &mut Formatter) -> FmtResult, -{ - for (idx, element) in iter.enumerate() { - if idx > 0 { - write!(f, ", ")?; - } - format_element(element, f)?; - } - Ok(()) -} - /// A single file or part of a file that should be read, along with its schema, statistics pub struct FileMeta { /// Path for the file (e.g. URL, filesystem path, etc) @@ -336,131 +175,6 @@ impl From for FileMeta { } } -/// The various listing tables does not attempt to read all files -/// concurrently, instead they will read files in sequence within a -/// partition. This is an important property as it allows plans to -/// run against 1000s of files and not try to open them all -/// concurrently. -/// -/// However, it means if we assign more than one file to a partition -/// the output sort order will not be preserved as illustrated in the -/// following diagrams: -/// -/// When only 1 file is assigned to each partition, each partition is -/// correctly sorted on `(A, B, C)` -/// -/// ```text -///┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┓ -/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ┐ -///┃ ┌───────────────┐ ┌──────────────┐ │ ┌──────────────┐ │ ┌─────────────┐ ┃ -/// │ │ 1.parquet │ │ │ │ 2.parquet │ │ │ 3.parquet │ │ │ 4.parquet │ │ -///┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ │Sort: A, B, C │ │ │Sort: A, B, C│ ┃ -/// │ └───────────────┘ │ │ └──────────────┘ │ └──────────────┘ │ └─────────────┘ │ -///┃ │ │ ┃ -/// │ │ │ │ │ │ -///┃ │ │ ┃ -/// │ │ │ │ │ │ -///┃ │ │ ┃ -/// │ │ │ │ │ │ -///┃ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃ -/// DataFusion DataFusion DataFusion DataFusion -///┃ Partition 1 Partition 2 Partition 3 Partition 4 ┃ -/// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ -/// -/// ParquetExec -///``` -/// -/// However, when more than 1 file is assigned to each partition, each -/// partition is NOT correctly sorted on `(A, B, C)`. 
Once the second -/// file is scanned, the same values for A, B and C can be repeated in -/// the same sorted stream -/// -///```text -///┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ -/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃ -///┃ ┌───────────────┐ ┌──────────────┐ │ -/// │ │ 1.parquet │ │ │ │ 2.parquet │ ┃ -///┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ -/// │ └───────────────┘ │ │ └──────────────┘ ┃ -///┃ ┌───────────────┐ ┌──────────────┐ │ -/// │ │ 3.parquet │ │ │ │ 4.parquet │ ┃ -///┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ -/// │ └───────────────┘ │ │ └──────────────┘ ┃ -///┃ │ -/// │ │ │ ┃ -///┃ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ -/// DataFusion DataFusion ┃ -///┃ Partition 1 Partition 2 -/// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┛ -/// -/// ParquetExec -///``` -fn get_projected_output_ordering( - base_config: &FileScanConfig, - projected_schema: &SchemaRef, -) -> Vec { - let mut all_orderings = vec![]; - for output_ordering in &base_config.output_ordering { - let mut new_ordering = LexOrdering::default(); - for PhysicalSortExpr { expr, options } in output_ordering.iter() { - if let Some(col) = expr.as_any().downcast_ref::() { - let name = col.name(); - if let Some((idx, _)) = projected_schema.column_with_name(name) { - // Compute the new sort expression (with correct index) after projection: - new_ordering.push(PhysicalSortExpr { - expr: Arc::new(Column::new(name, idx)), - options: *options, - }); - continue; - } - } - // Cannot find expression in the projected_schema, stop iterating - // since rest of the orderings are violated - break; - } - - // do not push empty entries - // otherwise we may have `Some(vec![])` at the output ordering. - if new_ordering.is_empty() { - continue; - } - - // Check if any file groups are not sorted - if base_config.file_groups.iter().any(|group| { - if group.len() <= 1 { - // File groups with <= 1 files are always sorted - return false; - } - - let statistics = match statistics::MinMaxStatistics::new_from_files( - &new_ordering, - projected_schema, - base_config.projection.as_deref(), - group, - ) { - Ok(statistics) => statistics, - Err(e) => { - log::trace!("Error fetching statistics for file group: {e}"); - // we can't prove that it's ordered, so we have to reject it - return true; - } - }; - - !statistics.is_sorted() - }) { - debug!( - "Skipping specified output ordering {:?}. \ - Some file groups couldn't be determined to be sorted: {:?}", - base_config.output_ordering[0], base_config.file_groups - ); - continue; - } - - all_orderings.push(new_ordering); - } - all_orderings -} - /// Represents the possible outcomes of a range calculation. /// /// This enum is used to encapsulate the result of calculating the range of @@ -570,6 +284,10 @@ mod tests { use super::*; use crate::physical_plan::{DefaultDisplay, VerboseDisplay}; + use crate::datasource::physical_plan::FileGroupDisplay; + use crate::datasource::schema_adapter::{ + DefaultSchemaAdapterFactory, SchemaAdapterFactory, + }; use arrow_array::cast::AsArray; use arrow_array::types::{Float32Type, Float64Type, UInt32Type}; use arrow_array::{ @@ -577,10 +295,6 @@ mod tests { StringArray, UInt64Array, }; use arrow_schema::{Field, Schema}; - - use crate::datasource::schema_adapter::{ - DefaultSchemaAdapterFactory, SchemaAdapterFactory, - }; use chrono::Utc; #[test]
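Several factories in this patch (`CsvFormatFactory`, `JsonFormatFactory`, `ParquetFormatFactory`) still need `SessionState`-specific defaults and recover the concrete type from the trait object via `Session::as_any()` followed by `downcast_ref::<SessionState>().unwrap()`. A sketch of that pattern that fails gracefully instead of panicking (the `create_with_session` function is hypothetical, not part of the patch):

```rust
// Sketch of the downcast pattern used by the format factories above,
// returning an error instead of unwrapping.
use std::collections::HashMap;

use datafusion::catalog::Session;
use datafusion::common::{DataFusionError, Result};
use datafusion::execution::SessionState;

fn create_with_session(
    state: &dyn Session,
    _format_options: &HashMap<String, String>,
) -> Result<()> {
    let state = state
        .as_any()
        .downcast_ref::<SessionState>()
        .ok_or_else(|| {
            DataFusionError::Internal("expected a SessionState".to_string())
        })?;
    // Session-level table defaults are only reachable on the concrete type.
    let _table_options = state.default_table_options();
    Ok(())
}
```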