From 7ddcd2287d996b46d6701a460f2a47f4fe340949 Mon Sep 17 00:00:00 2001 From: "Stephen M. Coakley" Date: Sat, 24 Feb 2018 14:24:33 -0600 Subject: [PATCH] improve internal algorithm and organization --- src/arrays.rs | 42 ++++++++- src/buffer.rs | 250 +++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 253 +------------------------------------------------- 3 files changed, 292 insertions(+), 253 deletions(-) create mode 100644 src/buffer.rs diff --git a/src/arrays.rs b/src/arrays.rs index bc85942..b2e1d0d 100644 --- a/src/arrays.rs +++ b/src/arrays.rs @@ -1,4 +1,5 @@ //! Provides functions for dynamic array manipulation. +use std::cmp::Ordering; /// Allocate an uninitialized array of a given size. /// @@ -9,10 +10,43 @@ pub unsafe fn allocate(len: usize) -> Box<[T]> { vec.into_boxed_slice() } -/// Copy elements from one array to another in a range. +/// Copy as many elements as possible from one array to another. /// -/// Panics if there are less than `len` items in either of the given regions. +/// Returns the number of elements copied. #[inline] -pub fn copy(src: &[T], src_offset: usize, dest: &mut [T], dest_offset: usize, len: usize) { - (&mut dest[dest_offset .. dest_offset + len]).copy_from_slice(&src[src_offset .. src_offset + len]) +pub fn copy(src: &[T], dest: &mut [T]) -> usize { + let len = src.len().min(dest.len()); + (&mut dest[..len]).copy_from_slice(&src[..len]); + len +} + +/// Extension trait for slices for working with wrapping ranges and indicies. +pub trait WrappingSlice { + /// Gets a pair of slices in the given range, wrapping around length. + fn wrapping_range(&self, from: usize, to: usize) -> (&[T], &[T]); + + /// Gets a pair of mutable slices in the given range, wrapping around length. + fn wrapping_range_mut(&mut self, from: usize, to: usize) -> (&mut [T], &mut [T]); +} + +impl WrappingSlice for [T] { + fn wrapping_range(&self, from: usize, to: usize) -> (&[T], &[T]) { + match from.cmp(&to) { + Ordering::Equal => (&[], &[]), + Ordering::Less => (&self[from..to], &[]), + Ordering::Greater => (&self[from..], &self[..to]), + } + } + + fn wrapping_range_mut(&mut self, from: usize, to: usize) -> (&mut [T], &mut [T]) { + match from.cmp(&to) { + Ordering::Equal => (&mut [], &mut []), + Ordering::Less => (&mut self[from..to], &mut []), + Ordering::Greater => { + let (mid, right) = self.split_at_mut(from); + let left = mid.split_at_mut(to).0; + (left, right) + }, + } + } } diff --git a/src/buffer.rs b/src/buffer.rs new file mode 100644 index 0000000..3194ec0 --- /dev/null +++ b/src/buffer.rs @@ -0,0 +1,250 @@ +use arrays::{self, WrappingSlice}; +use std::io::{self, Read, Write}; + +/// Growable ring buffer. +/// +/// Optimized for repeated appending of bytes to the end and removing bytes from the front of the buffer. +#[derive(Clone)] +pub struct Buffer { + /// Backing array where elements are stored. Size is always a power of two. + array: Box<[T]>, + + /// The "head" index into the backing array that marks the start of the buffer elements. + /// + /// This index may exceed the length of the backing array during the lifetime of the buffer, and is only ever + /// incremented. + head: usize, + + /// The "tail" index into the backing array that marks the end of the buffer elements. + /// + /// Same as `head`, this is incremented unbounded. + tail: usize, +} + +impl Default for Buffer { + fn default() -> Buffer { + Buffer::new() + } +} + +impl Buffer { + pub const DEFAULT_CAPACITY: usize = 4096; + + /// Create a new buffer with the default capacity. + pub fn new() -> Self { + Self::with_capacity(Self::DEFAULT_CAPACITY) + } + + /// Create a new buffer with a given minimum capacity pre-allocated. + pub fn with_capacity(capacity: usize) -> Self { + let capacity = capacity.next_power_of_two(); + Self { + array: unsafe { arrays::allocate(capacity) }, + head: 0, + tail: 0, + } + } + + /// Returns `true` if the buffer is empty. + #[inline] + pub fn is_empty(&self) -> bool { + // The head and tail can only be equal to each other if: (a) the number of inserted elements over time is equal + // to the number of removed elements over time, and is thus empty; or (b) exactly `usize::max_value()` elements + // were inserted without being removed such that `tail` overflowed and wrapped around to equal `head`. This is + // improbable since the buffer would have to be at least the size of max pointer value. If the OS does let you + // allocate more memory than fits in a pointer, you have bigger problems. + self.head == self.tail + } + + /// Returns the number of elements in the buffer. + #[inline] + pub fn len(&self) -> usize { + // Even if `tail` overflows and becomes less than `head`, subtracting will underflow and result in the correct + // length. + self.tail.wrapping_sub(self.head) + } + + /// Returns the current capacity of the buffer. + #[inline] + pub fn capacity(&self) -> usize { + self.array.len() + } + + /// Copy the given elements and insert them into the back of the buffer. + /// + /// Returns the number of elements pushed. + pub fn push(&mut self, src: &[T]) -> usize { + // If the number of bytes to add would exceed the capacity, grow the internal array first. + let new_len = self.len() + src.len(); + if new_len > self.capacity() { + self.resize(new_len); + } + + let head_index = self.mask(self.head); + let tail_index = self.mask(self.tail); + + let slices = if head_index == tail_index { + let slices = self.array.split_at_mut(head_index); + (slices.1, slices.0) + } else { + self.array.wrapping_range_mut(tail_index, head_index) + }; + + let mut pushed = arrays::copy(src, slices.0); + pushed += arrays::copy(&src[pushed..], slices.1); + + self.tail = self.tail.wrapping_add(pushed); + pushed + } + + /// Pull bytes from the front of the buffer into the given location, up to the length of the destination buffer. + /// + /// Returns the number of elements pulled. + pub fn pull(&mut self, dest: &mut [T]) -> usize { + let count = self.copy_to(dest); + self.consume(count) + } + + /// Copy elements from the front of the buffer into the given slice. + /// + /// Returns the number of elements copied. If there are less elements in the buffer than the length of `dest`, then + /// only part of `dest` will be written to. + pub fn copy_to(&self, dest: &mut [T]) -> usize { + let slices = self.array + .wrapping_range(self.mask(self.head), self.mask(self.tail)); + + let mut copied = arrays::copy(slices.0, dest); + copied += arrays::copy(slices.1, &mut dest[copied..]); + + copied + } + + /// Consume up to `count` elements from the front of the buffer and discards them. + /// + /// Returns the number of elements consumed, which may be less than `count` if `count` was greater than the number + /// of elements in the buffer. + /// + /// This operation has a runtime cost of `O(1)`. + pub fn consume(&mut self, count: usize) -> usize { + // We can only consume as many elements as are in the buffer. + let count = count.min(self.len()); + self.head = self.head.wrapping_add(count); + count + } + + /// Remove all elements from the buffer. + pub fn clear(&mut self) { + self.head = 0; + self.tail = 0; + } + + fn resize(&mut self, size: usize) { + // Size must always be a power of 2. + let size = size.next_power_of_two(); + + let mut array = unsafe { arrays::allocate(size) }; + + self.tail = self.copy_to(&mut array); + self.head = 0; + self.array = array; + } + + #[inline] + fn mask(&self, index: usize) -> usize { + index & (self.capacity() - 1) + } +} + +impl Read for Buffer { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + Ok(self.pull(buf)) + } +} + +impl Write for Buffer { + fn write(&mut self, buf: &[u8]) -> io::Result { + Ok(self.push(buf)) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::Buffer; + + #[test] + fn test_capacity() { + let buffer = Buffer::::with_capacity(16); + assert!(buffer.capacity() == 16); + } + + #[test] + fn test_push() { + let mut buffer = Buffer::new(); + + assert!(buffer.is_empty()); + + let bytes = b"hello world"; + buffer.push(bytes); + + assert!(!buffer.is_empty()); + assert!(buffer.len() == bytes.len()); + } + + #[test] + fn test_push_and_consume() { + let mut buffer = Buffer::with_capacity(12); + + buffer.push(b"hello world"); + + assert!(buffer.consume(6) == 6); + assert!(buffer.len() == 5); + + buffer.push(b" hello"); + + assert!(buffer.len() == 11); + } + + #[test] + fn test_pull_more_than_buffer() { + let mut buffer = Buffer::new(); + let bytes = b"hello world"; + buffer.push(bytes); + + let mut dst = [0; 1024]; + assert!(buffer.pull(&mut dst) == bytes.len()); + assert!(&dst[0..bytes.len()] == bytes); + assert!(buffer.is_empty()); + } + + #[test] + fn test_pull_less_than_buffer() { + let mut buffer = Buffer::new(); + let bytes = b"hello world"; + buffer.push(bytes); + + let mut dst = [0; 4]; + assert!(buffer.pull(&mut dst) == dst.len()); + assert!(&dst == &bytes[0..4]); + assert!(!buffer.is_empty()); + assert!(buffer.len() == bytes.len() - dst.len()); + } + + #[test] + fn test_force_resize() { + let mut buffer = Buffer::with_capacity(8); + + buffer.push(b"hello"); + assert!(buffer.capacity() == 8); + + buffer.push(b" world"); + assert!(buffer.capacity() > 8); + + let mut out = [0; 11]; + buffer.copy_to(&mut out); + assert!(&out == b"hello world"); + } +} diff --git a/src/lib.rs b/src/lib.rs index 343efbe..a36c003 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,251 +1,6 @@ -use std::io::{self, Read, Write}; - +//! Efficient ring buffer implementations for networking, audio, or queues. mod arrays; +pub mod buffer; - -/// Growable byte buffer implemented as a ring buffer. -/// -/// Optimized for repeated appending of bytes to the end and removing bytes from the front of the buffer. -#[derive(Clone, Debug)] -pub struct Buffer { - array: Box<[u8]>, - head: usize, - len: usize, -} - -impl Default for Buffer { - fn default() -> Buffer { - Buffer::new() - } -} - -impl Buffer { - pub const DEFAULT_CAPACITY: usize = 4096; - - /// Create a new buffer with the default capacity. - pub fn new() -> Self { - Self::with_capacity(Self::DEFAULT_CAPACITY) - } - - /// Create a new buffer with a given minimum capacity pre-allocated. - pub fn with_capacity(capacity: usize) -> Self { - Self { - array: unsafe { - arrays::allocate(capacity.next_power_of_two()) - }, - head: 0, - len: 0, - } - } - - /// Returns `true` if the buffer is empty. - #[inline] - pub fn is_empty(&self) -> bool { - self.len == 0 - } - - /// Returns the number of bytes in the buffer. - #[inline] - pub fn len(&self) -> usize { - self.len - } - - /// Returns the current capacity of the buffer in bytes. - #[inline] - pub fn capacity(&self) -> usize { - self.array.len() - } - - /// Calculate the internal offset of the given byte position. - #[inline] - fn offset(&self, index: usize) -> usize { - (index + self.head) & (self.capacity() - 1) - } - - /// Copy bytes from the front of the buffer into the given slice. - /// - /// Returns the number of bytes copied. If there are less bytes in the buffer than the length of `dest`, then only - /// part of `dest` will be written to. - pub fn copy_to(&self, dest: &mut [u8]) -> usize { - // Determine the number of bytes to copy. - let count = dest.len().min(self.len); - - // Nothing to do. - if count == 0 { - return 0; - } - - // Current buffer is wrapped; copy head segment and tail segment separately. - let tail = self.offset(count); - if tail <= self.head { - let head_len = self.capacity() - self.head; - arrays::copy(&self.array, self.head, dest, 0, head_len); - arrays::copy(&self.array, 0, dest, head_len, tail); - } - - // Buffer is contiguous; copy in one step. - else { - arrays::copy(&self.array, self.head, dest, 0, count); - } - - count - } - - /// Consume up to `count` bytes from the front of the buffer and discard them. - /// - /// Returns the number of bytes consumed, which may be less than `count` if `count` was greater than the number of - /// bytes in the buffer. - /// - /// This operation has a runtime cost of `O(1)`. - pub fn consume(&mut self, count: usize) -> usize { - let count = count.min(self.len); - - self.head = self.offset(count); - self.len -= count; - - count - } - - /// Copy the given bytes and insert them into the back of the buffer. - pub fn push(&mut self, src: &[u8]) { - let new_len = self.len + src.len(); - - // If the number of bytes to add would exceed the capacity, grow the internal array first. - if new_len > self.capacity() { - let new_capacity = new_len.next_power_of_two(); - let mut new_array = unsafe { - arrays::allocate(new_capacity) - }; - - self.copy_to(&mut new_array); - self.array = new_array; - self.head = 0; - } - - // Calculate how much of `src` should be copied to which regions. - let head_available = self.capacity().checked_sub(self.head + self.len).unwrap_or(0); - let copy_to_head = src.len().min(head_available); - let copy_to_tail = src.len() - copy_to_head; - - if copy_to_head > 0 { - let tail = self.offset(self.len); - arrays::copy(src, 0, &mut self.array, tail, copy_to_head); - } - - if copy_to_tail > 0 { - arrays::copy(src, copy_to_head, &mut self.array, 0, copy_to_tail); - } - - self.len = new_len; - } - - /// Pull bytes from the front of the buffer into the given location, up to the length of the destination buffer. - /// - /// Returns the number of bytes pulled. - pub fn pull(&mut self, dest: &mut [u8]) -> usize { - let count = self.copy_to(dest); - self.consume(count) - } - - /// Remove all bytes from the buffer. - pub fn clear(&mut self) { - self.head = 0; - self.len = 0; - } -} - -impl Read for Buffer { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - Ok(self.pull(buf)) - } -} - -impl Write for Buffer { - fn write(&mut self, buf: &[u8]) -> io::Result { - self.push(buf); - Ok(buf.len()) - } - - fn flush(&mut self) -> io::Result<()> { - Ok(()) - } -} - - -#[cfg(test)] -mod tests { - use super::Buffer; - - #[test] - fn test_capacity() { - let buffer = Buffer::with_capacity(16); - assert!(buffer.capacity() == 16); - } - - #[test] - fn test_push() { - let mut buffer = Buffer::new(); - - assert!(buffer.is_empty()); - - let bytes = b"hello world"; - buffer.push(bytes); - - assert!(!buffer.is_empty()); - assert!(buffer.len() == bytes.len()); - } - - #[test] - fn test_push_and_consume() { - let mut buffer = Buffer::with_capacity(12); - - buffer.push(b"hello world"); - - assert!(buffer.consume(6) == 6); - assert!(buffer.len() == 5); - - buffer.push(b" hello"); - - assert!(buffer.len() == 11); - } - - #[test] - fn test_pull_more_than_buffer() { - let mut buffer = Buffer::new(); - let bytes = b"hello world"; - buffer.push(bytes); - - let mut dst = [0; 1024]; - assert!(buffer.pull(&mut dst) == bytes.len()); - assert!(&dst[0..bytes.len()] == bytes); - assert!(buffer.is_empty()); - } - - #[test] - fn test_pull_less_than_buffer() { - let mut buffer = Buffer::new(); - let bytes = b"hello world"; - buffer.push(bytes); - - let mut dst = [0; 4]; - assert!(buffer.pull(&mut dst) == dst.len()); - assert!(&dst == &bytes[0..4]); - assert!(!buffer.is_empty()); - assert!(buffer.len() == bytes.len() - dst.len()); - } - - #[test] - fn test_force_resize() { - let mut buffer = Buffer::with_capacity(8); - - buffer.push(b"hello"); - assert!(buffer.capacity() == 8); - - buffer.push(b" world"); - assert!(buffer.capacity() > 8); - - let mut out = [0; 11]; - buffer.copy_to(&mut out); - assert!(&out == b"hello world"); - } -} +/// A growable byte buffer implemented as a ring buffer. +pub type ByteBuffer = buffer::Buffer;