From e1502873e06f5be0e01c1eee1940ec97f1d21c22 Mon Sep 17 00:00:00 2001
From: Connor Tsui
Date: Sun, 25 Feb 2024 11:56:46 -0500
Subject: [PATCH 1/8] create hash join skeleton

---
 .../src/execution/operators/hash_join.rs | 49 +++++++++++++++++++
 eggstrain/src/execution/operators/mod.rs |  1 +
 2 files changed, 50 insertions(+)
 create mode 100644 eggstrain/src/execution/operators/hash_join.rs

diff --git a/eggstrain/src/execution/operators/hash_join.rs b/eggstrain/src/execution/operators/hash_join.rs
new file mode 100644
index 0000000..e9380d4
--- /dev/null
+++ b/eggstrain/src/execution/operators/hash_join.rs
@@ -0,0 +1,49 @@
+use super::{Operator, BinaryOperator};
+use arrow::record_batch::RecordBatch;
+use async_trait::async_trait;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion_common::Result;
+use std::sync::Arc;
+use tokio::sync::broadcast;
+use tokio::sync::broadcast::error::RecvError;
+
+/// TODO docs
+pub struct HashJoin {
+    pub _todo: bool,
+    pub children: Vec<Arc<dyn ExecutionPlan>>,
+}
+
+/// TODO docs
+impl HashJoin {
+    pub fn new() -> Self {
+        Self { _todo: false, children: vec![] }
+    }
+}
+
+/// TODO docs
+impl Operator for HashJoin {
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        self.children.clone()
+    }
+}
+
+/// TODO docs
+#[async_trait]
+impl BinaryOperator for HashJoin {
+    type InLeft = RecordBatch;
+    type InRight = RecordBatch;
+    type Out = RecordBatch;
+
+    fn into_binary(self) -> Arc<dyn BinaryOperator<InLeft = Self::InLeft, InRight = Self::InRight, Out = Self::Out>> {
+        Arc::new(self)
+    }
+
+    async fn execute(
+        &self,
+        rx_left: broadcast::Receiver<Self::InLeft>,
+        rx_right: broadcast::Receiver<Self::InRight>,
+        tx: broadcast::Sender<Self::Out>,
+    ) {
+        todo!()
+    }
+}

diff --git a/eggstrain/src/execution/operators/mod.rs b/eggstrain/src/execution/operators/mod.rs
index 603e02b..100f540 100644
--- a/eggstrain/src/execution/operators/mod.rs
+++ b/eggstrain/src/execution/operators/mod.rs
@@ -5,6 +5,7 @@ use tokio::sync::broadcast::{Receiver, Sender};
 
 pub mod filter;
 pub mod project;
+pub mod hash_join;
 
 /// Defines shared behavior for all operators
 ///
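The skeleton ties the operator to tokio's broadcast channels: `execute` will eventually drain `rx_left` and `rx_right` and emit joined batches on `tx`. The error cases matter here, since a broadcast receiver reports `RecvError::Closed` once every sender is dropped (end of stream) and `RecvError::Lagged(n)` when it falls more than the channel capacity behind. A minimal, self-contained sketch of the drain loop the later patches build on (the `i32` payload and the capacity of 16 are placeholders for `RecordBatch` streams):

    use tokio::sync::broadcast::{self, error::RecvError};

    #[tokio::main]
    async fn main() {
        // Placeholder capacity; a real operator would size this to its pipeline.
        let (tx, mut rx) = broadcast::channel::<i32>(16);

        tokio::spawn(async move {
            for i in 0..3 {
                tx.send(i).expect("a receiver is still alive");
            }
            // Dropping `tx` here closes the stream for all receivers.
        });

        loop {
            match rx.recv().await {
                Ok(msg) => println!("received {msg}"),
                // All senders are gone and the buffer is drained: end of stream.
                Err(RecvError::Closed) => break,
                // This receiver fell behind and missed `n` messages.
                Err(RecvError::Lagged(n)) => eprintln!("lagged by {n}"),
            }
        }
    }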
From 5d6092d4aee4b005216c37b6ab347a6fcb7143b0 Mon Sep 17 00:00:00 2001
From: Connor Tsui
Date: Sun, 25 Feb 2024 14:09:13 -0500
Subject: [PATCH 2/8] add helper data structures

---
 eggstrain/src/execution/record_buffer.rs | 69 ++++++++++++++++++++++++
 eggstrain/src/execution/record_table.rs  | 54 +++++++++++++++++++
 2 files changed, 123 insertions(+)
 create mode 100644 eggstrain/src/execution/record_buffer.rs
 create mode 100644 eggstrain/src/execution/record_table.rs

diff --git a/eggstrain/src/execution/record_buffer.rs b/eggstrain/src/execution/record_buffer.rs
new file mode 100644
index 0000000..97de637
--- /dev/null
+++ b/eggstrain/src/execution/record_buffer.rs
@@ -0,0 +1,69 @@
+use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
+
+/// Make this an opaque type that can be used later; we may want to make this
+/// contiguous and then use offsets instead.
+#[derive(Clone, Copy)]
+pub struct RecordIndex {
+    index: u32,
+    row: u32, // by default this is just 0
+}
+
+/// TODO docs
+impl RecordIndex {
+    pub fn new(index: u32, row: u32) -> Self {
+        Self { index, row }
+    }
+
+    pub fn update_row(&mut self, row: u32) {
+        self.row = row;
+    }
+}
+
+pub struct RecordBuffer {
+    schema: SchemaRef,       // The schema for all of the record batches
+    inner: Vec<RecordBatch>, // make this contiguous
+}
+
+impl RecordBuffer {
+    pub fn new(schema: SchemaRef) -> Self {
+        Self {
+            schema,
+            inner: vec![],
+        }
+    }
+
+    pub fn with_capacity(schema: SchemaRef, capacity: usize) -> Self {
+        Self {
+            schema,
+            inner: Vec::with_capacity(capacity),
+        }
+    }
+
+    pub fn insert(&mut self, batch: RecordBatch) -> RecordIndex {
+        assert_eq!(
+            self.schema,
+            batch.schema(),
+            "Trying to insert a RecordBatch into a RecordBuffer with the incorrect schema"
+        );
+        assert!(
+            (self.inner.len() as u32) < u32::MAX - 1,
+            "Maximum size for a RecordBuffer is u32::MAX"
+        );
+
+        self.inner.push(batch);
+
+        RecordIndex {
+            index: (self.inner.len() - 1) as u32,
+            row: 0,
+        }
+    }
+
+    /// Retrieve the batch and row number associated with the RecordIndex
+    pub fn get(&self, index: RecordIndex) -> Option<(&RecordBatch, u32)> {
+        if (index.index as usize) >= self.inner.len() {
+            return None;
+        }
+
+        Some((&self.inner[index.index as usize], index.row))
+    }
+}

diff --git a/eggstrain/src/execution/record_table.rs b/eggstrain/src/execution/record_table.rs
new file mode 100644
index 0000000..4ce6474
--- /dev/null
+++ b/eggstrain/src/execution/record_table.rs
@@ -0,0 +1,54 @@
+use super::record_buffer::{RecordBuffer, RecordIndex};
+use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
+use std::collections::HashMap; // TODO replace with a raw table
+
+pub struct RecordTable {
+    /// Maps a Hash value to a `RecordIndex` into the `RecordBuffer`
+    inner: HashMap<usize, Vec<RecordIndex>>,
+    buffer: RecordBuffer,
+}
+
+impl RecordTable {
+    pub fn new(schema: SchemaRef) -> Self {
+        Self {
+            buffer: RecordBuffer::new(schema),
+            inner: HashMap::new(),
+        }
+    }
+
+    pub fn with_capacity(schema: SchemaRef, map_capacity: usize, buffer_capacity: usize) -> Self {
+        Self {
+            buffer: RecordBuffer::with_capacity(schema, buffer_capacity),
+            inner: HashMap::with_capacity(map_capacity),
+        }
+    }
+
+    pub fn insert_batch(&mut self, batch: RecordBatch, hashes: Vec<usize>) {
+        assert_eq!(
+            batch.num_rows(),
+            hashes.len(),
+            "There should be an equal number of batch rows and hashed values"
+        );
+
+        // Points to the location of the base of the record batch
+        let base_record_id = self.buffer.insert(batch);
+
+        for (row, &hash) in hashes.iter().enumerate() {
+            // Given the row, we can create a record id for a specific tuple by updating the row
+            // from the base_record_id
+            let mut record_id = base_record_id;
+            record_id.update_row(row as u32);
+
+            // Insert the record into the hashtable bucket
+            self.inner.entry(hash).or_default().push(record_id)
+        }
+    }
+
+    pub fn get_records(&self, hash: usize) -> Option<&Vec<RecordIndex>> {
+        self.inner.get(&hash)
+    }
+
+    pub fn get(&self, index: RecordIndex) -> Option<(&RecordBatch, u32)> {
+        self.buffer.get(index)
+    }
+}
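Together these two structures form the build side's index: `RecordBuffer` owns the batches and hands out compact `RecordIndex` handles, and `RecordTable` buckets those handles by hash value. A hypothetical usage sketch, assuming the modules are reachable from the crate root (the one-column schema and the hard-coded hash values are made up for illustration; real hashes come from the join keys in patch 5):

    use std::sync::Arc;
    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    // Hypothetical import path, assuming the crate exposes these modules.
    use eggstrain::execution::record_table::RecordTable;

    fn main() -> datafusion_common::Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![10, 20, 30]))],
        )?;

        let mut table = RecordTable::new(schema);

        // One fabricated hash per row; rows 0 and 2 share a bucket.
        table.insert_batch(batch, vec![7, 42, 7]);

        // Probing hash 7 yields handles to rows 0 and 2 of the buffered batch.
        let records = table.get_records(7).expect("bucket exists");
        for &record in records {
            let (batch, row) = table.get(record).expect("valid index");
            println!("matched row {row} of a {}-row batch", batch.num_rows());
        }
        Ok(())
    }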
From ad04f610633be365c6049ca052b02655a45792bd Mon Sep 17 00:00:00 2001
From: Connor Tsui
Date: Sun, 25 Feb 2024 14:09:39 -0500
Subject: [PATCH 3/8] format

---
 eggstrain/src/execution/operators/filter.rs | 5 ++++-
 eggstrain/src/execution/query_dag.rs        | 3 +--
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/eggstrain/src/execution/operators/filter.rs b/eggstrain/src/execution/operators/filter.rs
index d771c18..286423e 100644
--- a/eggstrain/src/execution/operators/filter.rs
+++ b/eggstrain/src/execution/operators/filter.rs
@@ -18,7 +18,10 @@ pub(crate) struct Filter {
 
 /// TODO docs
 impl Filter {
-    pub(crate) fn new(predicate: Arc<dyn PhysicalExpr>, children: Vec<Arc<dyn ExecutionPlan>>) -> Self {
+    pub(crate) fn new(
+        predicate: Arc<dyn PhysicalExpr>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Self {
         Self {
             predicate,
             children,

diff --git a/eggstrain/src/execution/query_dag.rs b/eggstrain/src/execution/query_dag.rs
index 354fb1f..dc3eb36 100644
--- a/eggstrain/src/execution/query_dag.rs
+++ b/eggstrain/src/execution/query_dag.rs
@@ -121,8 +121,7 @@ fn datafusion_execute(plan: Arc<dyn ExecutionPlan>, tx: broadcast::Sender<RecordBatch>)
From: Connor Tsui
Date: Sun, 25 Feb 2024 14:09:48 -0500
Subject: [PATCH 4/8] hash join progress

---
 eggstrain/src/execution/mod.rs           |   2 +
 .../src/execution/operators/hash_join.rs | 109 ++++++++++++++++--
 eggstrain/src/execution/operators/mod.rs |   2 +-
 3 files changed, 104 insertions(+), 9 deletions(-)

diff --git a/eggstrain/src/execution/mod.rs b/eggstrain/src/execution/mod.rs
index 0492aff..c439a44 100644
--- a/eggstrain/src/execution/mod.rs
+++ b/eggstrain/src/execution/mod.rs
@@ -1,2 +1,4 @@
 pub mod operators;
 pub mod query_dag;
+pub mod record_buffer;
+pub mod record_table;

diff --git a/eggstrain/src/execution/operators/hash_join.rs b/eggstrain/src/execution/operators/hash_join.rs
index e9380d4..25958a7 100644
--- a/eggstrain/src/execution/operators/hash_join.rs
+++ b/eggstrain/src/execution/operators/hash_join.rs
@@ -1,6 +1,9 @@
-use super::{Operator, BinaryOperator};
+use super::{BinaryOperator, Operator};
+use crate::execution::record_table::RecordTable;
+use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
+use datafusion::physical_expr::PhysicalExprRef;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion_common::Result;
 use std::sync::Arc;
@@ -9,14 +12,86 @@ use tokio::sync::broadcast::error::RecvError;
 
 /// TODO docs
 pub struct HashJoin {
-    pub _todo: bool,
-    pub children: Vec<Arc<dyn ExecutionPlan>>,
+    _todo: bool,
+    children: Vec<Arc<dyn ExecutionPlan>>,
+    schema: SchemaRef,
+    equate_on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
 }
 
 /// TODO docs
 impl HashJoin {
-    pub fn new() -> Self {
-        Self { _todo: false, children: vec![] }
+    pub(crate) fn new(
+        schema: SchemaRef,
+        equate_on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
+    ) -> Self {
+        Self {
+            _todo: false,
+            children: vec![],
+            schema,
+            equate_on,
+        }
+    }
+
+    /// Given a [`RecordBatch`], hashes based on the input physical expressions
+    ///
+    /// TODO docs
+    fn hash_batch(&self, batch: &RecordBatch) -> Vec<usize> {
+        todo!("use self.equate_on to hash each of the tuple keys in the batch")
+    }
+
+    /// Builds the Hash Table from the [`RecordBatch`]es coming from the left child.
+    ///
+    /// TODO docs
+    async fn build_table(&self, mut rx_left: broadcast::Receiver<RecordBatch>) -> RecordTable {
+        // Take in all of the record batches from the left and create a hash table
+        let mut record_table = RecordTable::new(self.schema.clone());
+
+        loop {
+            match rx_left.recv().await {
+                Ok(batch) => {
+                    // TODO gather N batches and use rayon to insert all at once
+                    let hashes = self.hash_batch(&batch);
+                    record_table.insert_batch(batch, hashes);
+                }
+                Err(e) => match e {
+                    RecvError::Closed => break,
+                    RecvError::Lagged(_) => todo!(),
+                },
+            }
+        }
+
+        record_table
+    }
+
+    /// Given a single batch (coming from the right child), probes the hash table and outputs a
+    /// [`RecordBatch`] for every tuple on the right that gets matched with a tuple in the hash
+    /// table.
+    ///
+    /// Note: This is super inefficient since it's possible that we could emit a bunch of
+    /// [`RecordBatch`]es that have just 1 tuple in them. This is a place for easy optimization.
+    async fn probe_table(
+        &self,
+        table: &RecordTable,
+        right_batch: RecordBatch,
+        tx: &broadcast::Sender<RecordBatch>,
+    ) {
+        let hashes = self.hash_batch(&right_batch);
+
+        for (right_row, &hash) in hashes.iter().enumerate() {
+            // Construct a RecordBatch for each tuple that might get joined with tuples in the
+            // hash table
+
+            // For each of these hashes, check if it is in the table
+            let Some(records) = table.get_records(hash) else {
+                return;
+            };
+            assert!(!records.is_empty());
+
+            // There are records associated with this hash value, so we need to emit things
+            for &record in records {
+                let (left_batch, left_row) = table.get(record).unwrap();
+
+                todo!("Join tuples together and then send through `tx`");
+            }
+        }
+    }
 }
 
 /// TODO docs
@@ -34,16 +109,34 @@ impl BinaryOperator for HashJoin {
     type InRight = RecordBatch;
     type Out = RecordBatch;
 
-    fn into_binary(self) -> Arc<dyn BinaryOperator<InLeft = Self::InLeft, InRight = Self::InRight, Out = Self::Out>> {
+    fn into_binary(
+        self,
+    ) -> Arc<dyn BinaryOperator<InLeft = Self::InLeft, InRight = Self::InRight, Out = Self::Out>>
+    {
         Arc::new(self)
     }
 
     async fn execute(
         &self,
         rx_left: broadcast::Receiver<Self::InLeft>,
-        rx_right: broadcast::Receiver<Self::InRight>,
+        mut rx_right: broadcast::Receiver<Self::InRight>,
         tx: broadcast::Sender<Self::Out>,
     ) {
-        todo!()
+        // Phase 1: Build Phase
+        // TODO assign to its own tokio task
+        let record_table = self.build_table(rx_left).await;
+
+        // Phase 2: Probe Phase
+        loop {
+            match rx_right.recv().await {
+                Ok(batch) => {
+                    self.probe_table(&record_table, batch, &tx).await;
+                }
+                Err(e) => match e {
+                    RecvError::Closed => break,
+                    RecvError::Lagged(_) => todo!(),
+                },
+            }
+        }
     }
 }

diff --git a/eggstrain/src/execution/operators/mod.rs b/eggstrain/src/execution/operators/mod.rs
index 100f540..461332d 100644
--- a/eggstrain/src/execution/operators/mod.rs
+++ b/eggstrain/src/execution/operators/mod.rs
@@ -4,8 +4,8 @@ use std::sync::Arc;
 use tokio::sync::broadcast::{Receiver, Sender};
 
 pub mod filter;
-pub mod project;
 pub mod hash_join;
+pub mod project;
 
 /// Defines shared behavior for all operators
 ///
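The `execute` body runs the build phase to completion before probing, and the TODO notes that the build could live on its own tokio task so it overlaps with other work. A minimal sketch of that spawn pattern, with a toy async build function standing in for `HashJoin::build_table` (the `i32` stream is a placeholder):

    use tokio::sync::broadcast;

    // Stand-in for the build phase: drain the left input into a "table".
    async fn build_table(mut rx: broadcast::Receiver<i32>) -> Vec<i32> {
        let mut table = Vec::new();
        while let Ok(v) = rx.recv().await {
            table.push(v);
        }
        table
    }

    #[tokio::main]
    async fn main() {
        let (tx_left, rx_left) = broadcast::channel(16);

        // Build runs concurrently; the caller could service the right child here.
        let build = tokio::spawn(build_table(rx_left));

        tx_left.send(1).unwrap();
        tx_left.send(2).unwrap();
        drop(tx_left); // close the left stream so the build task can finish

        let table = build.await.expect("build task panicked");
        assert_eq!(table, vec![1, 2]);
    }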
From 6066b2d86492c57b20c070b9cbe156c5b65229e1 Mon Sep 17 00:00:00 2001
From: Connor Tsui
Date: Sun, 25 Feb 2024 18:07:20 -0500
Subject: [PATCH 5/8] some refactoring + hashing

---
 .../src/execution/operators/hash_join.rs | 99 ++++++++++++++-----
 eggstrain/src/execution/record_table.rs  |  6 +-
 2 files changed, 80 insertions(+), 25 deletions(-)

diff --git a/eggstrain/src/execution/operators/hash_join.rs b/eggstrain/src/execution/operators/hash_join.rs
index 25958a7..aafa899 100644
--- a/eggstrain/src/execution/operators/hash_join.rs
+++ b/eggstrain/src/execution/operators/hash_join.rs
@@ -1,56 +1,89 @@
 use super::{BinaryOperator, Operator};
 use crate::execution::record_table::RecordTable;
+use arrow::array::ArrayRef;
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
+use datafusion::logical_expr::ColumnarValue;
 use datafusion::physical_expr::PhysicalExprRef;
 use datafusion::physical_plan::ExecutionPlan;
-use datafusion_common::Result;
+use datafusion_common::hash_utils::create_hashes;
+use datafusion_common::{DataFusionError, Result};
 use std::sync::Arc;
 use tokio::sync::broadcast;
 use tokio::sync::broadcast::error::RecvError;
 
 /// TODO docs
 pub struct HashJoin {
-    _todo: bool,
-    children: Vec<Arc<dyn ExecutionPlan>>,
-    schema: SchemaRef,
+    left_schema: SchemaRef,
+    right_schema: SchemaRef,
     equate_on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
+    children: Vec<Arc<dyn ExecutionPlan>>,
 }
 
 /// TODO docs
 impl HashJoin {
     pub(crate) fn new(
-        schema: SchemaRef,
+        left_schema: SchemaRef,
+        right_schema: SchemaRef,
         equate_on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Self {
         Self {
-            _todo: false,
-            children: vec![],
-            schema,
+            children,
+            left_schema,
+            right_schema,
            equate_on,
         }
     }
 
-    /// Given a [`RecordBatch`], hashes based on the input physical expressions
+    /// Given a [`RecordBatch`], hashes based on the input physical expressions.
     ///
     /// TODO docs
-    fn hash_batch(&self, batch: &RecordBatch) -> Vec<usize> {
-        todo!("use self.equate_on to hash each of the tuple keys in the batch")
+    fn hash_batch<const LEFT: bool>(&self, batch: &RecordBatch) -> Result<Vec<u64>> {
+        let rows = batch.num_rows();
+
+        // A vector of columns; horizontally these are the join keys
+        let mut column_vals = Vec::with_capacity(self.equate_on.len());
+
+        for (left_eq, right_eq) in self.equate_on.iter() {
+            let eq = if LEFT { left_eq } else { right_eq };
+
+            // Extract a single column
+            let col_val = eq.evaluate(batch)?;
+            match col_val {
+                ColumnarValue::Array(arr) => column_vals.push(arr),
+                ColumnarValue::Scalar(s) => {
+                    return Err(DataFusionError::NotImplemented(format!(
+                        "Join physical expression scalar condition on {:#?} not implemented",
+                        s
+                    )));
+                }
+            }
+        }
+
+        let mut hashes = vec![0; rows];
+        create_hashes(&column_vals, &Default::default(), &mut hashes)?;
+        assert_eq!(hashes.len(), rows);
+
+        Ok(hashes)
     }
 
     /// Builds the Hash Table from the [`RecordBatch`]es coming from the left child.
     ///
     /// TODO docs
-    async fn build_table(&self, mut rx_left: broadcast::Receiver<RecordBatch>) -> RecordTable {
+    async fn build_table(
+        &self,
+        mut rx_left: broadcast::Receiver<RecordBatch>,
+    ) -> Result<RecordTable> {
         // Take in all of the record batches from the left and create a hash table
-        let mut record_table = RecordTable::new(self.schema.clone());
+        let mut record_table = RecordTable::new(self.left_schema.clone());
 
         loop {
             match rx_left.recv().await {
                 Ok(batch) => {
                     // TODO gather N batches and use rayon to insert all at once
-                    let hashes = self.hash_batch(&batch);
+                    let hashes = self.hash_batch::<true>(&batch)?;
                     record_table.insert_batch(batch, hashes);
                 }
                 Err(e) => match e {
@@ -60,28 +93,37 @@ impl HashJoin {
             }
         }
 
-        record_table
+        Ok(record_table)
     }
 
     /// Given a single batch (coming from the right child), probes the hash table and outputs a
     /// [`RecordBatch`] for every tuple on the right that gets matched with a tuple in the hash
     /// table.
     ///
     /// Note: This is super inefficient since it's possible that we could emit a bunch of
-    /// [`RecordBatch`]es that have just 1 tuple in them. This is a place for easy optimization.
+    /// [`RecordBatch`]es that have just 1 tuple in them.
+    ///
+    /// TODO This is a place for easy optimization.
+    ///
+    /// TODO only implements an inner join
     async fn probe_table(
         &self,
         table: &RecordTable,
         right_batch: RecordBatch,
         tx: &broadcast::Sender<RecordBatch>,
-    ) {
-        let hashes = self.hash_batch(&right_batch);
+    ) -> Result<()> {
+        let hashes = self.hash_batch::<false>(&right_batch)?;
+
+        let left_column_count = self.left_schema.fields().size();
+        let right_column_count = self.right_schema.fields().size();
+        let output_columns = left_column_count + right_column_count - self.equate_on.len();
 
         for (right_row, &hash) in hashes.iter().enumerate() {
             // Construct a RecordBatch for each tuple that might get joined with tuples in the
             // hash table
+            let mut out_columns: Vec<(String, ArrayRef)> = Vec::with_capacity(output_columns);
 
             // For each of these hashes, check if it is in the table
             let Some(records) = table.get_records(hash) else {
-                return;
+                return Ok(());
             };
             assert!(!records.is_empty());
 
             // There are records associated with this hash value, so we need to emit things
             for &record in records {
                 let (left_batch, left_row) = table.get(record).unwrap();
 
-                todo!("Join tuples together and then send through `tx`");
+                // Left tuple is in `left_batch` at `left_row` offset
+                // Right tuple is in `right_batch` at `right_row` offset
             }
+
+            let joined_batch = RecordBatch::try_from_iter(out_columns)?;
+
+            tx.send(joined_batch)
+                .expect("Unable to send the projected batch");
         }
+
+        Ok(())
     }
 }
@@ -124,13 +174,18 @@ impl BinaryOperator for HashJoin {
     ) {
         // Phase 1: Build Phase
         // TODO assign to its own tokio task
-        let record_table = self.build_table(rx_left).await;
+        let record_table = self
+            .build_table(rx_left)
+            .await
+            .expect("Unable to build hash table");
 
         // Phase 2: Probe Phase
         loop {
             match rx_right.recv().await {
                 Ok(batch) => {
-                    self.probe_table(&record_table, batch, &tx).await;
+                    self.probe_table(&record_table, batch, &tx)
+                        .await
+                        .expect("Unable to probe hash table");
                 }
                 Err(e) => match e {
                     RecvError::Closed => break,

diff --git a/eggstrain/src/execution/record_table.rs b/eggstrain/src/execution/record_table.rs
index 4ce6474..ec1ffaf 100644
--- a/eggstrain/src/execution/record_table.rs
+++ b/eggstrain/src/execution/record_table.rs
@@ -4,7 +4,7 @@
 
 pub struct RecordTable {
     /// Maps a Hash value to a `RecordIndex` into the `RecordBuffer`
-    inner: HashMap<usize, Vec<RecordIndex>>,
+    inner: HashMap<u64, Vec<RecordIndex>>,
     buffer: RecordBuffer,
 }
 
@@ -23,7 +23,7 @@ impl RecordTable {
         }
     }
 
-    pub fn insert_batch(&mut self, batch: RecordBatch, hashes: Vec<usize>) {
+    pub fn insert_batch(&mut self, batch: RecordBatch, hashes: Vec<u64>) {
         assert_eq!(
             batch.num_rows(),
             hashes.len(),
@@ -44,7 +44,7 @@ impl RecordTable {
         }
     }
 
-    pub fn get_records(&self, hash: usize) -> Option<&Vec<RecordIndex>> {
+    pub fn get_records(&self, hash: u64) -> Option<&Vec<RecordIndex>> {
         self.inner.get(&hash)
     }
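The now-implemented `hash_batch` delegates the actual hashing to DataFusion's `create_hashes`, which combines one or more key columns row-wise into a caller-provided `u64` buffer. One subtlety is that the buffer must already be sized to the row count, because `create_hashes` writes into existing slots instead of pushing. A standalone sketch with arbitrary key columns:

    use std::sync::Arc;
    use arrow::array::{ArrayRef, Int32Array, StringArray};
    use datafusion_common::hash_utils::create_hashes;

    fn main() -> datafusion_common::Result<()> {
        // Two join-key columns of three rows each; rows 0 and 2 have equal keys.
        let keys: Vec<ArrayRef> = vec![
            Arc::new(Int32Array::from(vec![1, 2, 1])),
            Arc::new(StringArray::from(vec!["a", "b", "a"])),
        ];

        // One pre-allocated slot per row, combined across both key columns.
        let mut hashes = vec![0; 3];
        create_hashes(&keys, &Default::default(), &mut hashes)?;

        assert_eq!(hashes[0], hashes[2]); // equal keys collide into one bucket
        assert_ne!(hashes[0], hashes[1]);
        Ok(())
    }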
From 7ab03d6381f1e95811878d16904e0f4cf00a2177 Mon Sep 17 00:00:00 2001
From: Connor Tsui
Date: Mon, 26 Feb 2024 10:26:43 -0500
Subject: [PATCH 6/8] refactor to row-oriented hash table storage

---
 .../src/execution/operators/hash_join.rs | 34 ++++++----
 eggstrain/src/execution/record_buffer.rs | 62 +++++++++++++++----
 eggstrain/src/execution/record_table.rs  | 30 ++++-----
 3 files changed, 88 insertions(+), 38 deletions(-)

diff --git a/eggstrain/src/execution/operators/hash_join.rs b/eggstrain/src/execution/operators/hash_join.rs
index aafa899..a7fc6ae 100644
--- a/eggstrain/src/execution/operators/hash_join.rs
+++ b/eggstrain/src/execution/operators/hash_join.rs
@@ -84,7 +84,7 @@ impl HashJoin {
                 Ok(batch) => {
                     // TODO gather N batches and use rayon to insert all at once
                     let hashes = self.hash_batch::<true>(&batch)?;
-                    record_table.insert_batch(batch, hashes);
+                    record_table.insert_batch(batch, hashes)?;
                 }
                 Err(e) => match e {
                     RecvError::Closed => break,
@@ -113,29 +113,39 @@ impl HashJoin {
     ) -> Result<()> {
         let hashes = self.hash_batch::<false>(&right_batch)?;
 
-        let left_column_count = self.left_schema.fields().size();
-        let right_column_count = self.right_schema.fields().size();
-        let output_columns = left_column_count + right_column_count - self.equate_on.len();
+        // let left_column_count = self.left_schema.fields().size();
+        // let right_column_count = self.right_schema.fields().size();
+        // let output_columns = left_column_count + right_column_count - self.equate_on.len();
 
-        for (right_row, &hash) in hashes.iter().enumerate() {
-            // Construct a RecordBatch for each tuple that might get joined with tuples in the
-            // hash table
-            let mut out_columns: Vec<(String, ArrayRef)> = Vec::with_capacity(output_columns);
+        let right_rows = table.buffer.record_batch_to_rows(right_batch)?;
 
+        for (row, &hash) in hashes.iter().enumerate() {
             // For each of these hashes, check if it is in the table
-            let Some(records) = table.get_records(hash) else {
+            let Some(records) = table.get_record_indices(hash) else {
                 return Ok(());
             };
             assert!(!records.is_empty());
 
+            // TODO
+            todo!("create a new RowConverter with the joined schema");
+
             // There are records associated with this hash value, so we need to emit things
             for &record in records {
-                let (left_batch, left_row) = table.get(record).unwrap();
+                let left_tuple = table.buffer.get(record).unwrap();
+                let right_tuple = right_rows.row(row);
 
-                // Left tuple is in `left_batch` at `left_row` offset
-                // Right tuple is in `right_batch` at `right_row` offset
+                todo!("Join the two tuples in some way, then append to a `Rows`")
             }
 
-            let joined_batch = RecordBatch::try_from_iter(out_columns)?;
+            let out_columns: Vec<ArrayRef> = todo!(
+                "Convert the `Rows` back into a `RecordBatch` with `RowConverter::convert_rows`"
+            );
+
+            todo!("Figure out names for each column");
+
+            let out_columns_iter = out_columns.into_iter().map(|col| ("name", col));
+
+            let joined_batch = RecordBatch::try_from_iter(out_columns_iter)?;
 
             tx.send(joined_batch)
                 .expect("Unable to send the projected batch");

diff --git a/eggstrain/src/execution/record_buffer.rs b/eggstrain/src/execution/record_buffer.rs
index 97de637..8043347 100644
--- a/eggstrain/src/execution/record_buffer.rs
+++ b/eggstrain/src/execution/record_buffer.rs
@@ -1,11 +1,13 @@
+use arrow::row::{Row, RowConverter, Rows, SortField};
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
+use datafusion_common::Result;
 
 /// Make this an opaque type that can be used later; we may want to make this
 /// contiguous and then use offsets instead.
 #[derive(Clone, Copy)]
 pub struct RecordIndex {
-    index: u32,
-    row: u32, // by default this is just 0
+    index: u32, // index into the vector of rows
+    row: u32,   // index into the row group
 }
 
 /// TODO docs
@@ -14,32 +16,58 @@ impl RecordIndex {
         Self { index, row }
     }
 
-    pub fn update_row(&mut self, row: u32) {
-        self.row = row;
+    // Use functional style due to easy copying
+    pub fn with_row(&self, row: u32) -> Self {
+        Self {
+            index: self.index,
+            row,
+        }
     }
 }
 
+pub fn schema_to_fields(schema: SchemaRef) -> Vec<SortField> {
+    schema
+        .fields()
+        .iter()
+        .map(|f| SortField::new(f.data_type().clone()))
+        .collect::<Vec<_>>()
+}
+
 pub struct RecordBuffer {
-    schema: SchemaRef,       // The schema for all of the record batches
-    inner: Vec<RecordBatch>, // make this contiguous
+    schema: SchemaRef,
+    converter: RowConverter,
+    inner: Vec<Rows>, // vector of row groups
 }
 
 impl RecordBuffer {
     pub fn new(schema: SchemaRef) -> Self {
+        let fields = schema_to_fields(schema.clone());
         Self {
             schema,
+            converter: RowConverter::new(fields).expect("Unable to create a RowConverter"),
             inner: vec![],
         }
     }
 
     pub fn with_capacity(schema: SchemaRef, capacity: usize) -> Self {
+        let fields = schema_to_fields(schema.clone());
         Self {
             schema,
+            converter: RowConverter::new(fields).expect("Unable to create a RowConverter"),
             inner: Vec::with_capacity(capacity),
         }
     }
 
+    pub fn converter(&self) -> &RowConverter {
+        &self.converter
+    }
+
+    pub fn record_batch_to_rows(&self, batch: RecordBatch) -> Result<Rows> {
+        // `Ok` to make use of `?` behavior
+        Ok(self.converter.convert_columns(batch.columns())?)
+    }
+
-    pub fn insert(&mut self, batch: RecordBatch) -> RecordIndex {
+    pub fn insert(&mut self, batch: RecordBatch) -> Result<RecordIndex> {
         assert_eq!(
             self.schema,
             batch.schema(),
@@ -50,20 +78,30 @@ impl RecordBuffer {
             "Maximum size for a RecordBuffer is u32::MAX"
         );
 
-        self.inner.push(batch);
+        let rows = self.record_batch_to_rows(batch)?;
+        self.inner.push(rows);
 
-        RecordIndex {
+        Ok(RecordIndex {
             index: (self.inner.len() - 1) as u32,
             row: 0,
-        }
+        })
     }
 
-    /// Retrieve the batch and row number associated with the RecordIndex
-    pub fn get(&self, index: RecordIndex) -> Option<(&RecordBatch, u32)> {
+    /// Retrieve the row group and row number associated with the RecordIndex
+    pub fn get_group(&self, index: RecordIndex) -> Option<(&Rows, u32)> {
         if (index.index as usize) >= self.inner.len() {
             return None;
         }
 
         Some((&self.inner[index.index as usize], index.row))
     }
+
+    /// Retrieve the row / tuple associated with the RecordIndex
+    pub fn get(&self, index: RecordIndex) -> Option<Row> {
+        if (index.index as usize) >= self.inner.len() {
+            return None;
+        }
+
+        Some(self.inner[index.index as usize].row(index.row as usize))
+    }
 }

diff --git a/eggstrain/src/execution/record_table.rs b/eggstrain/src/execution/record_table.rs
index ec1ffaf..540fd84 100644
--- a/eggstrain/src/execution/record_table.rs
+++ b/eggstrain/src/execution/record_table.rs
@@ -1,11 +1,13 @@
 use super::record_buffer::{RecordBuffer, RecordIndex};
+use arrow::row::RowConverter;
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
+use datafusion_common::Result;
 use std::collections::HashMap; // TODO replace with a raw table
 
 pub struct RecordTable {
     /// Maps a Hash value to a `RecordIndex` into the `RecordBuffer`
     inner: HashMap<u64, Vec<RecordIndex>>,
-    buffer: RecordBuffer,
+    pub(crate) buffer: RecordBuffer,
 }
 
 impl RecordTable {
@@ -23,7 +25,11 @@ impl RecordTable {
         }
     }
 
+    pub fn converter(&self) -> &RowConverter {
+        self.buffer.converter()
+    }
+
-    pub fn insert_batch(&mut self, batch: RecordBatch, hashes: Vec<u64>) {
+    pub fn insert_batch(&mut self, batch: RecordBatch, hashes: Vec<u64>) -> Result<()> {
         assert_eq!(
             batch.num_rows(),
             hashes.len(),
@@ -31,24 +37,20 @@ impl RecordTable {
         );
 
         // Points to the location of the base of the record batch
-        let base_record_id = self.buffer.insert(batch);
+        let base_record_id = self.buffer.insert(batch)?;
 
         for (row, &hash) in hashes.iter().enumerate() {
-            // Given the row, we can create a record id for a specific tuple by updating the row
-            // from the base_record_id
-            let mut record_id = base_record_id;
-            record_id.update_row(row as u32);
-
             // Insert the record into the hashtable bucket
-            self.inner.entry(hash).or_default().push(record_id)
+            self.inner
+                .entry(hash)
+                .or_default()
+                .push(base_record_id.with_row(row as u32))
         }
+
+        Ok(())
     }
 
-    pub fn get_records(&self, hash: u64) -> Option<&Vec<RecordIndex>> {
+    pub fn get_record_indices(&self, hash: u64) -> Option<&Vec<RecordIndex>> {
         self.inner.get(&hash)
     }
-
-    pub fn get(&self, index: RecordIndex) -> Option<(&RecordBatch, u32)> {
-        self.buffer.get(index)
-    }
 }
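The refactor's core idea is Arrow's row format: a `RowConverter` built from the schema's `SortField`s turns a columnar batch into `Rows`, where each tuple is a single contiguous byte string that compares with `memcmp` and can later be converted back into columns. A self-contained round-trip sketch (schema and values are arbitrary):

    use std::sync::Arc;
    use arrow::array::{ArrayRef, Int32Array, StringArray};
    use arrow::datatypes::DataType;
    use arrow::error::ArrowError;
    use arrow::row::{RowConverter, SortField};

    fn main() -> Result<(), ArrowError> {
        let converter = RowConverter::new(vec![
            SortField::new(DataType::Int32),
            SortField::new(DataType::Utf8),
        ])?;

        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int32Array::from(vec![1, 2])),
            Arc::new(StringArray::from(vec!["a", "b"])),
        ];

        // Columnar -> row-oriented: one comparable byte string per tuple.
        let rows = converter.convert_columns(&columns)?;
        assert!(rows.row(0) < rows.row(1));

        // Row-oriented -> columnar again, recovering the original arrays.
        let round_trip = converter.convert_rows(&rows)?;
        assert_eq!(round_trip.len(), 2);
        Ok(())
    }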
From ff966c9bca181657c2046d9e463d4cd6d0b90cc8 Mon Sep 17 00:00:00 2001
From: Kyle Booker
Date: Wed, 3 Apr 2024 13:01:39 -0400
Subject: [PATCH 7/8] Still wip, creates schema

---
 .../src/execution/operators/hash_join.rs | 23 ++++++++++++-------
 1 file changed, 15 insertions(+), 8 deletions(-)

diff --git a/eggstrain/src/execution/operators/hash_join.rs b/eggstrain/src/execution/operators/hash_join.rs
index a7fc6ae..90f3053 100644
--- a/eggstrain/src/execution/operators/hash_join.rs
+++ b/eggstrain/src/execution/operators/hash_join.rs
@@ -1,9 +1,10 @@
 use super::{BinaryOperator, Operator};
 use crate::execution::record_table::RecordTable;
 use arrow::array::ArrayRef;
-use arrow::datatypes::SchemaRef;
+use arrow::datatypes::{Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
+use datafusion::common::arrow::row::{Row, RowConverter, Rows};
 use datafusion::logical_expr::ColumnarValue;
 use datafusion::physical_expr::PhysicalExprRef;
 use datafusion::physical_plan::ExecutionPlan;
@@ -118,7 +119,6 @@ impl HashJoin {
         // let output_columns = left_column_count + right_column_count - self.equate_on.len();
 
         let right_rows = table.buffer.record_batch_to_rows(right_batch)?;
-
         for (row, &hash) in hashes.iter().enumerate() {
             // For each of these hashes, check if it is in the table
             let Some(records) = table.get_record_indices(hash) else {
@@ -127,19 +127,26 @@ impl HashJoin {
             assert!(!records.is_empty());
 
             // TODO
-            todo!("create a new RowConverter with the joined schema");
+            // Create a new schema that is the join of the two schemas
+            let left_schema: Schema = (*self.left_schema).clone();
+            let right_schema: Schema = (*self.right_schema).clone();
+
+            let new_schema = Schema::try_merge(vec![left_schema, right_schema])?;
+            let joined_schema: SchemaRef = Arc::new(new_schema);
+
+            let row_converter = RowConverter::new(new_schema);
 
             // There are records associated with this hash value, so we need to emit things
             for &record in records {
                 let left_tuple = table.buffer.get(record).unwrap();
-                let right_tuple = right_rows.row(row);
+                let right_tuple: Row = right_rows.row(row);
+
+                let joined_tuple = todo!("Join the two tuples in some way");
 
                 todo!("Join the two tuples in some way, then append to a `Rows`")
             }
-            let out_columns: Vec<ArrayRef> = todo!(
-                "Convert the `Rows` back into a `RecordBatch` with `RowConverter::convert_rows`"
-            );
+            todo!("Convert the `Rows` back into a `RecordBatch` with `RowConverter::convert_rows`");
+            let out_columns: Vec<ArrayRef> = RowConverter::convert_rows(joined_schema, rows)?;
 
             todo!("Figure out names for each column");
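This patch derives the output schema with `Schema::try_merge`, which unions field lists and errors on conflicting duplicates. For a join that is a questionable fit, since a key column present on both sides gets unified into one field rather than kept per side, which is what the next patch addresses. A small sketch of the merge behavior (fields are arbitrary):

    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::error::ArrowError;

    fn main() -> Result<(), ArrowError> {
        let left = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
        let right = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
        ]);

        // "id" appears in both inputs but is merged into a single field.
        let merged = Schema::try_merge(vec![left, right])?;
        assert_eq!(merged.fields().len(), 2);
        Ok(())
    }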
From 58a56360c9e8a3004c0c7e31862229d49dec095b Mon Sep 17 00:00:00 2001
From: SarveshOO7
Date: Sat, 6 Apr 2024 17:31:14 -0400
Subject: [PATCH 8/8] Got schema built and column indices

---
 .../src/execution/operators/hash_join.rs | 77 +++++++++++++++----
 1 file changed, 60 insertions(+), 17 deletions(-)

diff --git a/eggstrain/src/execution/operators/hash_join.rs b/eggstrain/src/execution/operators/hash_join.rs
index 90f3053..f2b6667 100644
--- a/eggstrain/src/execution/operators/hash_join.rs
+++ b/eggstrain/src/execution/operators/hash_join.rs
@@ -1,15 +1,17 @@
 use super::{BinaryOperator, Operator};
 use crate::execution::record_table::RecordTable;
 use arrow::array::ArrayRef;
-use arrow::datatypes::{Schema, SchemaRef};
+use arrow::datatypes::{Schema, SchemaBuilder, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
 use datafusion::common::arrow::row::{Row, RowConverter, Rows};
-use datafusion::logical_expr::ColumnarValue;
+use datafusion::logical_expr::{left, ColumnarValue};
 use datafusion::physical_expr::PhysicalExprRef;
+use datafusion::physical_plan::joins::utils::{build_join_schema, ColumnIndex};
+use datafusion::physical_plan::joins::HashJoinExec;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion_common::hash_utils::create_hashes;
-use datafusion_common::{DataFusionError, Result};
+use datafusion_common::{DataFusionError, JoinSide, JoinType, Result};
 use std::sync::Arc;
 use tokio::sync::broadcast;
 use tokio::sync::broadcast::error::RecvError;
@@ -24,17 +26,17 @@ pub struct HashJoin {
 
 /// TODO docs
 impl HashJoin {
-    pub(crate) fn new(
-        left_schema: SchemaRef,
-        right_schema: SchemaRef,
-        equate_on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
-        children: Vec<Arc<dyn ExecutionPlan>>,
-    ) -> Self {
+    pub(crate) fn new(hash_exec: &HashJoinExec) -> Self {
         Self {
-            children,
-            left_schema,
-            right_schema,
-            equate_on,
+            children: hash_exec.children().clone(),
+            left_schema: hash_exec.left().schema(),
+            right_schema: hash_exec.right().schema(),
+            // correctly convert to match the type
+            equate_on: hash_exec
+                .on()
+                .iter()
+                .map(|(left, right)| (left.clone(), right.clone()))
+                .collect(),
         }
     }
@@ -97,6 +99,44 @@ impl HashJoin {
         Ok(record_table)
     }
 
+    pub fn build_join_schema(left: &Schema, right: &Schema) -> (Schema, Vec<ColumnIndex>) {
+        let (fields, column_indices): (SchemaBuilder, Vec<ColumnIndex>) = {
+            let left_fields = left
+                .fields()
+                .iter()
+                .map(|f| f.clone())
+                .enumerate()
+                .map(|(index, f)| {
+                    (
+                        f,
+                        ColumnIndex {
+                            index,
+                            side: JoinSide::Left,
+                        },
+                    )
+                });
+            let right_fields = right
+                .fields()
+                .iter()
+                .map(|f| f.clone())
+                .enumerate()
+                .map(|(index, f)| {
+                    (
+                        f,
+                        ColumnIndex {
+                            index,
+                            side: JoinSide::Right,
+                        },
+                    )
+                });
+
+            // left then right
+            left_fields.chain(right_fields).unzip()
+        };
+        (fields.finish(), column_indices)
+    }
+
     /// Given a single batch (coming from the right child), probes the hash table and outputs a
     /// [`RecordBatch`] for every tuple on the right that gets matched with a tuple in the hash
     /// table.
     ///
     /// Note: This is super inefficient since it's possible that we could emit a bunch of
     /// [`RecordBatch`]es that have just 1 tuple in them.
     ///
     /// TODO This is a place for easy optimization.
     ///
     /// TODO only implements an inner join
     async fn probe_table(
         &self,
         table: &RecordTable,
         right_batch: RecordBatch,
         tx: &broadcast::Sender<RecordBatch>,
     ) -> Result<()> {
         let hashes = self.hash_batch::<false>(&right_batch)?;
 
         // let left_column_count = self.left_schema.fields().size();
         // let right_column_count = self.right_schema.fields().size();
         // let output_columns = left_column_count + right_column_count - self.equate_on.len();
 
         let right_rows = table.buffer.record_batch_to_rows(right_batch)?;
         for (row, &hash) in hashes.iter().enumerate() {
             // For each of these hashes, check if it is in the table
             let Some(records) = table.get_record_indices(hash) else {
                 return Ok(());
             };
             assert!(!records.is_empty());
 
             // TODO
             // Create a new schema that is the join of the two schemas
             let left_schema: Schema = (*self.left_schema).clone();
             let right_schema: Schema = (*self.right_schema).clone();
 
-            let new_schema = Schema::try_merge(vec![left_schema, right_schema])?;
+            // let new_schema = Schema::try_merge(vec![left_schema, right_schema])?;
+            let (new_schema, column_indices) =
+                HashJoin::build_join_schema(&left_schema, &right_schema);
             let joined_schema: SchemaRef = Arc::new(new_schema);
 
             let row_converter = RowConverter::new(new_schema);
 
             // There are records associated with this hash value, so we need to emit things
             for &record in records {
                 let left_tuple = table.buffer.get(record).unwrap();
                 let right_tuple: Row = right_rows.row(row);
-
-                let joined_tuple = todo!("Join the two tuples in some way");
 
-                todo!("Join the two tuples in some way, then append to a `Rows`")
+                // let cols = vec[cols]
+                for col in column_indices {}
+                self.column_index =
+                    todo!("Join the two tuples in some way, then append to a `Rows`")
             }
             todo!("Convert the `Rows` back into a `RecordBatch` with `RowConverter::convert_rows`");
             let out_columns: Vec<ArrayRef> = RowConverter::convert_rows(joined_schema, rows)?;
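The hand-rolled `build_join_schema` above mirrors a helper that DataFusion already ships (it even appears in this patch's imports). A sketch of the library version, assuming the signature that takes a `JoinType` and returns the merged schema plus side-tagged column indices:

    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion::physical_plan::joins::utils::build_join_schema;
    use datafusion_common::{JoinSide, JoinType};

    fn main() {
        let left = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
        let right = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
        ]);

        // For an inner join: left fields, then right fields, with a
        // ColumnIndex recording each output column's side and offset.
        let (schema, column_indices) = build_join_schema(&left, &right, &JoinType::Inner);

        assert_eq!(schema.fields().len(), 3);
        assert_eq!(column_indices[0].side, JoinSide::Left);
        assert_eq!(column_indices[2].side, JoinSide::Right);
    }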