Skip to content

POC: Sketch out cached filter result API #7513

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 21 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions arrow-array/src/array/byte_view_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,18 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {

l_full_data.cmp(r_full_data)
}

/// Returns the total number of bytes required to hold all non-inlined strings (those longer than 12 bytes) pointed to by views in this array
pub fn minimum_buffer_size(&self) -> usize {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably compiles to the same, but I think using sum is slightly more idiomatic.

self.views.iter()
.map(|v| {
  let len = (*v as u32) as usize;
  if len > 12 {
    len
  } else {
    0
}
})
.sum()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I used sum before and it showed up in a profile -- I was trying to see if I could get the code faster but I didn't do it scientifically

let mut used = 0;
for v in self.views().iter() {
let len = (*v as u32) as usize;
if len > 12 {
used += len;
}
}
used
}
}

impl<T: ByteViewType + ?Sized> Debug for GenericByteViewArray<T> {
Expand Down
177 changes: 173 additions & 4 deletions arrow-array/src/builder/generic_bytes_view_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use hashbrown::HashTable;
use crate::builder::ArrayBuilder;
use crate::types::bytes::ByteArrayNativeType;
use crate::types::{BinaryViewType, ByteViewType, StringViewType};
use crate::{ArrayRef, GenericByteViewArray};
use crate::{Array, ArrayRef, GenericByteViewArray};

const STARTING_BLOCK_SIZE: u32 = 8 * 1024; // 8KiB
const MAX_BLOCK_SIZE: u32 = 2 * 1024 * 1024; // 2MiB
Expand Down Expand Up @@ -84,10 +84,26 @@ pub struct GenericByteViewBuilder<T: ByteViewType + ?Sized> {
completed: Vec<Buffer>,
in_progress: Vec<u8>,
block_size: BlockSizeGrowthStrategy,
/// When appending views from an existing Array, the builder will copy
/// the underlying strings into a new buffer if the array is sparse.
///
/// If None, the builder will not copy long strings
///
/// If Some, the builder will *copy* long strings if the ratio of used
/// buffer bytes to the total buffer size is less than `target_buffer_load_factor`
///
/// So if `target_buffer_load_factor` is `Some(0.5)`, the builder will copy long
/// strings if the total size of the used buffers is less than 50% of the
/// total size of the buffers.
target_buffer_load_factor: Option<f32>,
/// Some if deduplicating strings
/// map `<string hash> -> <index to the views>`
string_tracker: Option<(HashTable<usize>, ahash::RandomState)>,
phantom: PhantomData<T>,
/// Capacity to reserve for the replacement views and null builders that
/// are created when the builder is finished.
///
/// Defaults to `None`
}

impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
Expand All @@ -103,12 +119,39 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
null_buffer_builder: NullBufferBuilder::new(capacity),
completed: vec![],
in_progress: vec![],
target_buffer_load_factor: Some(0.5),
block_size: BlockSizeGrowthStrategy::Exponential {
current_size: STARTING_BLOCK_SIZE,
},
string_tracker: None,
phantom: Default::default(),
initial_capacity: None,
}
}

/// Set the initial capacity for buffers after finish is called
pub fn with_initial_capacity(mut self, initial_capacity: usize) -> Self {
self.initial_capacity = Some(initial_capacity);
self
}

/// Configures the target buffer load factor consulted when views are
/// appended from existing arrays.
///
/// Passing `None` stores `None`; passing `Some(f)` stores `f` after
/// validating it. A freshly constructed builder uses 50%.
///
/// # Panics
///
/// Panics if a load factor is supplied that is not greater than 0 and at
/// most 1 (a NaN value also panics, since both comparisons are false).
pub fn with_target_buffer_load_factor(
    mut self,
    target_buffer_load_factor: Option<f32>,
) -> Self {
    if let Some(factor) = target_buffer_load_factor {
        // Half-open range (0, 1]: zero is rejected, one is accepted.
        let in_range = 0.0 < factor && factor <= 1.0;
        assert!(
            in_range,
            "Target buffer load factor must be between 0 and 1"
        );
    }
    self.target_buffer_load_factor = target_buffer_load_factor;
    self
}

