Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
executor sink
Browse files Browse the repository at this point in the history
  • Loading branch information
yashkothari42 committed Mar 27, 2024
1 parent d16fc04 commit 10d01f9
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 17 deletions.
2 changes: 1 addition & 1 deletion arrow-datafusion
50 changes: 41 additions & 9 deletions datafusion-integration/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::*;
use datafusion::prelude::{CsvReadOptions, SessionContext};
use std::sync::Arc;
mod util;
use vayu::{sinks, VayuExecutionEngine};

#[tokio::main]
async fn main() -> Result<()> {
// test_scan_filter_project().await?;
test_scan_filter_project().await?;
test_hash_join().await?;
test_store_record_batch().await?;
Ok(())
}

Expand Down Expand Up @@ -72,21 +73,52 @@ async fn test_scan_filter_project() -> Result<()> {
)
.await?;
let mut executor = VayuExecutionEngine::new();
let uuid = 1;
let sql1 = "SELECT c1,c4,c13 FROM aggregate_test_100 WHERE c3 < 0 AND c1='a'";
let sql = "SELECT c1,c3 as neg,c4 as pos,c13 FROM aggregate_test_100 WHERE (c3 < 0 AND c1='a') OR ( c4 > 0 AND c1='b' ) ";

let plan = util::get_execution_plan_from_sql(&ctx, sql).await?;

run_pipeline(&mut executor, plan, sinks::SchedulerSinkType::PrintOutput).await;

Ok(())
}

let plan1 = util::get_execution_plan_from_sql(&ctx, sql1).await?;
async fn test_store_record_batch() -> Result<()> {
// create local execution context
let ctx: SessionContext = SessionContext::new();
// register csv file with the execution context

ctx.register_csv(
"aggregate_test_100",
&format!("./testing/data/csv/aggregate_test_100.csv"),
CsvReadOptions::new(),
)
.await?;
let mut executor = VayuExecutionEngine::new();
let sql = "SELECT c1,c3 as neg,c4 as pos,c13 FROM aggregate_test_100 WHERE c3 < 0 AND c1='a' ";

let plan = util::get_execution_plan_from_sql(&ctx, sql).await?;
let uuid = 42;
run_pipeline(
&mut executor,
plan1,
plan,
sinks::SchedulerSinkType::StoreRecordBatch(uuid),
)
.await;
let sql =
"SELECT c1,c3 as neg,c4 as pos,c13 FROM aggregate_test_100 WHERE c4 > 0 AND c1='b' ";

let plan = util::get_execution_plan_from_sql(&ctx, sql).await?;
let uuid = 42;
run_pipeline(
&mut executor,
plan,
sinks::SchedulerSinkType::StoreRecordBatch(uuid),
)
.await;

// get executor to print the value
executor.sink(uuid);

let sql2 = "SELECT c1,c4,c13 FROM aggregate_test_100 WHERE c4 > 0 AND c1='b'";
let plan2 = util::get_execution_plan_from_sql(&ctx, sql2).await?;
run_pipeline(&mut executor, plan2, sinks::SchedulerSinkType::PrintOutput).await;
Ok(())
}

Expand Down
21 changes: 18 additions & 3 deletions vayu/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ pub mod sinks;

pub mod store;
use crate::sinks::SchedulerSinkType;
use crate::store::Blob::{HashMapBlob, RecordBatchBlob};
use crate::store::Store;
use core::panic;
use datafusion::physical_plan::ExecutionPlan;
use std::sync::Arc;

pub struct VayuExecutionEngine {
pub store: Store,
}
Expand All @@ -26,7 +27,6 @@ impl VayuExecutionEngine {

pub fn execute(&mut self, scheduler_pipeline: SchedulerPipeline) {
let plan = scheduler_pipeline.plan;
let sink_type = scheduler_pipeline.sink;
// convert execution plan to a pipeline

let pipeline = Pipeline::new(plan, &mut self.store, 1);
Expand All @@ -37,7 +37,8 @@ impl VayuExecutionEngine {
// do the sinking - very simple API
// no need to create a seperate class and introduce indirection unless it moves out of hands
// to call one function we would need 30+ lines otherwise
match sink_type {
let sink: SchedulerSinkType = scheduler_pipeline.sink;
match sink {
SchedulerSinkType::PrintOutput => {
pretty::print_batches(&result).unwrap();
}
Expand All @@ -51,6 +52,20 @@ impl VayuExecutionEngine {
}
};
}
pub fn sink(&mut self, uuid: i32) {
let blob = self.store.remove(uuid);
match blob {
Some(blob) => match blob {
RecordBatchBlob(result) => {
pretty::print_batches(&result).unwrap();
}
HashMapBlob(results) => {
pretty::print_batches(&[results.batch().clone()]).unwrap();
}
},
None => panic!("no blob for {uuid} found"),
}
}
}
pub struct SchedulerPipeline {
pub plan: Arc<dyn ExecutionPlan>,
Expand Down
6 changes: 3 additions & 3 deletions vayu/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::operators::join::HashProbeOperator;
use crate::operators::projection::ProjectionOperator;
use crate::store::Store;
use arrow::array::BooleanBufferBuilder;
use core::panic;
use datafusion::arrow::array::RecordBatch;
use datafusion::datasource::physical_plan::CsvExec;
use datafusion::error::Result;
Expand Down Expand Up @@ -74,28 +75,27 @@ fn make_pipeline(pipeline: &mut Pipeline, plan: Arc<dyn ExecutionPlan>, store: &
make_pipeline(pipeline, exec.right().clone(), store);
println!("adding hashprobe");
let mut hashjoinstream = exec.get_hash_join_stream(0, context).unwrap();
// using uuid but this value would be present in HashProbeExec itself
let build_map = store.remove(pipeline.state.uuid).unwrap();
let left_data = Arc::new(build_map.get_map());
let visited_left_side = BooleanBufferBuilder::new(0);
hashjoinstream.build_side = BuildSide::Ready(BuildSideReadyState {
left_data,
visited_left_side,
});
// println!("{:?}", left_data);
let tt = Box::new(HashProbeOperator::new(hashjoinstream));
pipeline.operators.push(tt);
return;
}
if let Some(exec) = p.downcast_ref::<RepartitionExec>() {
make_pipeline(pipeline, exec.input().clone(), store);
return;
// pipeline.operators.push(FilterOperator::new())
}
if let Some(exec) = p.downcast_ref::<CoalesceBatchesExec>() {
make_pipeline(pipeline, exec.input().clone(), store);
return;
// pipeline.operators.push(FilterOperator::new())
}
panic!("should never reach the end");
}
pub trait PhysicalOperator {
fn name(&self) -> String;
Expand Down
2 changes: 1 addition & 1 deletion vayu/src/pipeline_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl PipelineExecutor {
operators: &mut Vec<Box<dyn IntermediateOperator>>,
mut data: RecordBatch,
) -> RecordBatch {
for mut x in operators {
for x in operators {
println!(
"running operator {} size {}x{}",
x.name(),
Expand Down

0 comments on commit 10d01f9

Please sign in to comment.