diff --git a/Cargo.lock b/Cargo.lock index 79910869f032..f2c94e9bfaa7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1892,8 +1892,10 @@ dependencies = [ "async-trait", "dashmap", "datafusion-common", + "datafusion-common-runtime", "datafusion-execution", "datafusion-expr", + "datafusion-physical-expr", "datafusion-physical-plan", "datafusion-sql", "futures", diff --git a/datafusion/catalog/Cargo.toml b/datafusion/catalog/Cargo.toml index 4e1555b7827e..113d68825390 100644 --- a/datafusion/catalog/Cargo.toml +++ b/datafusion/catalog/Cargo.toml @@ -35,8 +35,10 @@ arrow = { workspace = true } async-trait = { workspace = true } dashmap = { workspace = true } datafusion-common = { workspace = true } +datafusion-common-runtime = { workspace = true } datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } +datafusion-physical-expr = { workspace = true } datafusion-physical-plan = { workspace = true } datafusion-sql = { workspace = true } futures = { workspace = true } @@ -44,8 +46,6 @@ itertools = { workspace = true } log = { workspace = true } object_store = { workspace = true } parking_lot = { workspace = true } - -[dev-dependencies] tokio = { workspace = true } [lints] diff --git a/datafusion/catalog/src/lib.rs b/datafusion/catalog/src/lib.rs index e4ccba865189..7ba97fbc9faa 100644 --- a/datafusion/catalog/src/lib.rs +++ b/datafusion/catalog/src/lib.rs @@ -49,4 +49,6 @@ pub use r#async::*; pub use schema::*; pub use session::*; pub use table::*; +pub mod stream; pub mod streaming; +pub mod view; diff --git a/datafusion/core/src/datasource/stream.rs b/datafusion/catalog/src/stream.rs similarity index 98% rename from datafusion/core/src/datasource/stream.rs rename to datafusion/catalog/src/stream.rs index ffb4860544a1..3fb672490712 100644 --- a/datafusion/core/src/datasource/stream.rs +++ b/datafusion/catalog/src/stream.rs @@ -25,9 +25,7 @@ use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; -use crate::catalog::{TableProvider, TableProviderFactory}; -use crate::datasource::create_ordering; - +use crate::{Session, TableProvider, TableProviderFactory}; use arrow::array::{RecordBatch, RecordBatchReader, RecordBatchWriter}; use arrow::datatypes::SchemaRef; use datafusion_common::{config_err, plan_err, Constraints, DataFusionError, Result}; @@ -35,13 +33,13 @@ use datafusion_common_runtime::SpawnedTask; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_expr::dml::InsertOp; use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType}; +use datafusion_physical_expr::create_ordering; use datafusion_physical_plan::insert::{DataSink, DataSinkExec}; use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder; use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec}; use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; use async_trait::async_trait; -use datafusion_catalog::Session; use futures::StreamExt; /// A [`TableProviderFactory`] for [`StreamTable`] @@ -292,7 +290,7 @@ impl StreamConfig { /// data stored in object storage, should instead consider [`ListingTable`]. /// /// [Hadoop]: https://hadoop.apache.org/ -/// [`ListingTable`]: crate::datasource::listing::ListingTable +/// [`ListingTable`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html #[derive(Debug)] pub struct StreamTable(Arc); diff --git a/datafusion/catalog/src/view.rs b/datafusion/catalog/src/view.rs new file mode 100644 index 000000000000..8dfb79718c9b --- /dev/null +++ b/datafusion/catalog/src/view.rs @@ -0,0 +1,155 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! View data source which uses a LogicalPlan as it's input. + +use std::{any::Any, borrow::Cow, sync::Arc}; + +use crate::Session; +use crate::TableProvider; + +use arrow::datatypes::SchemaRef; +use async_trait::async_trait; +use datafusion_common::error::Result; +use datafusion_common::Column; +use datafusion_expr::TableType; +use datafusion_expr::{Expr, LogicalPlan}; +use datafusion_expr::{LogicalPlanBuilder, TableProviderFilterPushDown}; +use datafusion_physical_plan::ExecutionPlan; + +/// An implementation of `TableProvider` that uses another logical plan. +#[derive(Debug)] +pub struct ViewTable { + /// LogicalPlan of the view + logical_plan: LogicalPlan, + /// File fields + partition columns + table_schema: SchemaRef, + /// SQL used to create the view, if available + definition: Option, +} + +impl ViewTable { + /// Create new view that is executed at query runtime. + /// + /// Takes a `LogicalPlan` and optionally the SQL text of the `CREATE` + /// statement. + /// + /// Notes: the `LogicalPlan` is not validated or type coerced. If this is + /// needed it should be done after calling this function. + pub fn new(logical_plan: LogicalPlan, definition: Option) -> Self { + let table_schema = logical_plan.schema().as_ref().to_owned().into(); + Self { + logical_plan, + table_schema, + definition, + } + } + + #[deprecated( + since = "47.0.0", + note = "Use `ViewTable::new` instead and apply TypeCoercion to the logical plan if needed" + )] + pub fn try_new( + logical_plan: LogicalPlan, + definition: Option, + ) -> Result { + Ok(Self::new(logical_plan, definition)) + } + + /// Get definition ref + pub fn definition(&self) -> Option<&String> { + self.definition.as_ref() + } + + /// Get logical_plan ref + pub fn logical_plan(&self) -> &LogicalPlan { + &self.logical_plan + } +} + +#[async_trait] +impl TableProvider for ViewTable { + fn as_any(&self) -> &dyn Any { + self + } + + fn get_logical_plan(&self) -> Option> { + Some(Cow::Borrowed(&self.logical_plan)) + } + + fn schema(&self) -> SchemaRef { + Arc::clone(&self.table_schema) + } + + fn table_type(&self) -> TableType { + TableType::View + } + + fn get_table_definition(&self) -> Option<&str> { + self.definition.as_deref() + } + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> Result> { + // A filter is added on the View when given + Ok(vec![TableProviderFilterPushDown::Exact; filters.len()]) + } + + async fn scan( + &self, + state: &dyn Session, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + ) -> Result> { + let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new)); + let plan = self.logical_plan().clone(); + let mut plan = LogicalPlanBuilder::from(plan); + + if let Some(filter) = filter { + plan = plan.filter(filter)?; + } + + let mut plan = if let Some(projection) = projection { + // avoiding adding a redundant projection (e.g. SELECT * FROM view) + let current_projection = + (0..plan.schema().fields().len()).collect::>(); + if projection == ¤t_projection { + plan + } else { + let fields: Vec = projection + .iter() + .map(|i| { + Expr::Column(Column::from( + self.logical_plan.schema().qualified_field(*i), + )) + }) + .collect(); + plan.project(fields)? + } + } else { + plan + }; + + if let Some(limit) = limit { + plan = plan.limit(0, Some(limit))?; + } + + state.create_physical_plan(&plan.build()?).await + } +} diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index 18a1318dd40d..80783b4892c7 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -30,9 +30,10 @@ pub mod memory; pub mod physical_plan; pub mod provider; mod statistics; -pub mod stream; -pub mod view; +mod view_test; +pub use datafusion_catalog::stream; +pub use datafusion_catalog::view; pub use datafusion_datasource::schema_adapter; pub use datafusion_datasource::source; @@ -45,95 +46,9 @@ pub use self::view::ViewTable; pub use crate::catalog::TableProvider; pub use crate::logical_expr::TableType; pub use datafusion_execution::object_store; +pub use datafusion_physical_expr::create_ordering; pub use statistics::get_statistics_with_limit; -use arrow::compute::SortOptions; -use arrow::datatypes::Schema; -use datafusion_common::{plan_err, Result}; -use datafusion_expr::{Expr, SortExpr}; -use datafusion_physical_expr::{expressions, LexOrdering, PhysicalSortExpr}; - -/// Converts logical sort expressions to physical sort expressions -/// -/// This function transforms a collection of logical sort expressions into their physical -/// representation that can be used during query execution. -/// -/// # Arguments -/// -/// * `schema` - The schema containing column definitions -/// * `sort_order` - A collection of logical sort expressions grouped into lexicographic orderings -/// -/// # Returns -/// -/// A vector of lexicographic orderings for physical execution, or an error if the transformation fails -/// -/// # Examples -/// -/// ``` -/// // Create orderings from columns "id" and "name" -/// # use arrow::datatypes::{Schema, Field, DataType}; -/// # use datafusion::datasource::create_ordering; -/// # use datafusion_common::Column; -/// # use datafusion_expr::{Expr, SortExpr}; -/// # -/// // Create a schema with two fields -/// let schema = Schema::new(vec![ -/// Field::new("id", DataType::Int32, false), -/// Field::new("name", DataType::Utf8, false), -/// ]); -/// -/// let sort_exprs = vec![ -/// vec![ -/// SortExpr { expr: Expr::Column(Column::new(Some("t"), "id")), asc: true, nulls_first: false } -/// ], -/// vec![ -/// SortExpr { expr: Expr::Column(Column::new(Some("t"), "name")), asc: false, nulls_first: true } -/// ] -/// ]; -/// let result = create_ordering(&schema, &sort_exprs).unwrap(); -/// ``` -pub fn create_ordering( - schema: &Schema, - sort_order: &[Vec], -) -> Result> { - let mut all_sort_orders = vec![]; - - for (group_idx, exprs) in sort_order.iter().enumerate() { - // Construct PhysicalSortExpr objects from Expr objects: - let mut sort_exprs = LexOrdering::default(); - for (expr_idx, sort) in exprs.iter().enumerate() { - match &sort.expr { - Expr::Column(col) => match expressions::col(&col.name, schema) { - Ok(expr) => { - sort_exprs.push(PhysicalSortExpr { - expr, - options: SortOptions { - descending: !sort.asc, - nulls_first: sort.nulls_first, - }, - }); - } - // Cannot find expression in the projected_schema, stop iterating - // since rest of the orderings are violated - Err(_) => break, - }, - expr => { - return plan_err!( - "Expected single column reference in sort_order[{}][{}], got {}", - group_idx, - expr_idx, - expr - ); - } - } - } - if !sort_exprs.is_empty() { - all_sort_orders.push(sort_exprs); - } - } - Ok(all_sort_orders) -} - #[cfg(all(test, feature = "parquet"))] mod tests { diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view_test.rs similarity index 78% rename from datafusion/core/src/datasource/view.rs rename to datafusion/core/src/datasource/view_test.rs index e4f57b0d9798..c3dd5a2dd979 100644 --- a/datafusion/core/src/datasource/view.rs +++ b/datafusion/core/src/datasource/view_test.rs @@ -17,157 +17,14 @@ //! View data source which uses a LogicalPlan as it's input. -use std::{any::Any, borrow::Cow, sync::Arc}; - -use crate::{ - error::Result, - logical_expr::{Expr, LogicalPlan}, - physical_plan::ExecutionPlan, -}; -use arrow::datatypes::SchemaRef; -use async_trait::async_trait; -use datafusion_catalog::Session; -use datafusion_common::config::ConfigOptions; -use datafusion_common::Column; -use datafusion_expr::{LogicalPlanBuilder, TableProviderFilterPushDown}; -use datafusion_optimizer::analyzer::type_coercion::TypeCoercion; -use datafusion_optimizer::Analyzer; - -use crate::datasource::{TableProvider, TableType}; - -/// An implementation of `TableProvider` that uses another logical plan. -#[derive(Debug)] -pub struct ViewTable { - /// LogicalPlan of the view - logical_plan: LogicalPlan, - /// File fields + partition columns - table_schema: SchemaRef, - /// SQL used to create the view, if available - definition: Option, -} - -impl ViewTable { - /// Create new view that is executed at query runtime. - /// Takes a `LogicalPlan` and an optional create statement as input. - pub fn try_new( - logical_plan: LogicalPlan, - definition: Option, - ) -> Result { - let logical_plan = Self::apply_required_rule(logical_plan)?; - let table_schema = logical_plan.schema().as_ref().to_owned().into(); - - let view = Self { - logical_plan, - table_schema, - definition, - }; - - Ok(view) - } - - fn apply_required_rule(logical_plan: LogicalPlan) -> Result { - let options = ConfigOptions::default(); - Analyzer::with_rules(vec![Arc::new(TypeCoercion::new())]).execute_and_check( - logical_plan, - &options, - |_, _| {}, - ) - } - - /// Get definition ref - pub fn definition(&self) -> Option<&String> { - self.definition.as_ref() - } - - /// Get logical_plan ref - pub fn logical_plan(&self) -> &LogicalPlan { - &self.logical_plan - } -} - -#[async_trait] -impl TableProvider for ViewTable { - fn as_any(&self) -> &dyn Any { - self - } - - fn get_logical_plan(&self) -> Option> { - Some(Cow::Borrowed(&self.logical_plan)) - } - - fn schema(&self) -> SchemaRef { - Arc::clone(&self.table_schema) - } - - fn table_type(&self) -> TableType { - TableType::View - } - - fn get_table_definition(&self) -> Option<&str> { - self.definition.as_deref() - } - fn supports_filters_pushdown( - &self, - filters: &[&Expr], - ) -> Result> { - // A filter is added on the View when given - Ok(vec![TableProviderFilterPushDown::Exact; filters.len()]) - } - - async fn scan( - &self, - state: &dyn Session, - projection: Option<&Vec>, - filters: &[Expr], - limit: Option, - ) -> Result> { - let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new)); - let plan = self.logical_plan().clone(); - let mut plan = LogicalPlanBuilder::from(plan); - - if let Some(filter) = filter { - plan = plan.filter(filter)?; - } - - let mut plan = if let Some(projection) = projection { - // avoiding adding a redundant projection (e.g. SELECT * FROM view) - let current_projection = - (0..plan.schema().fields().len()).collect::>(); - if projection == ¤t_projection { - plan - } else { - let fields: Vec = projection - .iter() - .map(|i| { - Expr::Column(Column::from( - self.logical_plan.schema().qualified_field(*i), - )) - }) - .collect(); - plan.project(fields)? - } - } else { - plan - }; - - if let Some(limit) = limit { - plan = plan.limit(0, Some(limit))?; - } - - state.create_physical_plan(&plan.build()?).await - } -} - #[cfg(test)] mod tests { - use datafusion_expr::{col, lit}; - + use crate::error::Result; use crate::execution::options::ParquetReadOptions; use crate::prelude::SessionContext; use crate::test_util::parquet_test_data; use crate::{assert_batches_eq, execution::context::SessionConfig}; - - use super::*; + use datafusion_expr::{col, lit}; #[tokio::test] async fn issue_3242() -> Result<()> { diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index beefca6d572d..faf689179eca 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -75,9 +75,12 @@ use chrono::{DateTime, Utc}; use datafusion_catalog::{ DynamicFileCatalog, SessionStore, TableFunction, TableFunctionImpl, UrlTableFactory, }; +use datafusion_common::config::ConfigOptions; pub use datafusion_execution::config::SessionConfig; pub use datafusion_execution::TaskContext; pub use datafusion_expr::execution_props::ExecutionProps; +use datafusion_optimizer::analyzer::type_coercion::TypeCoercion; +use datafusion_optimizer::Analyzer; use datafusion_optimizer::{AnalyzerRule, OptimizerRule}; use object_store::ObjectStore; use parking_lot::RwLock; @@ -856,6 +859,16 @@ impl SessionContext { } } + /// Applies the `TypeCoercion` rewriter to the logical plan. + fn apply_type_coercion(logical_plan: LogicalPlan) -> Result { + let options = ConfigOptions::default(); + Analyzer::with_rules(vec![Arc::new(TypeCoercion::new())]).execute_and_check( + logical_plan, + &options, + |_, _| {}, + ) + } + async fn create_view(&self, cmd: CreateView) -> Result { let CreateView { name, @@ -874,13 +887,14 @@ impl SessionContext { match (or_replace, view) { (true, Ok(_)) => { self.deregister_table(name.clone())?; - let table = Arc::new(ViewTable::try_new((*input).clone(), definition)?); - + let input = Self::apply_type_coercion(input.as_ref().clone())?; + let table = Arc::new(ViewTable::new(input, definition)); self.register_table(name, table)?; self.return_empty_dataframe() } (_, Err(_)) => { - let table = Arc::new(ViewTable::try_new((*input).clone(), definition)?); + let input = Self::apply_type_coercion(input.as_ref().clone())?; + let table = Arc::new(ViewTable::new(input, definition)); self.register_table(name, table)?; self.return_empty_dataframe() } diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index e52c686cd948..9abaeae4408e 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -54,8 +54,8 @@ pub use equivalence::{ }; pub use partitioning::{Distribution, Partitioning}; pub use physical_expr::{ - physical_exprs_bag_equal, physical_exprs_contains, physical_exprs_equal, - PhysicalExprRef, + create_ordering, physical_exprs_bag_equal, physical_exprs_contains, + physical_exprs_equal, PhysicalExprRef, }; pub use datafusion_physical_expr_common::physical_expr::PhysicalExpr; diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs index a4184845a0de..2221bc980f6c 100644 --- a/datafusion/physical-expr/src/physical_expr.rs +++ b/datafusion/physical-expr/src/physical_expr.rs @@ -58,6 +58,94 @@ pub fn physical_exprs_bag_equal( multi_set_lhs == multi_set_rhs } +use crate::{expressions, LexOrdering, PhysicalSortExpr}; +use arrow::compute::SortOptions; +use arrow::datatypes::Schema; +use datafusion_common::plan_err; +use datafusion_common::Result; +use datafusion_expr::{Expr, SortExpr}; + +/// Converts logical sort expressions to physical sort expressions +/// +/// This function transforms a collection of logical sort expressions into their physical +/// representation that can be used during query execution. +/// +/// # Arguments +/// +/// * `schema` - The schema containing column definitions +/// * `sort_order` - A collection of logical sort expressions grouped into lexicographic orderings +/// +/// # Returns +/// +/// A vector of lexicographic orderings for physical execution, or an error if the transformation fails +/// +/// # Examples +/// +/// ``` +/// // Create orderings from columns "id" and "name" +/// # use arrow::datatypes::{Schema, Field, DataType}; +/// # use datafusion_physical_expr::create_ordering; +/// # use datafusion_common::Column; +/// # use datafusion_expr::{Expr, SortExpr}; +/// # +/// // Create a schema with two fields +/// let schema = Schema::new(vec![ +/// Field::new("id", DataType::Int32, false), +/// Field::new("name", DataType::Utf8, false), +/// ]); +/// +/// let sort_exprs = vec![ +/// vec![ +/// SortExpr { expr: Expr::Column(Column::new(Some("t"), "id")), asc: true, nulls_first: false } +/// ], +/// vec![ +/// SortExpr { expr: Expr::Column(Column::new(Some("t"), "name")), asc: false, nulls_first: true } +/// ] +/// ]; +/// let result = create_ordering(&schema, &sort_exprs).unwrap(); +/// ``` +pub fn create_ordering( + schema: &Schema, + sort_order: &[Vec], +) -> Result> { + let mut all_sort_orders = vec![]; + + for (group_idx, exprs) in sort_order.iter().enumerate() { + // Construct PhysicalSortExpr objects from Expr objects: + let mut sort_exprs = LexOrdering::default(); + for (expr_idx, sort) in exprs.iter().enumerate() { + match &sort.expr { + Expr::Column(col) => match expressions::col(&col.name, schema) { + Ok(expr) => { + sort_exprs.push(PhysicalSortExpr { + expr, + options: SortOptions { + descending: !sort.asc, + nulls_first: sort.nulls_first, + }, + }); + } + // Cannot find expression in the projected_schema, stop iterating + // since rest of the orderings are violated + Err(_) => break, + }, + expr => { + return plan_err!( + "Expected single column reference in sort_order[{}][{}], got {}", + group_idx, + expr_idx, + expr + ); + } + } + } + if !sort_exprs.is_empty() { + all_sort_orders.push(sort_exprs); + } + } + Ok(all_sort_orders) +} + #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 148856cd103c..c65569ef1cfb 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -867,7 +867,7 @@ impl AsLogicalPlan for LogicalPlanNode { None }; - let provider = ViewTable::try_new(input, definition)?; + let provider = ViewTable::new(input, definition); let table_name = from_table_reference(scan.table_name.as_ref(), "ViewScan")?;