/// Set a fixed buffer size for variable length strings
Expand Down Expand Up @@ -236,7 +279,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {

/// Flushes the in progress block if any
#[inline]
fn flush_in_progress(&mut self) {
pub fn flush_in_progress(&mut self) {
if !self.in_progress.is_empty() {
let f = Buffer::from_vec(std::mem::take(&mut self.in_progress));
self.push_completed(f)
Expand Down Expand Up @@ -366,8 +409,18 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
self.flush_in_progress();
let completed = std::mem::take(&mut self.completed);
let len = self.views_builder.len();
let views = ScalarBuffer::new(self.views_builder.finish(), 0, len);
let nulls = self.null_buffer_builder.finish();
let (mut views_builder, mut null_buffer_builder) = match self.initial_capacity {
Some(initial_capacity) => (
BufferBuilder::new(initial_capacity),
NullBufferBuilder::new(initial_capacity),
),
None => (BufferBuilder::default(), NullBufferBuilder::new(len)),
};
std::mem::swap(&mut views_builder, &mut self.views_builder);
std::mem::swap(&mut null_buffer_builder, &mut self.null_buffer_builder);

let views = ScalarBuffer::new(views_builder.finish(), 0, len);
let nulls = null_buffer_builder.finish();
if let Some((ref mut ht, _)) = self.string_tracker.as_mut() {
ht.clear();
}
Expand Down Expand Up @@ -406,6 +459,122 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
};
buffer_size + in_progress + tracker + views + null
}

/// Append all views from the given array into the in-progress builder
///
/// Will copy the underlying string data based on the value of `target_buffer_load_factor`
pub fn append_array(&mut self, array: &GenericByteViewArray<T>) {
Copy link
Contributor

@Dandandan Dandandan Jun 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be used in concat as well (a similar implementation is almost 1.5x as fast on string views)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will file a ticket for this idea which we can pursue separately

let num_rows = array.len();
if num_rows == 0 {
return; // nothing to do
}

let null_buffer_builder = &mut self.null_buffer_builder;
let views = &mut self.views_builder;

// Copy nulls
if let Some(nulls) = array.nulls() {
null_buffer_builder.append_buffer(nulls);
} else {
null_buffer_builder.append_n_non_nulls(array.len());
}

// Copy views from the source array
let starting_view = views.len();
views.append_slice(array.views());

// SAFETY: we only appended views from `array`
unsafe {
self.finalize_copied_views(starting_view, array);
}
}

/// Finalizes the views and buffers of the array
///
/// This must be called after appending views from `array` to the builder.
///
/// The views from `array` will point to the old buffers. This function
/// updates all views starting at `starting_view` to point to the new
/// buffers or copies the values into a new buffer if the array is sparse.
///
/// # Safety
///
/// * self.views[starting_view..] must be valid views from `array`.
pub unsafe fn finalize_copied_views(
&mut self,
starting_view: usize,
array: &GenericByteViewArray<T>,
) {
// Flush the in-progress buffer
self.flush_in_progress();

let buffers = &mut self.completed;
let views = &mut self.views_builder;

let mut used_buffer_size = 0;
let use_exising_buffers = match self.target_buffer_load_factor {
None => true,
Some(load_factor) => {
used_buffer_size = array.minimum_buffer_size();
let actual_buffer_size = array.get_buffer_memory_size();
// Reuse the existing buffers only when the used bytes make up at least `load_factor` of the total buffer size; otherwise the strings are copied into a new buffer
used_buffer_size >= (actual_buffer_size as f32 * load_factor) as usize
}
};

if use_exising_buffers {
let num_buffers_before: u32 = buffers.len().try_into().expect("buffer count overflow");
buffers.extend_from_slice(array.data_buffers()); //

// If there were no existing buffers, the views do not need to be updated
// as the buffers of `array` are the same
if num_buffers_before == 0 {
return;
}

// Update any views that point to the old buffers
for v in views.as_slice_mut()[starting_view..].iter_mut() {
Copy link
Contributor

@Dandandan Dandandan Jun 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be faster to extend in one go, rather than first append_from_slice and later iter_mut (in views.append_slice(array.views()))

let view_len = *v as u32;
// if view_len is 12 or less, data is inlined and doesn't need an update
// if view_len is more than 12, need to update the buffer index
if view_len > 12 {
Copy link
Contributor

@Dandandan Dandandan Jun 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the check can be omitted (and will be faster without it).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Views with length 12 or less have the entire string inlined -- so if we update the buffer_index "field" for such small views we may well be updating the inlined string bytes. So I do think we need the check here

let mut view = ByteView::from(*v);
Copy link
Contributor

@Dandandan Dandandan Jun 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ByteView can use std::mem::transmute for from / as_u128 which I think would be faster.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you. I need to make some benchmarks so I can test these ideas more easily. Right now the cycle time is pretty long as I need to rebuild datafusion and then kick off a benchmark run.

let new_buffer_index = num_buffers_before + view.buffer_index;
view.buffer_index = new_buffer_index;
*v = view.into(); // update view
}
}
} else {
// otherwise the array is sparse so copy the data into a single new
// buffer as well as updating the views
let mut new_buffer: Vec<u8> = Vec::with_capacity(used_buffer_size);
Copy link
Contributor

@Dandandan Dandandan Jun 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this doesn't create too small buffer size after filtering?

Shouldn't we create a in progress buffer with a larger buffer size if used_buffer_size is smaller than that (based on allocation strategy) and then use that one instead of creating small buffers and flushing them every time?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(That could be one benefit from concat / potentially buffering filter - we can know the exact capacity upfront).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are 100% right - the copying should be handled in a similar way to the rest of the builder: fill up any remaining allocation first and allocate new buffers following the normal allocation strategy. I will pursue that approach

let new_buffer_index = buffers.len() as u32; // making one new buffer
// Update any views that point to the old buffers.
for v in views.as_slice_mut()[starting_view..].iter_mut() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same - better to extend

let view_len = *v as u32;
// if view_len is 12 or less, data is inlined and doesn't need an update
// if view_len is more than 12, need to copy the data to the new buffer and update the buffer index and offset
if view_len > 12 {
let mut view = ByteView::from(*v);
let old_buffer = &array.data_buffers()[view.buffer_index as usize].as_slice();

let new_offset = new_buffer.len();
let old_offset = view.offset as usize;
let str_data = &old_buffer[old_offset..old_offset + view_len as usize];
new_buffer.extend_from_slice(str_data);
view.offset = new_offset as u32;
view.buffer_index = new_buffer_index;
*v = view.into(); // update view
}
}
buffers.push(new_buffer.into());
}
}

/// Returns the inner views and null buffer builders and buffers.
pub fn inner_mut(&mut self) -> (&mut BufferBuilder<u128>, &mut NullBufferBuilder) {
(&mut self.views_builder, &mut self.null_buffer_builder)
}
}

