Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added buffer interoperability with arrow-rs #1437

Merged
merged 3 commits into from
Mar 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ description = "Unofficial implementation of Apache Arrow spec in safe Rust"
homepage = "https://github.com/jorgecarleitao/arrow2"
repository = "https://github.com/jorgecarleitao/arrow2"
authors = ["Jorge C. Leitao <[email protected]>", "Apache Arrow <[email protected]>"]
keywords = [ "arrow", "analytics" ]
keywords = ["arrow", "analytics"]
edition = "2021"
exclude = ["testing/"]

Expand Down Expand Up @@ -100,6 +100,9 @@ odbc-api = { version = "0.36", optional = true }
# Faster hashing
ahash = "0.8"

# Support conversion to/from arrow-rs
arrow-buffer = { version = "35.0.0", optional = true }

[target.wasm32-unknown-unknown.dependencies]
getrandom = { version = "0.2", features = ["js"] }

Expand Down Expand Up @@ -131,6 +134,7 @@ rustdoc-args = ["--cfg", "docsrs"]
[features]
default = []
full = [
"arrow",
"io_odbc",
"io_csv",
"io_csv_async",
Expand All @@ -154,6 +158,7 @@ full = [
# parses timezones used in timestamp conversions
"chrono-tz",
]
arrow = ["arrow-buffer"]
io_odbc = ["odbc-api"]
io_csv = ["io_csv_read", "io_csv_write"]
io_csv_async = ["io_csv_read_async"]
Expand Down Expand Up @@ -195,7 +200,7 @@ io_avro_compression = [
]
io_avro_async = ["avro-schema/async"]

io_orc = [ "orc-format" ]
io_orc = ["orc-format"]

# serde+serde_json: its dependencies + error handling
# serde_derive: there is some derive around
Expand Down
27 changes: 27 additions & 0 deletions src/bitmap/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,22 @@ impl Bitmap {
) -> std::result::Result<Self, E> {
Ok(MutableBitmap::try_from_trusted_len_iter_unchecked(iterator)?.into())
}

/// Create a new [`Bitmap`] from an arrow [`NullBuffer`]
///
/// [`NullBuffer`]: arrow_buffer::buffer::NullBuffer
#[cfg(feature = "arrow")]
pub fn from_null_buffer(value: arrow_buffer::buffer::NullBuffer) -> Self {
let offset = value.offset();
let length = value.len();
let unset_bits = value.null_count();
Self {
offset,
length,
unset_bits,
bytes: Arc::new(crate::buffer::to_bytes(value.buffer().clone())),
}
}
}

impl<'a> IntoIterator for &'a Bitmap {
Expand All @@ -394,3 +410,14 @@ impl IntoIterator for Bitmap {
IntoIter::new(self)
}
}

#[cfg(feature = "arrow")]
impl From<Bitmap> for arrow_buffer::buffer::NullBuffer {
fn from(value: Bitmap) -> Self {
let null_count = value.unset_bits;
let buffer = crate::buffer::to_buffer(value.bytes);
let buffer = arrow_buffer::buffer::BooleanBuffer::new(buffer, value.offset, value.length);
// Safety: null count is accurate
unsafe { arrow_buffer::buffer::NullBuffer::new_unchecked(buffer, null_count) }
}
}
14 changes: 14 additions & 0 deletions src/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,3 +276,17 @@ impl<T: Copy> IntoIterator for Buffer<T> {
IntoIter::new(self)
}
}

#[cfg(feature = "arrow")]
impl<T: crate::types::NativeType> From<arrow_buffer::Buffer> for Buffer<T> {
fn from(value: arrow_buffer::Buffer) -> Self {
Self::from_bytes(crate::buffer::to_bytes(value))
}
}

#[cfg(feature = "arrow")]
impl<T: crate::types::NativeType> From<Buffer<T>> for arrow_buffer::Buffer {
fn from(value: Buffer<T>) -> Self {
crate::buffer::to_buffer(value.data)
}
}
37 changes: 36 additions & 1 deletion src/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,42 @@ mod iterator;

