-
Notifications
You must be signed in to change notification settings - Fork 60
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Part 2: propagate transform in visit_scan_files #612
base: main
Are you sure you want to change the base?
Changes from 7 commits
2a0257a
b6eb3e0
5d98eb1
8831535
d0f8e67
5843f85
de5fd07
779a662
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,7 +7,7 @@ use delta_kernel::scan::state::{visit_scan_files, DvInfo, GlobalScanState}; | |
use delta_kernel::scan::{Scan, ScanData}; | ||
use delta_kernel::schema::Schema; | ||
use delta_kernel::snapshot::Snapshot; | ||
use delta_kernel::{DeltaResult, Error}; | ||
use delta_kernel::{DeltaResult, Error, ExpressionRef}; | ||
use delta_kernel_ffi_macros::handle_descriptor; | ||
use tracing::debug; | ||
use url::Url; | ||
|
@@ -211,6 +211,7 @@ pub unsafe extern "C" fn kernel_scan_data_next( | |
engine_context: NullableCvoid, | ||
engine_data: Handle<ExclusiveEngineData>, | ||
selection_vector: KernelBoolSlice, | ||
transforms: &CTransforms, | ||
), | ||
) -> ExternResult<bool> { | ||
let data = unsafe { data.as_ref() }; | ||
|
@@ -224,15 +225,17 @@ fn kernel_scan_data_next_impl( | |
engine_context: NullableCvoid, | ||
engine_data: Handle<ExclusiveEngineData>, | ||
selection_vector: KernelBoolSlice, | ||
transforms: &CTransforms, | ||
), | ||
) -> DeltaResult<bool> { | ||
let mut data = data | ||
.data | ||
.lock() | ||
.map_err(|_| Error::generic("poisoned mutex"))?; | ||
if let Some((data, sel_vec, _transforms)) = data.next().transpose()? { | ||
if let Some((data, sel_vec, transforms)) = data.next().transpose()? { | ||
let bool_slice = KernelBoolSlice::from(sel_vec); | ||
(engine_visitor)(engine_context, data.into(), bool_slice); | ||
let transform_map = CTransforms { transforms }; | ||
(engine_visitor)(engine_context, data.into(), bool_slice, &transform_map); | ||
Ok(true) | ||
} else { | ||
Ok(false) | ||
|
@@ -281,7 +284,7 @@ pub struct CStringMap { | |
/// # Safety | ||
/// | ||
/// The engine is responsible for providing a valid [`CStringMap`] pointer and [`KernelStringSlice`] | ||
pub unsafe extern "C" fn get_from_map( | ||
pub unsafe extern "C" fn get_from_string_map( | ||
map: &CStringMap, | ||
key: KernelStringSlice, | ||
allocate_fn: AllocateStringFn, | ||
|
@@ -293,6 +296,10 @@ pub unsafe extern "C" fn get_from_map( | |
.and_then(|v| allocate_fn(kernel_string_slice!(v))) | ||
} | ||
|
||
pub struct CTransforms { | ||
transforms: Vec<Option<ExpressionRef>>, | ||
} | ||
|
||
/// Get a selection vector out of a [`DvInfo`] struct | ||
/// | ||
/// # Safety | ||
|
@@ -355,6 +362,7 @@ fn rust_callback( | |
size: i64, | ||
kernel_stats: Option<delta_kernel::scan::state::Stats>, | ||
dv_info: DvInfo, | ||
_transform: Option<ExpressionRef>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any particular reason not to update the callback in this PR as well, so we can pass this on to the engine? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well, mostly I was trying to keep it separated. This PR was focused on getting |
||
partition_values: HashMap<String, String>, | ||
) { | ||
let partition_map = CStringMap { | ||
|
@@ -388,6 +396,7 @@ struct ContextWrapper { | |
pub unsafe extern "C" fn visit_scan_data( | ||
data: Handle<ExclusiveEngineData>, | ||
selection_vec: KernelBoolSlice, | ||
transforms: &CTransforms, | ||
engine_context: NullableCvoid, | ||
callback: CScanCallback, | ||
) { | ||
|
@@ -398,5 +407,12 @@ pub unsafe extern "C" fn visit_scan_data( | |
callback, | ||
}; | ||
// TODO: return ExternResult to caller instead of panicking? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. seems pretty easy to address this todo? maybe we can go ahead and do that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That will require changes in the |
||
visit_scan_files(data, selection_vec, context_wrapper, rust_callback).unwrap(); | ||
visit_scan_files( | ||
data, | ||
selection_vec, | ||
&transforms.transforms, | ||
context_wrapper, | ||
rust_callback, | ||
) | ||
.unwrap(); | ||
} |
Original file line number | Diff line number | Diff line change | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -304,6 +304,20 @@ pub enum ColumnType { | |||||||||||||
/// A transform is ultimately a `Struct` expr. This holds the set of expressions that make that struct expr up | ||||||||||||||
type Transform = Vec<TransformExpr>; | ||||||||||||||
|
||||||||||||||
/// utility method making it easy to get a transform for a particular row. If the requested row is | ||||||||||||||
/// outside the range of the passed slice returns `None`, otherwise returns the element at the index | ||||||||||||||
/// of the specified row | ||||||||||||||
pub fn get_transform_for_row( | ||||||||||||||
row: usize, | ||||||||||||||
transforms: &[Option<ExpressionRef>], | ||||||||||||||
) -> Option<ExpressionRef> { | ||||||||||||||
if row < transforms.len() { | ||||||||||||||
transforms[row].clone() | ||||||||||||||
} else { | ||||||||||||||
None | ||||||||||||||
} | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks like this only has a single call site. Now that it has reduced to a one-liner, should we just inline the code directly where it's used? Or do we anticipate other uses coming soon? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note it's |
||||||||||||||
} | ||||||||||||||
|
||||||||||||||
/// Transforms aren't computed all at once. So static ones can just go straight to `Expression`, but | ||||||||||||||
/// things like partition columns need to filled in. This enum holds an expression that's part of a | ||||||||||||||
/// `Transform`. | ||||||||||||||
|
@@ -463,6 +477,7 @@ impl Scan { | |||||||||||||
size: i64, | ||||||||||||||
_: Option<Stats>, | ||||||||||||||
dv_info: DvInfo, | ||||||||||||||
_transform: Option<ExpressionRef>, | ||||||||||||||
partition_values: HashMap<String, String>, | ||||||||||||||
) { | ||||||||||||||
batches.push(ScanFile { | ||||||||||||||
|
@@ -487,9 +502,15 @@ impl Scan { | |||||||||||||
let scan_data = self.scan_data(engine.as_ref())?; | ||||||||||||||
let scan_files_iter = scan_data | ||||||||||||||
.map(|res| { | ||||||||||||||
let (data, vec, _transforms) = res?; | ||||||||||||||
let (data, vec, transforms) = res?; | ||||||||||||||
let scan_files = vec![]; | ||||||||||||||
state::visit_scan_files(data.as_ref(), &vec, scan_files, scan_data_callback) | ||||||||||||||
state::visit_scan_files( | ||||||||||||||
data.as_ref(), | ||||||||||||||
&vec, | ||||||||||||||
&transforms, | ||||||||||||||
scan_files, | ||||||||||||||
scan_data_callback, | ||||||||||||||
) | ||||||||||||||
}) | ||||||||||||||
// Iterator<DeltaResult<Vec<ScanFile>>> to Iterator<DeltaResult<ScanFile>> | ||||||||||||||
.flatten_ok(); | ||||||||||||||
|
@@ -816,11 +837,12 @@ pub(crate) mod test_utils { | |||||||||||||
); | ||||||||||||||
let mut batch_count = 0; | ||||||||||||||
for res in iter { | ||||||||||||||
let (batch, sel, _transforms) = res.unwrap(); | ||||||||||||||
let (batch, sel, transforms) = res.unwrap(); | ||||||||||||||
assert_eq!(sel, expected_sel_vec); | ||||||||||||||
crate::scan::state::visit_scan_files( | ||||||||||||||
batch.as_ref(), | ||||||||||||||
&sel, | ||||||||||||||
&transforms, | ||||||||||||||
context.clone(), | ||||||||||||||
validate_callback, | ||||||||||||||
) | ||||||||||||||
|
@@ -1020,15 +1042,22 @@ mod tests { | |||||||||||||
_size: i64, | ||||||||||||||
_: Option<Stats>, | ||||||||||||||
dv_info: DvInfo, | ||||||||||||||
_transform: Option<ExpressionRef>, | ||||||||||||||
_partition_values: HashMap<String, String>, | ||||||||||||||
) { | ||||||||||||||
paths.push(path.to_string()); | ||||||||||||||
assert!(dv_info.deletion_vector.is_none()); | ||||||||||||||
} | ||||||||||||||
let mut files = vec![]; | ||||||||||||||
for data in scan_data { | ||||||||||||||
let (data, vec, _transforms) = data?; | ||||||||||||||
files = state::visit_scan_files(data.as_ref(), &vec, files, scan_data_callback)?; | ||||||||||||||
let (data, vec, transforms) = data?; | ||||||||||||||
files = state::visit_scan_files( | ||||||||||||||
data.as_ref(), | ||||||||||||||
&vec, | ||||||||||||||
&transforms, | ||||||||||||||
files, | ||||||||||||||
scan_data_callback, | ||||||||||||||
)?; | ||||||||||||||
} | ||||||||||||||
Ok(files) | ||||||||||||||
} | ||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,7 +4,9 @@ use std::collections::HashMap; | |
use std::sync::LazyLock; | ||
|
||
use crate::actions::deletion_vector::deletion_treemap_to_bools; | ||
use crate::scan::get_transform_for_row; | ||
use crate::utils::require; | ||
use crate::ExpressionRef; | ||
use crate::{ | ||
actions::{deletion_vector::DeletionVectorDescriptor, visitors::visit_deletion_vector_at}, | ||
engine_data::{GetData, RowVisitor, TypedGetData as _}, | ||
|
@@ -104,6 +106,7 @@ pub type ScanCallback<T> = fn( | |
size: i64, | ||
stats: Option<Stats>, | ||
dv_info: DvInfo, | ||
transform: Option<ExpressionRef>, | ||
partition_values: HashMap<String, String>, | ||
); | ||
|
||
|
@@ -138,12 +141,14 @@ pub type ScanCallback<T> = fn( | |
pub fn visit_scan_files<T>( | ||
data: &dyn EngineData, | ||
selection_vector: &[bool], | ||
transforms: &Vec<Option<ExpressionRef>>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. any reason for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nope, good catch. usually clippy gets these, but it didn't this time :) |
||
context: T, | ||
callback: ScanCallback<T>, | ||
) -> DeltaResult<T> { | ||
let mut visitor = ScanFileVisitor { | ||
callback, | ||
selection_vector, | ||
transforms, | ||
context, | ||
}; | ||
visitor.visit_rows_of(data)?; | ||
|
@@ -154,6 +159,7 @@ pub fn visit_scan_files<T>( | |
struct ScanFileVisitor<'a, T> { | ||
callback: ScanCallback<T>, | ||
selection_vector: &'a [bool], | ||
transforms: &'a Vec<Option<ExpressionRef>>, | ||
context: T, | ||
} | ||
impl<T> RowVisitor for ScanFileVisitor<'_, T> { | ||
|
@@ -201,6 +207,7 @@ impl<T> RowVisitor for ScanFileVisitor<'_, T> { | |
size, | ||
stats, | ||
dv_info, | ||
get_transform_for_row(row_index, self.transforms), | ||
partition_values, | ||
) | ||
} | ||
|
@@ -213,7 +220,10 @@ impl<T> RowVisitor for ScanFileVisitor<'_, T> { | |
mod tests { | ||
use std::collections::HashMap; | ||
|
||
use crate::scan::test_utils::{add_batch_simple, run_with_validate_callback}; | ||
use crate::{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: maybe flatten these imports? |
||
scan::test_utils::{add_batch_simple, run_with_validate_callback}, | ||
ExpressionRef, | ||
}; | ||
|
||
use super::{DvInfo, Stats}; | ||
|
||
|
@@ -228,6 +238,7 @@ mod tests { | |
size: i64, | ||
stats: Option<Stats>, | ||
dv_info: DvInfo, | ||
transform: Option<ExpressionRef>, | ||
part_vals: HashMap<String, String>, | ||
) { | ||
assert_eq!( | ||
|
@@ -242,6 +253,7 @@ mod tests { | |
assert!(dv_info.deletion_vector.is_some()); | ||
let dv = dv_info.deletion_vector.unwrap(); | ||
assert_eq!(dv.unique_id(), "uvBn[lx{q8@P<9BNH/isA@1"); | ||
assert!(transform.is_none()); | ||
assert_eq!(context.id, 2); | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like a reasonable name change, but why this PR?
(also -- do we anticipate exposing other map types through FFI in the future?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think mostly I just noticed it while possibly having a "transform map" (which we no longer will have), and thought it was a good change. I can move it to another PR, but it does seem to make more sense this way. TBD on other types, but I imagine eventually we'll find something :)