diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index afa329ec7ef0..45b75c7a914e 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1228,6 +1228,10 @@ impl DefaultPhysicalPlanner { "Unsupported logical plan: Analyze must be root of the plan" ) } + LogicalPlan::Truncate(_) => { + // DataFusion is a read-only query engine, but also a library, so consumers may implement this + return not_impl_err!("Unsupported logical plan: Truncate"); + } }; Ok(exec_node) } diff --git a/datafusion/expr/src/logical_plan/display.rs b/datafusion/expr/src/logical_plan/display.rs index 5db86631ae11..2847e2c83c0a 100644 --- a/datafusion/expr/src/logical_plan/display.rs +++ b/datafusion/expr/src/logical_plan/display.rs @@ -655,6 +655,11 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> { "StructColumn": expr_vec_fmt!(struct_type_columns), }) } + LogicalPlan::Truncate(_) => { + json!({ + "Node Type": "Truncate" + }) + } } } } diff --git a/datafusion/expr/src/logical_plan/dml.rs b/datafusion/expr/src/logical_plan/dml.rs index 025bb7b289dc..95135f1740f7 100644 --- a/datafusion/expr/src/logical_plan/dml.rs +++ b/datafusion/expr/src/logical_plan/dml.rs @@ -71,6 +71,31 @@ impl Hash for CopyTo { } } +/// Operator that truncates the content of a table +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct Truncate { + /// The table name + pub table_name: TableReference, + /// The schema of the output relation + pub output_schema: DFSchemaRef, +} + +impl Truncate { + /// Creates a new truncate statement with the output schema set empty. + pub fn new(table_name: TableReference) -> Self { + Self { + table_name, + + // The output schema is always empty + output_schema: make_empty_schema(), + } + } +} + +fn make_empty_schema() -> DFSchemaRef { + Arc::new(Schema::empty().try_into().unwrap()) +} + /// The operator that modifies the content of a database (adapted from /// substrait WriteRel) #[derive(Debug, Clone, PartialEq, Eq, Hash)] diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index f23f6e571a19..bc46859fc013 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -25,6 +25,7 @@ use std::sync::Arc; use super::dml::CopyTo; use super::DdlStatement; use crate::builder::{change_redundant_column, unnest_with_options}; +use crate::dml::Truncate; use crate::expr::{Placeholder, Sort as SortExpr, WindowFunction}; use crate::expr_rewriter::{ create_col_from_scalar_expr, normalize_cols, normalize_sorts, NamePreserver, @@ -280,6 +281,8 @@ pub enum LogicalPlan { Unnest(Unnest), /// A variadic query (e.g. "Recursive CTEs") RecursiveQuery(RecursiveQuery), + /// Truncate a table + Truncate(Truncate), } impl Default for LogicalPlan { @@ -330,6 +333,7 @@ impl LogicalPlan { // we take the schema of the static term as the schema of the entire recursive query static_term.schema() } + LogicalPlan::Truncate(Truncate { output_schema, .. }) => output_schema, } } @@ -481,7 +485,8 @@ impl LogicalPlan { | LogicalPlan::Statement { .. } | LogicalPlan::EmptyRelation { .. } | LogicalPlan::Values { .. } - | LogicalPlan::DescribeTable(_) => vec![], + | LogicalPlan::DescribeTable(_) + | LogicalPlan::Truncate(_) => vec![], } } @@ -598,7 +603,8 @@ impl LogicalPlan { | LogicalPlan::Copy(_) | LogicalPlan::Ddl(_) | LogicalPlan::DescribeTable(_) - | LogicalPlan::Unnest(_) => Ok(None), + | LogicalPlan::Unnest(_) + | LogicalPlan::Truncate(_) => Ok(None), } } @@ -768,6 +774,7 @@ impl LogicalPlan { // Update schema with unnested column type. unnest_with_options(Arc::unwrap_or_clone(input), exec_columns, options) } + LogicalPlan::Truncate(_) => Ok(self), } } @@ -1117,7 +1124,8 @@ impl LogicalPlan { LogicalPlan::EmptyRelation(_) | LogicalPlan::Ddl(_) | LogicalPlan::Statement(_) - | LogicalPlan::DescribeTable(_) => { + | LogicalPlan::DescribeTable(_) + | LogicalPlan::Truncate(_) => { // All of these plan types have no inputs / exprs so should not be called self.assert_no_expressions(expr)?; self.assert_no_inputs(inputs)?; @@ -1367,7 +1375,8 @@ impl LogicalPlan { | LogicalPlan::DescribeTable(_) | LogicalPlan::Prepare(_) | LogicalPlan::Statement(_) - | LogicalPlan::Extension(_) => None, + | LogicalPlan::Extension(_) + | LogicalPlan::Truncate(_) => None, } } @@ -1990,6 +1999,9 @@ impl LogicalPlan { write!(f, "Unnest: lists[{}] structs[{}]", expr_vec_fmt!(list_type_columns), expr_vec_fmt!(struct_type_columns)) + }, + LogicalPlan::Truncate(_) => { + write!(f, "Truncate") } } } diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index 0964fb601879..9d866d35bcf5 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -374,7 +374,8 @@ impl TreeNode for LogicalPlan { | LogicalPlan::Statement { .. } | LogicalPlan::EmptyRelation { .. } | LogicalPlan::Values { .. } - | LogicalPlan::DescribeTable(_) => Transformed::no(self), + | LogicalPlan::DescribeTable(_) + | LogicalPlan::Truncate(_) => Transformed::no(self), }) } } @@ -527,7 +528,8 @@ impl LogicalPlan { | LogicalPlan::Ddl(_) | LogicalPlan::Copy(_) | LogicalPlan::DescribeTable(_) - | LogicalPlan::Prepare(_) => Ok(TreeNodeRecursion::Continue), + | LogicalPlan::Prepare(_) + | LogicalPlan::Truncate(_) => Ok(TreeNodeRecursion::Continue), } } @@ -739,7 +741,8 @@ impl LogicalPlan { | LogicalPlan::Ddl(_) | LogicalPlan::Copy(_) | LogicalPlan::DescribeTable(_) - | LogicalPlan::Prepare(_) => Transformed::no(self), + | LogicalPlan::Prepare(_) + | LogicalPlan::Truncate(_) => Transformed::no(self), }) } diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index 1083114f6c3b..ed58515b5cd6 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -803,7 +803,8 @@ impl OptimizerRule for CommonSubexprEliminate { | LogicalPlan::Copy(_) | LogicalPlan::Unnest(_) | LogicalPlan::RecursiveQuery(_) - | LogicalPlan::Prepare(_) => { + | LogicalPlan::Prepare(_) + | LogicalPlan::Truncate(_) => { // This rule handles recursion itself in a `ApplyOrder::TopDown` like // manner. plan.map_children(|c| self.rewrite(c, config))? diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index e9032935e49c..cef1ab2b9d3b 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -349,7 +349,8 @@ fn optimize_projections( | LogicalPlan::RecursiveQuery(_) | LogicalPlan::Statement(_) | LogicalPlan::Values(_) - | LogicalPlan::DescribeTable(_) => { + | LogicalPlan::DescribeTable(_) + | LogicalPlan::Truncate(_) => { // These operators have no inputs, so stop the optimization process. return Ok(Transformed::no(plan)); } diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index bf5394ec01de..18ecfbe09940 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -1628,6 +1628,9 @@ impl AsLogicalPlan for LogicalPlanNode { LogicalPlan::RecursiveQuery(_) => Err(proto_error( "LogicalPlan serde is not yet implemented for RecursiveQuery", )), + LogicalPlan::Truncate(_) => Err(proto_error( + "LogicalPlan serde is not yet implemented for Truncate", + )), } } } diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 6b434c1f85e4..3df128928624 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -37,7 +37,7 @@ use datafusion_common::{ DataFusionError, Result, ScalarValue, SchemaError, SchemaReference, TableReference, ToDFSchema, }; -use datafusion_expr::dml::CopyTo; +use datafusion_expr::dml::{CopyTo, Truncate}; use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check; use datafusion_expr::logical_plan::builder::project; use datafusion_expr::logical_plan::DdlStatement; @@ -542,7 +542,21 @@ impl SqlToRel<'_, S> { } self.update_to_plan(table, assignments, from, selection) } + Statement::Truncate { + table_name, + partitions, + table, + } => { + if !table { + plan_err!("Truncate of non-tables not yet supported")?; + } + if partitions.is_some() { + plan_err!("Partition clause not supported")?; + } + + self.truncate_to_plan(table_name) + } Statement::Delete(Delete { tables, using, @@ -1499,6 +1513,15 @@ impl SqlToRel<'_, S> { Ok(plan) } + fn truncate_to_plan(&self, table_name: ObjectName) -> Result { + // Do a table lookup to verify the table exists + let table_ref = self.object_name_to_table_reference(table_name.clone())?; + let _ = self.context_provider.get_table_source(table_ref.clone())?; + + let plan = LogicalPlan::Truncate(Truncate::new(table_ref)); + Ok(plan) + } + fn show_columns_to_plan( &self, extended: bool, diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 802d4762574d..76d839d3b685 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -112,7 +112,8 @@ impl Unparser<'_> { | LogicalPlan::Copy(_) | LogicalPlan::DescribeTable(_) | LogicalPlan::RecursiveQuery(_) - | LogicalPlan::Unnest(_) => not_impl_err!("Unsupported plan: {plan:?}"), + | LogicalPlan::Unnest(_) + | LogicalPlan::Truncate(_) => not_impl_err!("Unsupported plan: {plan:?}"), } }