diff --git a/Cargo.lock b/Cargo.lock
index 96ad2527f757..10f437c47ddb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1832,6 +1832,7 @@ dependencies = [
  "chrono",
  "datafusion-catalog",
  "datafusion-common",
+ "datafusion-common-runtime",
  "datafusion-execution",
  "datafusion-expr",
  "datafusion-physical-expr",
@@ -1843,6 +1844,7 @@ dependencies = [
  "itertools 0.14.0",
  "log",
  "object_store",
+ "rand 0.8.5",
  "tempfile",
  "tokio",
  "tokio-util",
diff --git a/datafusion/catalog-listing/Cargo.toml b/datafusion/catalog-listing/Cargo.toml
index ad97fda19d71..b3c864d77856 100644
--- a/datafusion/catalog-listing/Cargo.toml
+++ b/datafusion/catalog-listing/Cargo.toml
@@ -40,11 +40,13 @@ async-compression = { version = "0.4.0", features = [
     "zstd",
     "tokio",
 ], optional = true }
+async-trait = { workspace = true }
 bytes = { workspace = true }
 bzip2 = { version = "0.5.0", optional = true }
 chrono = { workspace = true }
 datafusion-catalog = { workspace = true }
 datafusion-common = { workspace = true, features = ["object_store"] }
+datafusion-common-runtime = { workspace = true }
 datafusion-execution = { workspace = true }
 datafusion-expr = { workspace = true }
 datafusion-physical-expr = { workspace = true }
@@ -56,6 +58,7 @@ glob = "0.3.0"
 itertools = { workspace = true }
 log = { workspace = true }
 object_store = { workspace = true }
+rand = { workspace = true }
 tokio = { workspace = true }
 tokio-util = { version = "0.7.4", features = ["io"], optional = true }
 url = { workspace = true }
@@ -63,7 +66,6 @@ xz2 = { version = "0.1", optional = true, features = ["static"] }
 zstd = { version = "0.13", optional = true, default-features = false }
 
 [dev-dependencies]
-async-trait = { workspace = true }
 tempfile = { workspace = true }
 
 [lints]
diff --git a/datafusion/catalog-listing/src/file_sink_config.rs b/datafusion/catalog-listing/src/file_sink_config.rs
new file mode 100644
index 000000000000..6087f930d3fe
--- /dev/null
+++ b/datafusion/catalog-listing/src/file_sink_config.rs
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::write::demux::{start_demuxer_task, DemuxedStreamReceiver};
+use crate::{ListingTableUrl, PartitionedFile};
+use arrow::datatypes::{DataType, SchemaRef};
+use async_trait::async_trait;
+use datafusion_common::Result;
+use datafusion_common_runtime::SpawnedTask;
+use datafusion_execution::object_store::ObjectStoreUrl;
+use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+use datafusion_expr::dml::InsertOp;
+use datafusion_physical_plan::insert::DataSink;
+use object_store::ObjectStore;
+use std::sync::Arc;
+
+/// General behaviors for files that do `DataSink` operations
+#[async_trait]
+pub trait FileSink: DataSink {
+    /// Retrieves the file sink configuration.
+    fn config(&self) -> &FileSinkConfig;
+
+    /// Spawns writer tasks and joins them to perform file writing operations.
+    /// Is a critical part of `FileSink` trait, since it's the very last step for `write_all`.
+    ///
+    /// This function handles the process of writing data to files by:
+    /// 1. Spawning tasks for writing data to individual files.
+    /// 2. Coordinating the tasks using a demuxer to distribute data among files.
+    /// 3. Collecting results using `tokio::join`, ensuring that all tasks complete successfully.
+    ///
+    /// # Parameters
+    /// - `context`: The execution context (`TaskContext`) that provides resources
+    ///   like memory management and runtime environment.
+    /// - `demux_task`: A spawned task that handles demuxing, responsible for splitting
+    ///   an input [`SendableRecordBatchStream`] into dynamically determined partitions.
+    ///   See `start_demuxer_task()`
+    /// - `file_stream_rx`: A receiver that yields streams of record batches and their
+    ///   corresponding file paths for writing. See `start_demuxer_task()`
+    /// - `object_store`: A handle to the object store where the files are written.
+    ///
+    /// # Returns
+    /// - `Result<u64>`: Returns the total number of rows written across all files.
+    async fn spawn_writer_tasks_and_join(
+        &self,
+        context: &Arc<TaskContext>,
+        demux_task: SpawnedTask<Result<()>>,
+        file_stream_rx: DemuxedStreamReceiver,
+        object_store: Arc<dyn ObjectStore>,
+    ) -> Result<u64>;
+
+    /// File sink implementation of the [`DataSink::write_all`] method.
+    async fn write_all(
+        &self,
+        data: SendableRecordBatchStream,
+        context: &Arc<TaskContext>,
+    ) -> Result<u64> {
+        let config = self.config();
+        let object_store = context
+            .runtime_env()
+            .object_store(&config.object_store_url)?;
+        let (demux_task, file_stream_rx) = start_demuxer_task(config, data, context);
+        self.spawn_writer_tasks_and_join(
+            context,
+            demux_task,
+            file_stream_rx,
+            object_store,
+        )
+        .await
+    }
+}
+
+/// The base configurations to provide when creating a physical plan for
+/// writing to any given file format.
+pub struct FileSinkConfig {
+    /// Object store URL, used to get an ObjectStore instance
+    pub object_store_url: ObjectStoreUrl,
+    /// A vector of [`PartitionedFile`] structs, each representing a file partition
+    pub file_groups: Vec<PartitionedFile>,
+    /// Vector of partition paths
+    pub table_paths: Vec<ListingTableUrl>,
+    /// The schema of the output file
+    pub output_schema: SchemaRef,
+    /// A vector of column names and their corresponding data types,
+    /// representing the partitioning columns for the file
+    pub table_partition_cols: Vec<(String, DataType)>,
+    /// Controls how new data should be written to the file, determining whether
+    /// to append to, overwrite, or replace records in existing files.
+    pub insert_op: InsertOp,
+    /// Controls whether partition columns are kept for the file
+    pub keep_partition_by_columns: bool,
+    /// File extension without a dot(.)
+    pub file_extension: String,
+}
+
+impl FileSinkConfig {
+    /// Get output schema
+    pub fn output_schema(&self) -> &SchemaRef {
+        &self.output_schema
+    }
+}
diff --git a/datafusion/catalog-listing/src/mod.rs b/datafusion/catalog-listing/src/mod.rs
index 98c03253733e..786c27acb95e 100644
--- a/datafusion/catalog-listing/src/mod.rs
+++ b/datafusion/catalog-listing/src/mod.rs
@@ -20,8 +20,10 @@
 
 pub mod file_compression_type;
 pub mod file_groups;
+pub mod file_sink_config;
 pub mod helpers;
 pub mod url;
+pub mod write;
 use chrono::TimeZone;
 use datafusion_common::Result;
 use datafusion_common::{ScalarValue, Statistics};
diff --git a/datafusion/core/src/datasource/file_format/write/demux.rs b/datafusion/catalog-listing/src/write/demux.rs
similarity index 99%
rename from datafusion/core/src/datasource/file_format/write/demux.rs
rename to datafusion/catalog-listing/src/write/demux.rs
index f3798847fe29..111d22060c0d 100644
--- a/datafusion/core/src/datasource/file_format/write/demux.rs
+++ b/datafusion/catalog-listing/src/write/demux.rs
@@ -22,10 +22,10 @@ use std::borrow::Cow;
 use std::collections::HashMap;
 use std::sync::Arc;
 
-use crate::datasource::listing::ListingTableUrl;
-use crate::datasource::physical_plan::FileSinkConfig;
-use crate::error::Result;
-use crate::physical_plan::SendableRecordBatchStream;
+use crate::url::ListingTableUrl;
+use crate::write::FileSinkConfig;
+use datafusion_common::error::Result;
+use datafusion_physical_plan::SendableRecordBatchStream;
 
 use arrow::array::{
     builder::UInt64Builder, cast::AsArray, downcast_dictionary_array, RecordBatch,
diff --git a/datafusion/core/src/datasource/file_format/write/mod.rs b/datafusion/catalog-listing/src/write/mod.rs
similarity index 90%
rename from datafusion/core/src/datasource/file_format/write/mod.rs
rename to datafusion/catalog-listing/src/write/mod.rs
index 8070887b92df..f581126095a7 100644
--- a/datafusion/core/src/datasource/file_format/write/mod.rs
+++ b/datafusion/catalog-listing/src/write/mod.rs
@@ -21,9 +21,9 @@
 use std::io::Write;
 use std::sync::Arc;
 
-use crate::datasource::file_format::file_compression_type::FileCompressionType;
-use crate::datasource::physical_plan::FileSinkConfig;
-use crate::error::Result;
+use crate::file_compression_type::FileCompressionType;
+use crate::file_sink_config::FileSinkConfig;
+use datafusion_common::error::Result;
 
 use arrow::array::RecordBatch;
 use arrow::datatypes::Schema;
@@ -33,18 +33,18 @@ use object_store::path::Path;
 use object_store::ObjectStore;
 use tokio::io::AsyncWrite;
 
-pub(crate) mod demux;
-pub(crate) mod orchestration;
+pub mod demux;
+pub mod orchestration;
 
 /// A buffer with interior mutability shared by the SerializedFileWriter and
 /// ObjectStore writer
 #[derive(Clone)]
-pub(crate) struct SharedBuffer {
+pub struct SharedBuffer {
     /// The inner buffer for reading and writing
     ///
     /// The lock is used to obtain internal mutability, so no worry about the
     /// lock contention.
-    pub(crate) buffer: Arc<futures::lock::Mutex<Vec<u8>>>,
+    pub buffer: Arc<futures::lock::Mutex<Vec<u8>>>,
 }
 
 impl SharedBuffer {
@@ -79,7 +79,7 @@ pub trait BatchSerializer: Sync + Send {
 /// with the specified compression.
 /// We drop the `AbortableWrite` struct and the writer will not try to cleanup on failure.
 /// Users can configure automatic cleanup with their cloud provider.
-pub(crate) async fn create_writer(
+pub async fn create_writer(
     file_compression_type: FileCompressionType,
     location: &Path,
     object_store: Arc<dyn ObjectStore>,
@@ -91,7 +91,7 @@ pub(crate) async fn create_writer(
 /// Converts table schema to writer schema, which may differ in the case
 /// of hive style partitioning where some columns are removed from the
 /// underlying files.
-pub(crate) fn get_writer_schema(config: &FileSinkConfig) -> Arc<Schema> {
+pub fn get_writer_schema(config: &FileSinkConfig) -> Arc<Schema> {
     if !config.table_partition_cols.is_empty() && !config.keep_partition_by_columns {
         let schema = config.output_schema();
         let partition_names: Vec<_> =
diff --git a/datafusion/core/src/datasource/file_format/write/orchestration.rs b/datafusion/catalog-listing/src/write/orchestration.rs
similarity index 98%
rename from datafusion/core/src/datasource/file_format/write/orchestration.rs
rename to datafusion/catalog-listing/src/write/orchestration.rs
index 75836d1b48b0..1364e7d9f236 100644
--- a/datafusion/core/src/datasource/file_format/write/orchestration.rs
+++ b/datafusion/catalog-listing/src/write/orchestration.rs
@@ -23,8 +23,8 @@ use std::sync::Arc;
 
 use super::demux::DemuxedStreamReceiver;
 use super::{create_writer, BatchSerializer};
-use crate::datasource::file_format::file_compression_type::FileCompressionType;
-use crate::error::Result;
+use crate::file_compression_type::FileCompressionType;
+use datafusion_common::error::Result;
 
 use arrow::array::RecordBatch;
 use datafusion_common::{internal_datafusion_err, internal_err, DataFusionError};
@@ -237,7 +237,7 @@ pub(crate) async fn stateless_serialize_and_write_files(
 /// Orchestrates multipart put of a dynamic number of output files from a single input stream
 /// for any statelessly serialized file type. That is, any file type for which each [RecordBatch]
 /// can be serialized independently of all other [RecordBatch]s.
-pub(crate) async fn spawn_writer_tasks_and_join(
+pub async fn spawn_writer_tasks_and_join(
     context: &Arc<TaskContext>,
     serializer: Arc<dyn BatchSerializer>,
     compression: FileCompressionType,
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index e50043d31c15..dd48a9537187 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -28,8 +28,8 @@ pub mod json;
 pub mod options;
 #[cfg(feature = "parquet")]
 pub mod parquet;
-pub mod write;
 pub use datafusion_catalog_listing::file_compression_type;
+pub use datafusion_catalog_listing::write;
 
 use std::any::Any;
 use std::collections::{HashMap, VecDeque};
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index a40d8680b1a5..f08981605f2f 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -35,6 +35,12 @@ pub use self::parquet::source::ParquetSource;
 pub use self::parquet::{
     ParquetExec, ParquetExecBuilder, ParquetFileMetrics, ParquetFileReaderFactory,
 };
+use crate::error::Result;
+use crate::physical_plan::{DisplayAs, DisplayFormatType};
+use crate::{
+    datasource::listing::{FileRange, PartitionedFile},
+    physical_plan::display::{display_orderings, ProjectSchemaDisplay},
+};
 #[allow(deprecated)]
 pub use arrow_file::ArrowExec;
 pub use arrow_file::ArrowSource;
@@ -45,15 +51,17 @@ pub use avro::AvroSource;
 pub use csv::{CsvExec, CsvExecBuilder};
 pub use csv::{CsvOpener, CsvSource};
 pub use datafusion_catalog_listing::file_groups::FileGroupPartitioner;
-use datafusion_expr::dml::InsertOp;
+pub use datafusion_catalog_listing::file_sink_config::*;
 pub use file_scan_config::{
     wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
 };
 pub use file_stream::{FileOpenFuture, FileOpener, FileStream, OnError};
+use futures::StreamExt;
 #[allow(deprecated)]
 pub use json::NdJsonExec;
 pub use json::{JsonOpener, JsonSource};
-
+use log::debug;
+use object_store::{path::Path, GetOptions, GetRange, ObjectMeta, ObjectStore};
 use std::{
     fmt::{Debug, Formatter, Result as FmtResult},
     ops::Range,
@@ -61,115 +69,10 @@
     vec,
 };
 
-use super::{file_format::write::demux::start_demuxer_task, listing::ListingTableUrl};
-use crate::datasource::file_format::write::demux::DemuxedStreamReceiver;
-use crate::error::Result;
-use crate::physical_plan::{DisplayAs, DisplayFormatType};
-use crate::{
-    datasource::{
-        listing::{FileRange, PartitionedFile},
-        object_store::ObjectStoreUrl,
-    },
-    physical_plan::display::{display_orderings, ProjectSchemaDisplay},
-};
-
-use arrow::datatypes::{DataType, SchemaRef};
-use datafusion_common_runtime::SpawnedTask;
-use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+use arrow::datatypes::SchemaRef;
 use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::PhysicalSortExpr;
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
-use datafusion_physical_plan::insert::DataSink;
-
-use async_trait::async_trait;
-use futures::StreamExt;
-use log::debug;
-use object_store::{path::Path, GetOptions, GetRange, ObjectMeta, ObjectStore};
-
-/// General behaviors for files that do `DataSink` operations
-#[async_trait]
-pub trait FileSink: DataSink {
-    /// Retrieves the file sink configuration.
-    fn config(&self) -> &FileSinkConfig;
-
-    /// Spawns writer tasks and joins them to perform file writing operations.
-    /// Is a critical part of `FileSink` trait, since it's the very last step for `write_all`.
-    ///
-    /// This function handles the process of writing data to files by:
-    /// 1. Spawning tasks for writing data to individual files.
-    /// 2. Coordinating the tasks using a demuxer to distribute data among files.
-    /// 3. Collecting results using `tokio::join`, ensuring that all tasks complete successfully.
-    ///
-    /// # Parameters
-    /// - `context`: The execution context (`TaskContext`) that provides resources
-    ///   like memory management and runtime environment.
-    /// - `demux_task`: A spawned task that handles demuxing, responsible for splitting
-    ///   an input [`SendableRecordBatchStream`] into dynamically determined partitions.
-    ///   See `start_demuxer_task()`
-    /// - `file_stream_rx`: A receiver that yields streams of record batches and their
-    ///   corresponding file paths for writing. See `start_demuxer_task()`
-    /// - `object_store`: A handle to the object store where the files are written.
-    ///
-    /// # Returns
-    /// - `Result<u64>`: Returns the total number of rows written across all files.
-    async fn spawn_writer_tasks_and_join(
-        &self,
-        context: &Arc<TaskContext>,
-        demux_task: SpawnedTask<Result<()>>,
-        file_stream_rx: DemuxedStreamReceiver,
-        object_store: Arc<dyn ObjectStore>,
-    ) -> Result<u64>;
-
-    /// File sink implementation of the [`DataSink::write_all`] method.
-    async fn write_all(
-        &self,
-        data: SendableRecordBatchStream,
-        context: &Arc<TaskContext>,
-    ) -> Result<u64> {
-        let config = self.config();
-        let object_store = context
-            .runtime_env()
-            .object_store(&config.object_store_url)?;
-        let (demux_task, file_stream_rx) = start_demuxer_task(config, data, context);
-        self.spawn_writer_tasks_and_join(
-            context,
-            demux_task,
-            file_stream_rx,
-            object_store,
-        )
-        .await
-    }
-}
-
-/// The base configurations to provide when creating a physical plan for
-/// writing to any given file format.
-pub struct FileSinkConfig {
-    /// Object store URL, used to get an ObjectStore instance
-    pub object_store_url: ObjectStoreUrl,
-    /// A vector of [`PartitionedFile`] structs, each representing a file partition
-    pub file_groups: Vec<PartitionedFile>,
-    /// Vector of partition paths
-    pub table_paths: Vec<ListingTableUrl>,
-    /// The schema of the output file
-    pub output_schema: SchemaRef,
-    /// A vector of column names and their corresponding data types,
-    /// representing the partitioning columns for the file
-    pub table_partition_cols: Vec<(String, DataType)>,
-    /// Controls how new data should be written to the file, determining whether
-    /// to append to, overwrite, or replace records in existing files.
-    pub insert_op: InsertOp,
-    /// Controls whether partition columns are kept for the file
-    pub keep_partition_by_columns: bool,
-    /// File extension without a dot(.)
-    pub file_extension: String,
-}
-
-impl FileSinkConfig {
-    /// Get output schema
-    pub fn output_schema(&self) -> &SchemaRef {
-        &self.output_schema
-    }
-}
 
 impl Debug for FileScanConfig {
     fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
@@ -586,7 +489,7 @@ mod tests {
         BinaryArray, BooleanArray, Float32Array, Int32Array, Int64Array, RecordBatch,
         StringArray, UInt64Array,
     };
-    use arrow::datatypes::{Field, Schema};
+    use arrow::datatypes::{DataType, Field, Schema};
 
     use crate::datasource::schema_adapter::{
         DefaultSchemaAdapterFactory, SchemaAdapterFactory,