Skip to content

Commit

Permalink
add example
Browse files Browse the repository at this point in the history
Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia committed Jul 31, 2024
1 parent 0ad3e4f commit 5af79fa
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 0 deletions.
1 change: 1 addition & 0 deletions datafusion-examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ cargo run --example dataframe
- [`sql_analysis.rs`](examples/sql_analysis.rs): Analyse SQL queries with DataFusion structures
- [`sql_frontend.rs`](examples/sql_frontend.rs): Create LogicalPlans (only) from sql strings
- [`sql_dialect.rs`](examples/sql_dialect.rs): Example of implementing a custom SQL dialect on top of `DFParser`
- [`sql_to_physical_plan.rs`](examples/sql_to_physical_plan.rs): Show how to convert a SQL query to a physical plan step-by-step
- [`to_char.rs`](examples/to_char.rs): Examples of using the to_char function
- [`to_timestamp.rs`](examples/to_timestamp.rs): Examples of using to_timestamp functions

Expand Down
127 changes: 127 additions & 0 deletions datafusion-examples/examples/sql_to_physical_plan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion::error::Result;
use datafusion::physical_plan::displayable;
use datafusion::prelude::*;
use datafusion_expr::{LogicalPlan, PlanType};

/// Walks through turning a logical plan into a physical execution plan
/// with DataFusion.
///
/// Planning in DataFusion proceeds in stages:
/// 1. Analyze and optimize the logical plan
/// 2. Translate the logical plan into a physical plan
///
/// Two routes from a logical plan to a physical plan are shown here:
/// - The combined `create_physical_plan` API.
/// - Invoking the analyzer, optimizer, and query planner one after another.
#[tokio::main]
async fn main() -> Result<()> {
    // Create a session and open the Parquet test file as a DataFrame
    let ctx = SessionContext::new();
    let data_dir = datafusion::test_util::parquet_test_data();
    let parquet_path = format!("{data_dir}/alltypes_plain.parquet");
    let source = ctx
        .read_parquet(&parquet_path, ParquetReadOptions::default())
        .await?;

    // Assemble the input logical plan with the DataFrame API, one operator
    // at a time. Every SQL expression is parsed against `source`, the
    // DataFrame that was read from the file.
    let projection = vec![
        source.parse_sql_expr("int_col")?,
        source.parse_sql_expr("double_col")?,
    ];
    let projected = source.clone().select(projection)?;

    let predicate = source.parse_sql_expr("int_col < 5 OR double_col = 8.0")?;
    let filtered = projected.filter(predicate)?;

    let group_by = vec![source.parse_sql_expr("double_col")?];
    let aggregates = vec![source.parse_sql_expr("SUM(int_col) as sum_int_col")?];
    let limited = filtered
        .aggregate(group_by, aggregates)?
        .limit(0, Some(1))?;

    let logical_plan = limited.logical_plan().clone();

    // The first demo only borrows the plan, so it runs before the
    // step-by-step demo, which takes ownership of it.
    to_physical_plan_in_one_api_demo(&logical_plan, &ctx).await?;

    to_physical_plan_step_by_step_demo(logical_plan, &ctx).await?;

    Ok(())
}

/// Converts a logical plan into a physical plan using the combined
/// `create_physical_plan` API. It will first optimize the logical
/// plan and then convert it into physical plan.
///
/// The resulting plan is printed to stdout in its indented text form.
///
/// # Errors
///
/// Returns any error reported by `create_physical_plan`.
async fn to_physical_plan_in_one_api_demo(
    input: &LogicalPlan,
    ctx: &SessionContext,
) -> Result<()> {
    // `input` is already a reference, so it is passed through as-is
    // rather than being borrowed a second time.
    let physical_plan = ctx.state().create_physical_plan(input).await?;

    // Only the rendered `.plan` text of the stringified plan is printed.
    println!(
        "Physical plan direct from logical plan:\n\n{}\n\n",
        displayable(physical_plan.as_ref())
            .to_stringified(false, PlanType::InitialPhysicalPlan)
            .plan
    );

    Ok(())
}

/// Converts a logical plan into a physical plan by utilizing the analyzer,
/// optimizer, and query planner APIs separately. This flavor gives more
/// control over the planning process.
///
/// The intermediate plan after each stage is printed to stdout: the
/// analyzed logical plan, the optimized logical plan, and finally the
/// physical plan.
///
/// # Errors
///
/// Returns the first error reported by the analyzer, the optimizer, or
/// the query planner.
async fn to_physical_plan_step_by_step_demo(
    input: LogicalPlan,
    ctx: &SessionContext,
) -> Result<()> {
    // `SessionContext::state` hands back an owned `SessionState`. Take it
    // once so that every stage below works from the same snapshot instead
    // of requesting a fresh one per call.
    let state = ctx.state();

    // First analyze the logical plan. The closure is the per-rule observer
    // callback; this example does not need it, so it does nothing.
    let analyzed_logical_plan = state.analyzer().execute_and_check(
        input,
        state.config_options(),
        |_, _| (),
    )?;
    println!("Analyzed logical plan:\n\n{analyzed_logical_plan:?}\n\n");

    // Optimize the analyzed logical plan
    let optimized_logical_plan =
        state
            .optimizer()
            .optimize(analyzed_logical_plan, &state, |_, _| ())?;
    println!("Optimized logical plan:\n\n{optimized_logical_plan:?}\n\n");

    // Create the physical plan
    let physical_plan = state
        .query_planner()
        .create_physical_plan(&optimized_logical_plan, &state)
        .await?;
    // Only the rendered `.plan` text of the stringified plan is printed.
    println!(
        "Final physical plan:\n\n{}\n\n",
        displayable(physical_plan.as_ref())
            .to_stringified(false, PlanType::InitialPhysicalPlan)
            .plan
    );

    Ok(())
}

0 comments on commit 5af79fa

Please sign in to comment.