From 4cbad4d84afe0a391a2fead4aacc0696a31d7c9d Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies <r.taylordavies@googlemail.com>
Date: Thu, 23 Feb 2023 12:53:52 +0000
Subject: [PATCH 1/5] Zero-copy Vec conversion (#3516) (#1176)

---
 arrow-array/src/array/list_array.rs  |   2 +
 arrow-buffer/src/alloc/mod.rs        |  14 ++-
 arrow-buffer/src/buffer/immutable.rs | 147 ++++++++++++++++++++++++++-
 arrow-buffer/src/buffer/mutable.rs   |  30 ++++--
 arrow-buffer/src/buffer/scalar.rs    |   9 ++
 arrow-buffer/src/bytes.rs            |  13 +--
 6 files changed, 190 insertions(+), 25 deletions(-)

diff --git a/arrow-array/src/array/list_array.rs b/arrow-array/src/array/list_array.rs
index 6b63269d1615..178139f810e7 100644
--- a/arrow-array/src/array/list_array.rs
+++ b/arrow-array/src/array/list_array.rs
@@ -829,6 +829,7 @@ mod tests {
 
     #[test]
     #[should_panic(expected = "memory is not aligned")]
+    #[allow(deprecated)]
     fn test_primitive_array_alignment() {
         let ptr = arrow_buffer::alloc::allocate_aligned(8);
         let buf = unsafe { Buffer::from_raw_parts(ptr, 8, 8) };
@@ -845,6 +846,7 @@ mod tests {
     // Different error messages, so skip for now
     // https://github.com/apache/arrow-rs/issues/1545
     #[cfg(not(feature = "force_validate"))]
+    #[allow(deprecated)]
     fn test_list_array_alignment() {
         let ptr = arrow_buffer::alloc::allocate_aligned(8);
         let buf = unsafe { Buffer::from_raw_parts(ptr, 8, 8) };
diff --git a/arrow-buffer/src/alloc/mod.rs b/arrow-buffer/src/alloc/mod.rs
index 1493d839f5ab..3dfe7848d3f9 100644
--- a/arrow-buffer/src/alloc/mod.rs
+++ b/arrow-buffer/src/alloc/mod.rs
@@ -45,6 +45,7 @@ fn dangling_ptr() -> NonNull<u8> {
 /// Allocates a cache-aligned memory region of `size` bytes with uninitialized values.
 /// This is more performant than using [allocate_aligned_zeroed] when all bytes will have
 /// an unknown or non-zero value and is semantically similar to `malloc`.
+#[deprecated(note = "Use Vec")]
 pub fn allocate_aligned(size: usize) -> NonNull<u8> {
     unsafe {
         if size == 0 {
@@ -60,6 +61,7 @@ pub fn allocate_aligned(size: usize) -> NonNull<u8> {
 /// Allocates a cache-aligned memory region of `size` bytes with `0` on all of them.
 /// This is more performant than using [allocate_aligned] and setting all bytes to zero
 /// and is semantically similar to `calloc`.
+#[deprecated(note = "Use Vec")]
 pub fn allocate_aligned_zeroed(size: usize) -> NonNull<u8> {
     unsafe {
         if size == 0 {
@@ -80,6 +82,7 @@ pub fn allocate_aligned_zeroed(size: usize) -> NonNull<u8> {
 /// * ptr must denote a block of memory currently allocated via this allocator,
 ///
 /// * size must be the same size that was used to allocate that block of memory,
+#[deprecated(note = "Use Vec")]
 pub unsafe fn free_aligned(ptr: NonNull<u8>, size: usize) {
     if size != 0 {
         std::alloc::dealloc(
@@ -100,6 +103,8 @@ pub unsafe fn free_aligned(ptr: NonNull<u8>, size: usize) {
 ///
 /// * new_size, when rounded up to the nearest multiple of [ALIGNMENT], must not overflow (i.e.,
 /// the rounded value must be less than usize::MAX).
+#[deprecated(note = "Use Vec")]
+#[allow(deprecated)]
 pub unsafe fn reallocate(
     ptr: NonNull<u8>,
     old_size: usize,
@@ -132,9 +137,8 @@ impl<T: RefUnwindSafe + Send + Sync> Allocation for T {}
 
 /// Mode of deallocating memory regions
 pub(crate) enum Deallocation {
-    /// An allocation of the given capacity that needs to be deallocated using arrows's cache aligned allocator.
-    /// See [allocate_aligned] and [free_aligned].
-    Arrow(usize),
+    /// An allocation using [`std::alloc`]
+    Standard(Layout),
     /// An allocation from an external source like the FFI interface or a Rust Vec.
     /// Deallocation will happen
     Custom(Arc<dyn Allocation>),
@@ -143,8 +147,8 @@ pub(crate) enum Deallocation {
 impl Debug for Deallocation {
     fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
         match self {
-            Deallocation::Arrow(capacity) => {
-                write!(f, "Deallocation::Arrow {{ capacity: {capacity} }}")
+            Deallocation::Standard(layout) => {
+                write!(f, "Deallocation::Standard {layout:?}")
             }
             Deallocation::Custom(_) => {
                 write!(f, "Deallocation::Custom {{ capacity: unknown }}")
diff --git a/arrow-buffer/src/buffer/immutable.rs b/arrow-buffer/src/buffer/immutable.rs
index cbfba1e0540c..042992efd7c3 100644
--- a/arrow-buffer/src/buffer/immutable.rs
+++ b/arrow-buffer/src/buffer/immutable.rs
@@ -15,13 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::convert::AsRef;
+use std::alloc::Layout;
 use std::fmt::Debug;
 use std::iter::FromIterator;
 use std::ptr::NonNull;
 use std::sync::Arc;
 
-use crate::alloc::{Allocation, Deallocation};
+use crate::alloc::{Allocation, Deallocation, ALIGNMENT};
 use crate::util::bit_chunk_iterator::{BitChunks, UnalignedBitChunk};
 use crate::{bytes::Bytes, native::ArrowNativeType};
 
@@ -42,6 +42,8 @@ pub struct Buffer {
     ptr: *const u8,
 
     /// Byte length of the buffer.
+    ///
+    /// Must be less than or equal to `data.len()`
     length: usize,
 }
 
@@ -69,6 +71,21 @@ impl Buffer {
         }
     }
 
+    /// Create a [`Buffer`] from the provided `Vec` without copying
+    #[inline]
+    pub fn from_vec<T: ArrowNativeType>(vec: Vec<T>) -> Self {
+        // Safety
+        // Vec::as_ptr is guaranteed not to be null, and ArrowNativeType values are trivially transmutable
+        let ptr = unsafe { NonNull::new_unchecked(vec.as_ptr() as _) };
+        let len = vec.len() * std::mem::size_of::<T>();
+        // Safety
+        // Layout guaranteed to be valid
+        let layout = unsafe { Layout::array::<T>(vec.capacity()).unwrap_unchecked() };
+        std::mem::forget(vec);
+        let b = unsafe { Bytes::new(ptr, len, Deallocation::Standard(layout)) };
+        Self::from_bytes(b)
+    }
+
     /// Initializes a [Buffer] from a slice of items.
     pub fn from_slice_ref<U: ArrowNativeType, T: AsRef<[U]>>(items: T) -> Self {
         let slice = items.as_ref();
@@ -78,7 +95,7 @@ impl Buffer {
         buffer.into()
     }
 
-    /// Creates a buffer from an existing memory region (must already be byte-aligned), this
+    /// Creates a buffer from an existing aligned memory region (already byte-aligned), this
     /// `Buffer` will free this piece of memory when dropped.
     ///
     /// # Arguments
@@ -91,9 +108,11 @@ impl Buffer {
     ///
     /// This function is unsafe as there is no guarantee that the given pointer is valid for `len`
     /// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed.
+    #[deprecated(note = "Use From<Vec<T>>")]
     pub unsafe fn from_raw_parts(ptr: NonNull<u8>, len: usize, capacity: usize) -> Self {
         assert!(len <= capacity);
-        Buffer::build_with_arguments(ptr, len, Deallocation::Arrow(capacity))
+        let layout = Layout::from_size_align(capacity, ALIGNMENT).unwrap();
+        Buffer::build_with_arguments(ptr, len, Deallocation::Standard(layout))
     }
 
     /// Creates a buffer from an existing memory region. Ownership of the memory is tracked via reference counting
@@ -253,7 +272,8 @@ impl Buffer {
     }
 
     /// Returns `MutableBuffer` for mutating the buffer if this buffer is not shared.
-    /// Returns `Err` if this is shared or its allocation is from an external source.
+    /// Returns `Err` if this is shared, its allocation is from an external source, or
+    /// it is not allocated with alignment [`ALIGNMENT`]
     pub fn into_mutable(self) -> Result<MutableBuffer, Self> {
         let ptr = self.ptr;
         let length = self.length;
@@ -269,6 +289,43 @@ impl Buffer {
                 length,
             })
     }
+
+    /// Returns `Vec` for mutating the buffer if this buffer is not offset and was
+    /// allocated with the correct layout for `Vec<T>`
+    pub fn into_vec<T: ArrowNativeType>(self) -> Result<Vec<T>, Self> {
+        let layout = match self.data.deallocation() {
+            Deallocation::Standard(l) => l,
+            _ => return Err(self), // Custom allocation
+        };
+
+        if self.ptr != self.data.as_ptr() {
+            return Err(self); // Data is offset
+        }
+
+        let v_capacity = layout.size() / std::mem::size_of::<T>();
+        match Layout::array::<T>(v_capacity) {
+            Ok(expected) if layout == &expected => {}
+            _ => return Err(self), // Incorrect layout
+        }
+
+        let length = self.length;
+        let ptr = self.ptr;
+        let v_len = self.length / std::mem::size_of::<T>();
+
+        Arc::try_unwrap(self.data)
+            .map(|bytes| unsafe {
+                let ptr = bytes.ptr().as_ptr() as _;
+                std::mem::forget(bytes);
+                // Safety
+                // Verified that bytes layout matches that of Vec
+                Vec::from_raw_parts(ptr, v_len, v_capacity)
+            })
+            .map_err(|bytes| Buffer {
+                data: bytes,
+                ptr,
+                length,
+            })
+    }
 }
 
 /// Creating a `Buffer` instance by copying the memory from a `AsRef<[u8]>` into a newly
@@ -378,6 +435,7 @@ impl<T: ArrowNativeType> FromIterator<T> for Buffer {
 
 #[cfg(test)]
 mod tests {
+    use crate::i256;
     use std::panic::{RefUnwindSafe, UnwindSafe};
     use std::thread;
 
@@ -632,4 +690,83 @@ mod tests {
         let buffer = Buffer::from(MutableBuffer::from_len_zeroed(12));
         buffer.slice_with_length(2, usize::MAX);
     }
+
+    #[test]
+    fn test_vec_interop() {
+        // Test empty vec
+        let a: Vec<i128> = Vec::new();
+        let b = Buffer::from_vec(a);
+        b.into_vec::<i128>().unwrap();
+
+        // Test vec with capacity
+        let a: Vec<i128> = Vec::with_capacity(20);
+        let b = Buffer::from_vec(a);
+        let back = b.into_vec::<i128>().unwrap();
+        assert_eq!(back.len(), 0);
+        assert_eq!(back.capacity(), 20);
+
+        // Test vec with values
+        let mut a: Vec<i128> = Vec::with_capacity(3);
+        a.extend_from_slice(&[1, 2, 3]);
+        let b = Buffer::from_vec(a);
+        let back = b.into_vec::<i128>().unwrap();
+        assert_eq!(back.len(), 3);
+        assert_eq!(back.capacity(), 3);
+
+        // Test vec with values and spare capacity
+        let mut a: Vec<i128> = Vec::with_capacity(20);
+        a.extend_from_slice(&[1, 4, 7, 8, 9, 3, 6]);
+        let b = Buffer::from_vec(a);
+        let back = b.into_vec::<i128>().unwrap();
+        assert_eq!(back.len(), 7);
+        assert_eq!(back.capacity(), 20);
+
+        // Test incorrect alignment
+        let a: Vec<i128> = Vec::new();
+        let b = Buffer::from_vec(a);
+        let b = b.into_vec::<i32>().unwrap_err();
+        b.into_vec::<i8>().unwrap_err();
+
+        // Test convert between types with same alignment
+        // This is an implementation quirk, but isn't harmful
+        // as ArrowNativeType are trivially transmutable
+        let a: Vec<i64> = vec![1, 2, 3, 4];
+        let b = Buffer::from_vec(a);
+        let back = b.into_vec::<u64>().unwrap();
+        assert_eq!(back.len(), 4);
+        assert_eq!(back.capacity(), 4);
+
+        // i256 has the same layout as i128 so this is valid
+        let mut b: Vec<i128> = Vec::with_capacity(4);
+        b.extend_from_slice(&[1, 2, 3, 4]);
+        let b = Buffer::from_vec(b);
+        let back = b.into_vec::<i256>().unwrap();
+        assert_eq!(back.len(), 2);
+        assert_eq!(back.capacity(), 2);
+
+        // Invalid layout
+        let b: Vec<i128> = vec![1, 2, 3];
+        let b = Buffer::from_vec(b);
+        b.into_vec::<i256>().unwrap_err();
+
+        // Invalid layout
+        let mut b: Vec<i128> = Vec::with_capacity(5);
+        b.extend_from_slice(&[1, 2, 3, 4]);
+        let b = Buffer::from_vec(b);
+        b.into_vec::<i256>().unwrap_err();
+
+        // Truncates length
+        // This is an implementation quirk, but isn't harmful
+        let mut b: Vec<i128> = Vec::with_capacity(4);
+        b.extend_from_slice(&[1, 2, 3]);
+        let b = Buffer::from_vec(b);
+        let back = b.into_vec::<i256>().unwrap();
+        assert_eq!(back.len(), 1);
+        assert_eq!(back.capacity(), 2);
+
+        // Cannot use aligned allocation
+        let b = Buffer::from(MutableBuffer::new(10));
+        let b = b.into_vec::<u8>().unwrap_err();
+        b.into_vec::<u64>().unwrap_err();
+    }
 }
diff --git a/arrow-buffer/src/buffer/mutable.rs b/arrow-buffer/src/buffer/mutable.rs
index 2e6e2f1d7b08..35218eb80376 100644
--- a/arrow-buffer/src/buffer/mutable.rs
+++ b/arrow-buffer/src/buffer/mutable.rs
@@ -16,23 +16,28 @@
 // under the License.
 
 use super::Buffer;
-use crate::alloc::Deallocation;
+use crate::alloc::{Deallocation, ALIGNMENT};
 use crate::{
     alloc,
     bytes::Bytes,
     native::{ArrowNativeType, ToByteSlice},
     util::bit_util,
 };
+use std::alloc::Layout;
 use std::mem;
 use std::ptr::NonNull;
 
 /// A [`MutableBuffer`] is Arrow's interface to build a [`Buffer`] out of items or slices of items.
+///
 /// [`Buffer`]s created from [`MutableBuffer`] (via `into`) are guaranteed to have its pointer aligned
 /// along cache lines and in multiple of 64 bytes.
+///
 /// Use [MutableBuffer::push] to insert an item, [MutableBuffer::extend_from_slice]
 /// to insert many items, and `into` to convert it to [`Buffer`].
 ///
-/// For a safe, strongly typed API consider using `arrow::array::BufferBuilder`
+/// For a safe, strongly typed API consider using `Vec`
+///
+/// Note: this may be deprecated in a future release (#1176)[https://github.com/apache/arrow-rs/issues/1176]
 ///
 /// # Example
 ///
@@ -62,6 +67,7 @@ impl MutableBuffer {
 
     /// Allocate a new [MutableBuffer] with initial capacity to be at least `capacity`.
     #[inline]
+    #[allow(deprecated)]
     pub fn with_capacity(capacity: usize) -> Self {
         let capacity = bit_util::round_upto_multiple_of_64(capacity);
         let ptr = alloc::allocate_aligned(capacity);
@@ -83,6 +89,7 @@ impl MutableBuffer {
     /// let data = buffer.as_slice_mut();
     /// assert_eq!(data[126], 0u8);
     /// ```
+    #[allow(deprecated)]
     pub fn from_len_zeroed(len: usize) -> Self {
         let new_capacity = bit_util::round_upto_multiple_of_64(len);
         let ptr = alloc::allocate_aligned_zeroed(new_capacity);
@@ -95,12 +102,14 @@ impl MutableBuffer {
 
     /// Allocates a new [MutableBuffer] from given `Bytes`.
     pub(crate) fn from_bytes(bytes: Bytes) -> Result<Self, Bytes> {
-        if !matches!(bytes.deallocation(), Deallocation::Arrow(_)) {
-            return Err(bytes);
-        }
+        let capacity = match bytes.deallocation() {
+            Deallocation::Standard(layout) if layout.align() == ALIGNMENT => {
+                layout.size()
+            }
+            _ => return Err(bytes),
+        };
 
         let len = bytes.len();
-        let capacity = bytes.capacity();
         let ptr = bytes.ptr();
         mem::forget(bytes);
 
@@ -224,6 +233,7 @@ impl MutableBuffer {
     /// buffer.shrink_to_fit();
     /// assert!(buffer.capacity() >= 64 && buffer.capacity() < 128);
     /// ```
+    #[allow(deprecated)]
     pub fn shrink_to_fit(&mut self) {
         let new_capacity = bit_util::round_upto_multiple_of_64(self.len);
         if new_capacity < self.capacity {
@@ -300,9 +310,9 @@ impl MutableBuffer {
 
     #[inline]
     pub(super) fn into_buffer(self) -> Buffer {
-        let bytes = unsafe {
-            Bytes::new(self.data, self.len, Deallocation::Arrow(self.capacity))
-        };
+        let layout = Layout::from_size_align(self.capacity, ALIGNMENT).unwrap();
+        let bytes =
+            unsafe { Bytes::new(self.data, self.len, Deallocation::Standard(layout)) };
         std::mem::forget(self);
         Buffer::from_bytes(bytes)
     }
@@ -448,6 +458,7 @@ impl MutableBuffer {
 /// # Safety
 /// `ptr` must be allocated for `old_capacity`.
 #[cold]
+#[allow(deprecated)]
 unsafe fn reallocate(
     ptr: NonNull<u8>,
     old_capacity: usize,
@@ -630,6 +641,7 @@ impl std::ops::DerefMut for MutableBuffer {
 }
 
 impl Drop for MutableBuffer {
+    #[allow(deprecated)]
     fn drop(&mut self) {
         unsafe { alloc::free_aligned(self.data, self.capacity) };
     }
diff --git a/arrow-buffer/src/buffer/scalar.rs b/arrow-buffer/src/buffer/scalar.rs
index e688e52fea5c..01a64633f532 100644
--- a/arrow-buffer/src/buffer/scalar.rs
+++ b/arrow-buffer/src/buffer/scalar.rs
@@ -90,6 +90,15 @@ impl<T: ArrowNativeType> From<Buffer> for ScalarBuffer<T> {
     }
 }
 
+impl<T: ArrowNativeType> From<Vec<T>> for ScalarBuffer<T> {
+    fn from(value: Vec<T>) -> Self {
+        Self {
+            buffer: Buffer::from_vec(value),
+            phantom: Default::default(),
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/arrow-buffer/src/bytes.rs b/arrow-buffer/src/bytes.rs
index 3320dfc261c7..2820fda781e6 100644
--- a/arrow-buffer/src/bytes.rs
+++ b/arrow-buffer/src/bytes.rs
@@ -23,10 +23,10 @@ use core::slice;
 use std::ptr::NonNull;
 use std::{fmt::Debug, fmt::Formatter};
 
-use crate::alloc;
 use crate::alloc::Deallocation;
 
 /// A continuous, fixed-size, immutable memory region that knows how to de-allocate itself.
+///
 /// This structs' API is inspired by the `bytes::Bytes`, but it is not limited to using rust's
 /// global allocator nor u8 alignment.
 ///
@@ -53,7 +53,7 @@ impl Bytes {
     ///
     /// * `ptr` - Pointer to raw parts
     /// * `len` - Length of raw parts in **bytes**
-    /// * `capacity` - Total allocated memory for the pointer `ptr`, in **bytes**
+    /// * `deallocation` - The deallocation strategy for the memory region
     ///
     /// # Safety
     ///
@@ -93,7 +93,7 @@ impl Bytes {
 
     pub fn capacity(&self) -> usize {
         match self.deallocation {
-            Deallocation::Arrow(capacity) => capacity,
+            Deallocation::Standard(layout) => layout.size(),
             // we cannot determine this in general,
             // and thus we state that this is externally-owned memory
             Deallocation::Custom(_) => 0,
@@ -115,9 +115,10 @@ impl Drop for Bytes {
     #[inline]
     fn drop(&mut self) {
         match &self.deallocation {
-            Deallocation::Arrow(capacity) => {
-                unsafe { alloc::free_aligned(self.ptr, *capacity) };
-            }
+            Deallocation::Standard(layout) => match layout.size() {
+                0 => {} // Nothing to do
+                _ => unsafe { std::alloc::dealloc(self.ptr.as_ptr(), *layout) },
+            },
             // The automatic drop implementation will free the memory once the reference count reaches zero
             Deallocation::Custom(_allocation) => (),
         }

From d84e499ab977c3ca2e7fe969ecf15b01512d45c1 Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies <r.taylordavies@googlemail.com>
Date: Thu, 23 Feb 2023 14:38:07 +0000
Subject: [PATCH 2/5] Fix doc

---
 arrow-buffer/src/buffer/mutable.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/arrow-buffer/src/buffer/mutable.rs b/arrow-buffer/src/buffer/mutable.rs
index 35218eb80376..250ac9f31595 100644
--- a/arrow-buffer/src/buffer/mutable.rs
+++ b/arrow-buffer/src/buffer/mutable.rs
@@ -37,7 +37,7 @@ use std::ptr::NonNull;
 ///
 /// For a safe, strongly typed API consider using `Vec`
 ///
-/// Note: this may be deprecated in a future release (#1176)[https://github.com/apache/arrow-rs/issues/1176]
+/// Note: this may be deprecated in a future release ([#1176](https://github.com/apache/arrow-rs/issues/1176))
 ///
 /// # Example
 ///

From a4f1435cc55a1cc0288a7bb14bcf7893dc59aaef Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies <r.taylordavies@googlemail.com>
Date: Thu, 23 Feb 2023 15:27:09 +0000
Subject: [PATCH 3/5] More tests

---
 arrow-buffer/src/buffer/immutable.rs | 32 ++++++++++++++++++++++++++++
 1 file changed, 32 insertions(+)

diff --git a/arrow-buffer/src/buffer/immutable.rs b/arrow-buffer/src/buffer/immutable.rs
index 042992efd7c3..a9652dc64647 100644
--- a/arrow-buffer/src/buffer/immutable.rs
+++ b/arrow-buffer/src/buffer/immutable.rs
@@ -768,5 +768,37 @@ mod tests {
         let b = Buffer::from(MutableBuffer::new(10));
         let b = b.into_vec::<u8>().unwrap_err();
         b.into_vec::<u64>().unwrap_err();
+
+        // Test slicing
+        let mut a: Vec<i128> = Vec::with_capacity(20);
+        a.extend_from_slice(&[1, 4, 7, 8, 9, 3, 6]);
+        let b = Buffer::from_vec(a);
+        let slice = b.slice_with_length(0, 64);
+
+        // Shared reference fails
+        let slice = slice.into_vec::<i128>().unwrap_err();
+        drop(b);
+
+        // Succeeds as no outstanding shared reference
+        let back = slice.into_vec::<i128>().unwrap();
+        assert_eq!(&back, &[1, 4, 7, 8]);
+        assert_eq!(back.capacity(), 20);
+
+        // Slicing by non-multiple length truncates
+        let mut a: Vec<i128> = Vec::with_capacity(8);
+        a.extend_from_slice(&[1, 4, 7, 3]);
+
+        let b = Buffer::from_vec(a);
+        let slice = b.slice_with_length(0, 34);
+        drop(b);
+
+        let back = slice.into_vec::<i128>().unwrap();
+        assert_eq!(&back, &[1, 4]);
+        assert_eq!(back.capacity(), 8);
+
+        // Offset prevents conversion
+        let a: Vec<u32> = vec![1, 3, 4, 6];
+        let b = Buffer::from_vec(a).slice(2);
+        b.into_vec::<u32>().unwrap_err();
     }
 }

From f07f39655af8c900948c1108e2ed142a630a922d Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies <r.taylordavies@googlemail.com>
Date: Mon, 27 Feb 2023 14:33:39 +0000
Subject: [PATCH 4/5] Review feedback

---
 arrow-buffer/src/alloc/mod.rs        | 4 ++--
 arrow-buffer/src/buffer/immutable.rs | 9 ++++++---
 2 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/arrow-buffer/src/alloc/mod.rs b/arrow-buffer/src/alloc/mod.rs
index 3dfe7848d3f9..7600a28d8754 100644
--- a/arrow-buffer/src/alloc/mod.rs
+++ b/arrow-buffer/src/alloc/mod.rs
@@ -139,8 +139,8 @@ impl<T: RefUnwindSafe + Send + Sync> Allocation for T {}
 pub(crate) enum Deallocation {
     /// An allocation using [`std::alloc`]
     Standard(Layout),
-    /// An allocation from an external source like the FFI interface or a Rust Vec.
-    /// Deallocation will happen
+    /// An allocation from an external source like the FFI interface
+    /// Deallocation will happen on `Allocation::drop`
     Custom(Arc<dyn Allocation>),
 }
 
diff --git a/arrow-buffer/src/buffer/immutable.rs b/arrow-buffer/src/buffer/immutable.rs
index a9652dc64647..248446fcce25 100644
--- a/arrow-buffer/src/buffer/immutable.rs
+++ b/arrow-buffer/src/buffer/immutable.rs
@@ -79,7 +79,8 @@ impl Buffer {
         let ptr = unsafe { NonNull::new_unchecked(vec.as_ptr() as _) };
         let len = vec.len() * std::mem::size_of::<T>();
         // Safety
-        // Layout guaranteed to be valid
+        // Vec guaranteed to have a valid layout matching that of `Layout::array`
+        // This is based on `RawVec::current_memory`
         let layout = unsafe { Layout::array::<T>(vec.capacity()).unwrap_unchecked() };
         std::mem::forget(vec);
         let b = unsafe { Bytes::new(ptr, len, Deallocation::Standard(layout)) };
@@ -290,8 +291,10 @@ impl Buffer {
             })
     }
 
-    /// Returns `Vec` for mutating the buffer if this buffer is not offset and was
-    /// allocated with the correct layout for `Vec<T>`
+    /// Returns `Vec` for mutating the buffer
+    ///
+    /// Returns `Err(self)` if this buffer does not have the same [`Layout`] as
+    /// the destination Vec or contains a non-zero offset
     pub fn into_vec<T: ArrowNativeType>(self) -> Result<Vec<T>, Self> {
         let layout = match self.data.deallocation() {
             Deallocation::Standard(l) => l,

From 4a882f54d2aa3235b087101c8cf1a7df9c0d571a Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies <r.taylordavies@googlemail.com>
Date: Mon, 27 Feb 2023 14:36:29 +0000
Subject: [PATCH 5/5] More tests

---
 arrow-buffer/src/buffer/immutable.rs | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/arrow-buffer/src/buffer/immutable.rs b/arrow-buffer/src/buffer/immutable.rs
index 248446fcce25..5f42035c9e30 100644
--- a/arrow-buffer/src/buffer/immutable.rs
+++ b/arrow-buffer/src/buffer/immutable.rs
@@ -803,5 +803,15 @@ mod tests {
         let a: Vec<u32> = vec![1, 3, 4, 6];
         let b = Buffer::from_vec(a).slice(2);
         b.into_vec::<u32>().unwrap_err();
+
+        let b = MutableBuffer::new(16).into_buffer();
+        let b = b.into_vec::<u8>().unwrap_err(); // Invalid layout
+        let b = b.into_vec::<u32>().unwrap_err(); // Invalid layout
+        b.into_mutable().unwrap();
+
+        let b = Buffer::from_vec(vec![1_u32, 3, 5]);
+        let b = b.into_mutable().unwrap_err(); // Invalid layout
+        let b = b.into_vec::<u32>().unwrap();
+        assert_eq!(b, &[1, 3, 5]);
     }
 }