use crate::ffi::InternalArrowArray;

pub(crate) type Bytes<T> = foreign_vec::ForeignVec<InternalArrowArray, T>;
pub(crate) enum BytesAllocator {
InternalArrowArray(InternalArrowArray),

#[cfg(feature = "arrow")]
Arrow(arrow_buffer::Buffer),
}

pub(crate) type Bytes<T> = foreign_vec::ForeignVec<BytesAllocator, T>;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the real meat of the conversion

#[cfg(feature = "arrow")]
pub(crate) fn to_buffer<T: crate::types::NativeType>(
value: std::sync::Arc<Bytes<T>>,
) -> arrow_buffer::Buffer {
// This should never panic as ForeignVec pointer must be non-null
let ptr = std::ptr::NonNull::new(value.as_ptr() as _).unwrap();
let len = value.len() * std::mem::size_of::<T>();
// Safety: allocation is guaranteed to be valid for `len` bytes
unsafe { arrow_buffer::Buffer::from_custom_allocation(ptr, len, value) }
}

#[cfg(feature = "arrow")]
pub(crate) fn to_bytes<T: crate::types::NativeType>(value: arrow_buffer::Buffer) -> Bytes<T> {
let ptr = value.as_ptr();
let align = ptr.align_offset(std::mem::align_of::<T>());
assert_eq!(align, 0, "not aligned");
let len = value.len() / std::mem::size_of::<T>();

// Valid as `NativeType: Pod` and checked alignment above
let ptr = value.as_ptr() as *const T;

let owner = crate::buffer::BytesAllocator::Arrow(value);

// Safety: slice is valid for len elements of T
unsafe { Bytes::from_foreign(ptr, len, owner) }
}

pub(super) use iterator::IntoIter;

pub use immutable::Buffer;
5 changes: 3 additions & 2 deletions src/ffi/array.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Contains functionality to load an ArrayData from the C Data Interface
use std::sync::Arc;

