
Commit

Merge branch 'main' into substrait-interval
Signed-off-by: Ruihang Xia <[email protected]>
waynexia committed May 24, 2024
2 parents bbdc1e4 + 3e4e09a commit db601ed
Showing 239 changed files with 10,370 additions and 5,577 deletions.
5 changes: 5 additions & 0 deletions Cargo.toml
@@ -59,6 +59,9 @@ version = "38.0.0"
# for the inherited dependency but cannot do the reverse (override from true to false).
#
# See for more detaiils: https://github.com/rust-lang/cargo/issues/11329
ahash = { version = "0.8", default-features = false, features = [
"runtime-rng",
] }
arrow = { version = "51.0.0", features = ["prettyprint"] }
arrow-array = { version = "51.0.0", default-features = false, features = ["chrono-tz"] }
arrow-buffer = { version = "51.0.0", default-features = false }
@@ -93,6 +96,7 @@ doc-comment = "0.3"
env_logger = "0.11"
futures = "0.3"
half = { version = "2.2.1", default-features = false }
hashbrown = { version = "0.14", features = ["raw"] }
indexmap = "2.0.0"
itertools = "0.12"
log = "^0.4"
@@ -101,6 +105,7 @@ object_store = { version = "0.9.1", default-features = false }
parking_lot = "0.12"
parquet = { version = "51.0.0", default-features = false, features = ["arrow", "async", "object_store"] }
rand = "0.8"
regex = "1.8"
rstest = "0.19.0"
serde_json = "1"
sqlparser = { version = "0.45.0", features = ["visitor"] }
3 changes: 3 additions & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions datafusion-cli/Dockerfile
@@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.

FROM rust:1.73-bullseye as builder
FROM rust:1.78-bookworm as builder

COPY . /usr/src/arrow-datafusion
COPY ./datafusion /usr/src/arrow-datafusion/datafusion
@@ -28,7 +28,7 @@ RUN rustup component add rustfmt

RUN cargo build --release

FROM debian:bullseye-slim
FROM debian:bookworm-slim

COPY --from=builder /usr/src/arrow-datafusion/datafusion-cli/target/release/datafusion-cli /usr/local/bin

2 changes: 1 addition & 1 deletion datafusion-cli/src/exec.rs
@@ -379,7 +379,7 @@ pub(crate) async fn register_object_store_and_config_extensions(
let store = get_object_store(&ctx.state(), scheme, url, &table_options).await?;

// Register the retrieved object store in the session context's runtime environment
ctx.runtime_env().register_object_store(url, store);
ctx.register_object_store(url, store);

Ok(())
}
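The one-line change above reflects that `SessionContext` now exposes object store registration directly, so the detour through `runtime_env()` is no longer needed. Below is a minimal sketch of the new call shape, using a local filesystem store purely for illustration (the URL and store choice are not part of this commit):

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::prelude::SessionContext;
use object_store::local::LocalFileSystem;
use url::Url;

fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Previously: ctx.runtime_env().register_object_store(&url, store)
    // The store is now registered on the SessionContext itself.
    let store = Arc::new(LocalFileSystem::default());
    let url = Url::parse("file://./").expect("valid URL");
    ctx.register_object_store(&url, store);

    Ok(())
}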
8 changes: 6 additions & 2 deletions datafusion-cli/src/main.rs
@@ -304,7 +304,7 @@ enum ByteUnit {
}

impl ByteUnit {
fn multiplier(&self) -> usize {
fn multiplier(&self) -> u64 {
match self {
ByteUnit::Byte => 1,
ByteUnit::KiB => 1 << 10,
@@ -349,8 +349,12 @@ fn extract_memory_pool_size(size: &str) -> Result<usize, String> {
let unit = byte_suffixes()
.get(suffix)
.ok_or_else(|| format!("Invalid memory pool size '{}'", size))?;
let memory_pool_size = usize::try_from(unit.multiplier())
.ok()
.and_then(|multiplier| num.checked_mul(multiplier))
.ok_or_else(|| format!("Memory pool size '{}' is too large", size))?;

Ok(num * unit.multiplier())
Ok(memory_pool_size)
} else {
Err(format!("Invalid memory pool size '{}'", size))
}
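The new logic above replaces the plain `num * unit.multiplier()` with a checked conversion and multiplication, so an oversized value is reported as an error instead of overflowing (notably on 32-bit targets, where `usize` is narrower than the `u64` multiplier). A standalone sketch of the same pattern, with illustrative names that are not the CLI's actual code:

fn checked_size(num: usize, multiplier: u64) -> Result<usize, String> {
    // Converting the u64 multiplier to usize can fail on 32-bit targets.
    usize::try_from(multiplier)
        .ok()
        // checked_mul returns None on overflow instead of wrapping or panicking.
        .and_then(|m| num.checked_mul(m))
        .ok_or_else(|| "memory pool size is too large".to_string())
}

fn main() {
    assert_eq!(checked_size(2, 1 << 10), Ok(2048)); // 2 KiB
    assert!(checked_size(usize::MAX, 1 << 10).is_err()); // overflow is reported as an error
}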
1 change: 1 addition & 0 deletions datafusion-examples/README.md
@@ -63,6 +63,7 @@ cargo run --example csv_sql
- [`parquet_sql.rs`](examples/parquet_sql.rs): Build and run a query plan from a SQL statement against a local Parquet file
- [`parquet_sql_multiple_files.rs`](examples/parquet_sql_multiple_files.rs): Build and run a query plan from a SQL statement against multiple local Parquet files
- [`parquet_exec_visitor.rs`](examples/parquet_exec_visitor.rs): Extract statistics by visiting an ExecutionPlan after execution
- [`plan_to_sql.rs`](examples/plan_to_sql.rs): Generate SQL from DataFusion `Expr` and `LogicalPlan`
- [`pruning.rs`](examples/pruning.rs): Use pruning to rule out files based on statistics
- [`query-aws-s3.rs`](examples/external_dependency/query-aws-s3.rs): Configure `object_store` and run a query against files stored in AWS S3
- [`query-http-csv.rs`](examples/query-http-csv.rs): Configure `object_store` and run a query against files via HTTP
15 changes: 5 additions & 10 deletions datafusion-examples/examples/advanced_udaf.rs
@@ -31,8 +31,8 @@ use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion_common::{cast::as_float64_array, ScalarValue};
use datafusion_expr::{
function::AccumulatorArgs, Accumulator, AggregateUDF, AggregateUDFImpl,
GroupsAccumulator, Signature,
function::{AccumulatorArgs, StateFieldsArgs},
Accumulator, AggregateUDF, AggregateUDFImpl, GroupsAccumulator, Signature,
};

/// This example shows how to use the full AggregateUDFImpl API to implement a user
@@ -92,21 +92,16 @@ impl AggregateUDFImpl for GeoMeanUdaf {
}

/// This is the description of the state. The accumulator's state() must match the types here.
fn state_fields(
&self,
_name: &str,
value_type: DataType,
_ordering_fields: Vec<arrow_schema::Field>,
) -> Result<Vec<arrow_schema::Field>> {
fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<arrow_schema::Field>> {
Ok(vec![
Field::new("prod", value_type, true),
Field::new("prod", args.return_type.clone(), true),
Field::new("n", DataType::UInt32, true),
])
}

/// Tell DataFusion that this aggregate supports the more performant `GroupsAccumulator`
/// which is used for cases when there are grouping columns in the query
fn groups_accumulator_supported(&self) -> bool {
fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool {
true
}

28 changes: 12 additions & 16 deletions datafusion-examples/examples/advanced_udf.rs
@@ -15,26 +15,21 @@
// specific language governing permissions and limitations
// under the License.

use datafusion::{
arrow::{
array::{ArrayRef, Float32Array, Float64Array},
datatypes::DataType,
record_batch::RecordBatch,
},
logical_expr::Volatility,
};
use std::any::Any;
use std::sync::Arc;

use arrow::array::{new_null_array, Array, AsArray};
use arrow::array::{
new_null_array, Array, ArrayRef, AsArray, Float32Array, Float64Array,
};
use arrow::compute;
use arrow::datatypes::Float64Type;
use arrow::datatypes::{DataType, Float64Type};
use arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::logical_expr::Volatility;
use datafusion::prelude::*;
use datafusion_common::{internal_err, ScalarValue};
use datafusion_expr::{
ColumnarValue, FuncMonotonicity, ScalarUDF, ScalarUDFImpl, Signature,
};
use std::sync::Arc;
use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
use datafusion_expr::{ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature};

/// This example shows how to use the full ScalarUDFImpl API to implement a user
/// defined function. As in the `simple_udf.rs` example, this struct implements
@@ -186,8 +181,9 @@ impl ScalarUDFImpl for PowUdf {
&self.aliases
}

fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
Ok(Some(vec![Some(true)]))
fn output_ordering(&self, input: &[ExprProperties]) -> Result<SortProperties> {
// The POW function preserves the order of its argument.
Ok(input[0].sort_properties)
}
}

@@ -49,8 +49,7 @@ async fn main() -> Result<()> {
let path = format!("s3://{bucket_name}");
let s3_url = Url::parse(&path).unwrap();
let arc_s3 = Arc::new(s3);
ctx.runtime_env()
.register_object_store(&s3_url, arc_s3.clone());
ctx.register_object_store(&s3_url, arc_s3.clone());

let path = format!("s3://{bucket_name}/test_data/");
let file_format = ParquetFormat::default().with_enable_pruning(true);
@@ -48,8 +48,7 @@ async fn main() -> Result<()> {

let path = format!("s3://{bucket_name}");
let s3_url = Url::parse(&path).unwrap();
ctx.runtime_env()
.register_object_store(&s3_url, Arc::new(s3));
ctx.register_object_store(&s3_url, Arc::new(s3));

// cannot query the parquet files from this bucket because the path contains a whitespace
// and we don't support that yet
13 changes: 7 additions & 6 deletions datafusion-examples/examples/function_factory.rs
@@ -15,17 +15,18 @@
// specific language governing permissions and limitations
// under the License.

use std::result::Result as RResult;
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::context::{
FunctionFactory, RegisterFunction, SessionContext, SessionState,
};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{exec_err, internal_err, DataFusionError};
use datafusion_expr::simplify::ExprSimplifyResult;
use datafusion_expr::simplify::SimplifyInfo;
use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
use datafusion_expr::{CreateFunction, Expr, ScalarUDF, ScalarUDFImpl, Signature};
use std::result::Result as RResult;
use std::sync::Arc;

/// This example shows how to utilize [FunctionFactory] to implement simple
/// SQL-macro like functions using a `CREATE FUNCTION` statement. The same
@@ -156,8 +157,8 @@ impl ScalarUDFImpl for ScalarFunctionWrapper {
&[]
}

fn monotonicity(&self) -> Result<Option<datafusion_expr::FuncMonotonicity>> {
Ok(None)
fn output_ordering(&self, _input: &[ExprProperties]) -> Result<SortProperties> {
Ok(SortProperties::Unordered)
}
}

2 changes: 1 addition & 1 deletion datafusion-examples/examples/parquet_sql_multiple_files.rs
@@ -80,7 +80,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let local_fs = Arc::new(LocalFileSystem::default());

let u = url::Url::parse("file://./")?;
ctx.runtime_env().register_object_store(&u, local_fs);
ctx.register_object_store(&u, local_fs);

// Register a listing table - this will use all files in the directory as data sources
// for the query
140 changes: 140 additions & 0 deletions datafusion-examples/examples/plan_to_sql.rs
@@ -0,0 +1,140 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion::error::Result;

use datafusion::prelude::*;
use datafusion::sql::unparser::expr_to_sql;
use datafusion_sql::unparser::dialect::CustomDialect;
use datafusion_sql::unparser::{plan_to_sql, Unparser};

/// This example demonstrates the programmatic construction of SQL strings using
/// the DataFusion [`Expr`] and [`LogicalPlan`] API.
///
/// The code in this example shows how to:
///
/// 1. [`simple_expr_to_sql_demo`]: Create a simple expression [`Expr`] with the
/// fluent API and convert it to SQL suitable for passing to another database
///
/// 2. [`simple_expr_to_sql_demo_no_escape`]: Create a simple expression
/// [`Expr`] with the fluent API and convert it to SQL without escaping column names,
/// which is more suitable for displaying to humans
///
/// 3. [`simple_expr_to_sql_demo_escape_mysql_style`]: Create a simple
/// expression [`Expr`] with the fluent API and convert it to SQL, escaping column
/// names MySQL style
///
/// 4. [`simple_plan_to_sql_demo`]: Create a simple logical plan using the
/// DataFrame API and convert it to a SQL string.
///
/// 5. [`round_trip_plan_to_sql_demo`]: Create a logical plan from a SQL string, modify it using the
/// DataFrame API and convert it back to a SQL string.
#[tokio::main]
async fn main() -> Result<()> {
// See how to evaluate expressions
simple_expr_to_sql_demo()?;
simple_expr_to_sql_demo_escape_mysql_style()?;
simple_plan_to_sql_demo().await?;
round_trip_plan_to_sql_demo().await?;
Ok(())
}

/// DataFusion can convert expressions to SQL, using column name escaping
/// PostgreSQL style.
fn simple_expr_to_sql_demo() -> Result<()> {
let expr = col("a").lt(lit(5)).or(col("a").eq(lit(8)));
let sql = expr_to_sql(&expr)?.to_string();
assert_eq!(sql, r#"((a < 5) OR (a = 8))"#);
Ok(())
}

/// DataFusion can convert expressions to SQL, escaping column names MySQL style,
/// using a custom dialect and an explicit unparser
fn simple_expr_to_sql_demo_escape_mysql_style() -> Result<()> {
let expr = col("a").lt(lit(5)).or(col("a").eq(lit(8)));
let dialect = CustomDialect::new(Some('`'));
let unparser = Unparser::new(&dialect);
let sql = unparser.expr_to_sql(&expr)?.to_string();
assert_eq!(sql, r#"((`a` < 5) OR (`a` = 8))"#);
Ok(())
}

/// DataFusion can convert a logical plan created using the DataFrame API to read from a parquet file
/// to SQL, using column name escaping PostgreSQL style.
async fn simple_plan_to_sql_demo() -> Result<()> {
let ctx = SessionContext::new();

let testdata = datafusion::test_util::parquet_test_data();
let df = ctx
.read_parquet(
&format!("{testdata}/alltypes_plain.parquet"),
ParquetReadOptions::default(),
)
.await?
.select_columns(&["id", "int_col", "double_col", "date_string_col"])?;

// Convert the data frame to a SQL string
let sql = plan_to_sql(df.logical_plan())?.to_string();

assert_eq!(
sql,
r#"SELECT "?table?".id, "?table?".int_col, "?table?".double_col, "?table?".date_string_col FROM "?table?""#
);

Ok(())
}

/// DataFusion can also be used to parse SQL, programmatically modify the query
/// (in this case adding a filter), and then convert it back to SQL.
async fn round_trip_plan_to_sql_demo() -> Result<()> {
let ctx = SessionContext::new();

let testdata = datafusion::test_util::parquet_test_data();

// register parquet file with the execution context
ctx.register_parquet(
"alltypes_plain",
&format!("{testdata}/alltypes_plain.parquet"),
ParquetReadOptions::default(),
)
.await?;

// create a logical plan from a SQL string and then programmatically add new filters
let df = ctx
// Use SQL to read some data from the parquet file
.sql(
"SELECT int_col, double_col, CAST(date_string_col as VARCHAR) \
FROM alltypes_plain",
)
.await?
// Add id > 1 and tinyint_col < double_col filter
.filter(
col("id")
.gt(lit(1))
.and(col("tinyint_col").lt(col("double_col"))),
)?;

let sql = plan_to_sql(df.logical_plan())?.to_string();
assert_eq!(
sql,
r#"SELECT alltypes_plain.int_col, alltypes_plain.double_col, CAST(alltypes_plain.date_string_col AS VARCHAR) FROM alltypes_plain WHERE ((alltypes_plain.id > 1) AND (alltypes_plain.tinyint_col < alltypes_plain.double_col))"#
);

Ok(())
}