impl<T: ByteViewType + ?Sized> Default for GenericByteViewBuilder<T> {
Expand Down
74 changes: 66 additions & 8 deletions arrow-array/src/builder/primitive_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,17 @@ impl<T: ArrowPrimitiveType> PrimitiveBuilder<T> {
T::DATA_TYPE,
data_type
);
Self { data_type, ..self }
// SAFETY: compatibility of `data_type` with `T` is checked by the assertion above
unsafe { self.with_data_type_unchecked(data_type) }
}

/// Set the data type of the builder without checking if it is compatible
///
/// # Safety
/// the DataType must be compatible with the type `T`
pub unsafe fn with_data_type_unchecked(mut self, data_type: DataType) -> Self {
self.data_type = data_type;
self
}

/// Returns the capacity of this builder measured in slots of type `T`
Expand Down Expand Up @@ -284,20 +294,49 @@ impl<T: ArrowPrimitiveType> PrimitiveBuilder<T> {
/// the iterator implement `TrustedLen` once that is stabilized.
#[inline]
pub unsafe fn append_trusted_len_iter(&mut self, iter: impl IntoIterator<Item = T::Native>) {
let iter = iter.into_iter();
let len = iter
.size_hint()
.1
.expect("append_trusted_len_iter requires an upper bound");

self.null_buffer_builder.append_n_non_nulls(len);
let starting_len = self.len();
self.values_builder.append_trusted_len_iter(iter);
self.null_buffer_builder
.append_n_non_nulls(self.len() - starting_len);
}

/// Consumes this builder and produces the resulting [`PrimitiveArray`].
///
/// # Panics
///
/// Panics if a null buffer is produced whose length differs from the
/// length of the values builder.
pub fn build(self) -> PrimitiveArray<T> {
    // Read the length first: `self` is taken apart immediately afterwards.
    let len = self.len();
    let Self {
        values_builder: values,
        null_buffer_builder: validity,
        data_type,
    } = self;

    let nulls = validity.build();
    if let Some(null_buffer) = nulls.as_ref() {
        assert_eq!(
            null_buffer.len(),
            values.len(),
            "nulls/values length mismatch"
        );
    }

    let array_data = {
        let builder = ArrayData::builder(data_type)
            .len(len)
            .add_buffer(values.build())
            .nulls(nulls);
        // NOTE(review): skips `ArrayData` validation; presumably sound because
        // the buffers come from this typed builder and the null/value lengths
        // were compared above — confirm.
        unsafe { builder.build_unchecked() }
    };
    PrimitiveArray::<T>::from(array_data)
}

/// Builds the [`PrimitiveArray`] and reset this builder.
pub fn finish(&mut self) -> PrimitiveArray<T> {
let len = self.len();
let nulls = self.null_buffer_builder.finish();
if let Some(nulls) = &nulls {
assert_eq!(
nulls.len(),
self.values_builder.len(),
"nulls/values length mismatch"
);
}
let builder = ArrayData::builder(self.data_type.clone())
.len(len)
.add_buffer(self.values_builder.finish())
Expand All @@ -312,6 +351,18 @@ impl<T: ArrowPrimitiveType> PrimitiveBuilder<T> {
let len = self.len();
let nulls = self.null_buffer_builder.finish_cloned();
let values_buffer = Buffer::from_slice_ref(self.values_builder.as_slice());
// Verify values and nulls buffers are the same length
// TODO for some reason this fails in the FixedSizeListBuilder
/*
if let Some(nulls) = &nulls {
assert_eq!(
nulls.len(),
values_buffer.len(),
"nulls/values length mismatch"
);
}

*/
let builder = ArrayData::builder(self.data_type.clone())
.len(len)
.add_buffer(values_buffer)
Expand Down Expand Up @@ -348,6 +399,13 @@ impl<T: ArrowPrimitiveType> PrimitiveBuilder<T> {
self.null_buffer_builder.as_slice_mut(),
)
}

/// Returns the inner value and null buffer builders.
///
/// These must be kept in sync
pub fn inner_mut(&mut self) -> (&mut BufferBuilder<T::Native>, &mut NullBufferBuilder) {
(&mut self.values_builder, &mut self.null_buffer_builder)
}
}

impl<P: DecimalType> PrimitiveBuilder<P> {
Expand Down
Loading
Loading