use crate::buffer::BytesAllocator;
use crate::{
array::*,
bitmap::{utils::bytes_for, Bitmap},
Expand Down Expand Up @@ -237,7 +238,7 @@ unsafe fn create_buffer<T: NativeType>(

let len = buffer_len(array, data_type, index)?;
let offset = buffer_offset(array, data_type, index);
let bytes = Bytes::from_foreign(ptr, len, owner);
let bytes = Bytes::from_foreign(ptr, len, BytesAllocator::InternalArrowArray(owner));

Ok(Buffer::from_bytes(bytes).sliced(offset, len - offset))
}
Expand All @@ -258,7 +259,7 @@ unsafe fn create_bitmap(
let len: usize = array.length.try_into().expect("length to fit in `usize`");
let offset: usize = array.offset.try_into().expect("Offset to fit in `usize`");
let bytes_len = bytes_for(offset + len);
let bytes = Bytes::from_foreign(ptr, bytes_len, owner);
let bytes = Bytes::from_foreign(ptr, bytes_len, BytesAllocator::InternalArrowArray(owner));

Ok(Bitmap::from_bytes(bytes, offset + len).sliced(offset, len))
}
Expand Down
2 changes: 2 additions & 0 deletions src/types/native.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::convert::TryFrom;
use std::ops::Neg;
use std::panic::RefUnwindSafe;

use bytemuck::{Pod, Zeroable};

Expand All @@ -14,6 +15,7 @@ pub trait NativeType:
+ Send
+ Sync
+ Sized
+ RefUnwindSafe
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is necessary to ensure that arrow2::Bytes<T> is RefUnwindSafe which is important to arrow::Buffer

+ std::fmt::Debug
+ std::fmt::Display
+ PartialEq
Expand Down
26 changes: 26 additions & 0 deletions tests/it/bitmap/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,29 @@ fn debug() {

assert_eq!(format!("{b:?}"), "[0b111110__, 0b_______1]");
}

#[test]
#[cfg(feature = "arrow")]
fn from_arrow() {
use arrow_buffer::buffer::{BooleanBuffer, NullBuffer};
let buffer = arrow_buffer::Buffer::from_iter(vec![true, true, true, false, false, false, true]);
let bools = BooleanBuffer::new(buffer, 0, 7);
let nulls = NullBuffer::new(bools);
assert_eq!(nulls.null_count(), 3);

let bitmap = Bitmap::from_null_buffer(nulls.clone());
assert_eq!(nulls.null_count(), bitmap.unset_bits());
assert_eq!(nulls.len(), bitmap.len());
let back = NullBuffer::from(bitmap);
assert_eq!(nulls, back);

let nulls = nulls.slice(1, 3);
assert_eq!(nulls.null_count(), 1);
assert_eq!(nulls.len(), 3);

let bitmap = Bitmap::from_null_buffer(nulls.clone());
assert_eq!(nulls.null_count(), bitmap.unset_bits());
assert_eq!(nulls.len(), bitmap.len());
let back = NullBuffer::from(bitmap);
assert_eq!(nulls, back);
}
64 changes: 64 additions & 0 deletions tests/it/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,67 @@ fn from_vec() {
assert_eq!(buffer.len(), 3);
assert_eq!(buffer.as_slice(), &[0, 1, 2]);
}

#[test]
#[cfg(feature = "arrow")]
fn from_arrow() {
let buffer = arrow_buffer::Buffer::from_vec(vec![1_i32, 2_i32, 3_i32]);
let b = Buffer::<i32>::from(buffer.clone());
assert_eq!(b.len(), 3);
assert_eq!(b.as_slice(), &[1, 2, 3]);
let back = arrow_buffer::Buffer::from(b);
assert_eq!(back, buffer);

let buffer = buffer.slice(4);
let b = Buffer::<i32>::from(buffer.clone());
assert_eq!(b.len(), 2);
assert_eq!(b.as_slice(), &[2, 3]);
let back = arrow_buffer::Buffer::from(b);
assert_eq!(back, buffer);

let buffer = arrow_buffer::Buffer::from_vec(vec![1_i64, 2_i64]);
let b = Buffer::<i32>::from(buffer.clone());
assert_eq!(b.len(), 4);
assert_eq!(b.as_slice(), &[1, 0, 2, 0]);
let back = arrow_buffer::Buffer::from(b);
assert_eq!(back, buffer);

let buffer = buffer.slice(4);
let b = Buffer::<i32>::from(buffer.clone());
assert_eq!(b.len(), 3);
assert_eq!(b.as_slice(), &[0, 2, 0]);
let back = arrow_buffer::Buffer::from(b);
assert_eq!(back, buffer);
}

#[test]
#[cfg(feature = "arrow")]
fn from_arrow_vec() {
// Zero-copy vec conversion in arrow-rs
let buffer = arrow_buffer::Buffer::from_vec(vec![1_i32, 2_i32, 3_i32]);
let back: Vec<i32> = buffer.into_vec().unwrap();

// Zero-copy vec conversion in arrow2
let buffer = Buffer::<i32>::from(back);
let back: Vec<i32> = buffer.into_mut().unwrap_right();

let buffer = arrow_buffer::Buffer::from_vec(back);
let buffer = Buffer::<i32>::from(buffer);

// But not possible after conversion between buffer representations
let _ = buffer.into_mut().unwrap_left();

let buffer = Buffer::<i32>::from(vec![1_i32]);
let buffer = arrow_buffer::Buffer::from(buffer);

// But not possible after conversion between buffer representations
let _ = buffer.into_vec::<i32>().unwrap_err();
}

#[test]
#[cfg(feature = "arrow")]
#[should_panic(expected = "not aligned")]
fn from_arrow_misaligned() {
let buffer = arrow_buffer::Buffer::from_vec(vec![1_i32, 2_i32, 3_i32]).slice(1);
let _ = Buffer::<i32>::from(buffer);
}