From 466c8a2959c0b0b3906a89d90f12e3e8a1fe5f80 Mon Sep 17 00:00:00 2001 From: "Stephen M. Coakley" Date: Fri, 11 May 2018 23:49:16 -0500 Subject: [PATCH] Refactor modules and add atomic bounded buffer Modules are now more organized with room for more queue and buffer types in the future. All buffers now implement a common set of traits. --- Cargo.toml | 2 +- src/arrays.rs | 144 +++++++++++++++++++++- src/buffer.rs | 249 --------------------------------------- src/buffers/atomic.rs | 213 +++++++++++++++++++++++++++++++++ src/buffers/mod.rs | 54 +++++++++ src/buffers/unbounded.rs | 193 ++++++++++++++++++++++++++++++ src/lib.rs | 17 ++- 7 files changed, 616 insertions(+), 256 deletions(-) delete mode 100644 src/buffer.rs create mode 100644 src/buffers/atomic.rs create mode 100644 src/buffers/mod.rs create mode 100644 src/buffers/unbounded.rs diff --git a/Cargo.toml b/Cargo.toml index cab6ee6..5c272ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ringtail" -version = "0.1.1" +version = "0.2.0" description = "Efficient ring buffer for byte buffers, FIFO queues, and SPSC channels" authors = ["Stephen M. Coakley "] license = "MIT" diff --git a/src/arrays.rs b/src/arrays.rs index a642dad..27b3d33 100644 --- a/src/arrays.rs +++ b/src/arrays.rs @@ -1,4 +1,5 @@ //! Provides functions for dynamic array manipulation. +use std::ops::{Index, IndexMut, Range}; /// Allocate an uninitialized array of a given size. /// @@ -9,7 +10,7 @@ pub unsafe fn allocate(len: usize) -> Box<[T]> { vec.into_boxed_slice() } -/// Copy as many elements as possible from one array to another. +/// Copy as many elements as possible from one slice to another. /// /// Returns the number of elements copied. #[inline] @@ -19,6 +20,23 @@ pub fn copy(src: &[T], dest: &mut [T]) -> usize { len } +/// Copy as many elements as possible from a slice of slices to another. +/// +/// Returns the number of elements copied. 
+pub fn copy_seq(seq: &[&[T]], dest: &mut [T]) -> usize { + let mut copied = 0; + + for slice in seq { + if copied < dest.len() { + copied += copy(slice, &mut dest[copied..]); + } else { + break; + } + } + + copied +} + /// Extension trait for slices for working with wrapping ranges and indicies. pub trait WrappingSlice { /// Gets a pair of slices in the given range, wrapping around length. @@ -47,3 +65,127 @@ impl WrappingSlice for [T] { } } } + +/// A heap-allocated circular array, useful for implementing ring buffers. +/// +/// This array type uses a _virtual indexing_ system. Indexing into the array applies a virtual mapping such that any +/// index is always mapped to a valid position in the array. More than one virtual index may map to the same position. +pub struct CircularArray { + array: Box<[T]>, + mask: usize, +} + +impl CircularArray { + /// Create a new array of a given length containing uninitialized data. + pub unsafe fn uninitialized(len: usize) -> Self { + let len = len.next_power_of_two(); + + Self { + array: allocate(len), + mask: len - 1, + } + } + + /// Get the length of the array. + #[inline] + pub fn len(&self) -> usize { + self.array.len() + } + + /// Gets a pair of slices in the given range, wrapping around length. + pub fn as_slices(&self, range: Range) -> [&[T]; 2] { + let start = self.internal_index(range.start); + let end = self.internal_index(range.end); + + if start < end { + [&self.array[start..end], &[]] + } else { + [&self.array[start..], &self.array[..end]] + } + } + + /// Gets a pair of mutable slices in the given range, wrapping around length. 
+ pub fn as_slices_mut(&mut self, range: Range) -> [&mut [T]; 2] { + let start = self.internal_index(range.start); + let end = self.internal_index(range.end); + + if start < end { + [&mut self.array[start..end], &mut []] + } else { + let (mid, right) = self.array.split_at_mut(start); + let left = mid.split_at_mut(end).0; + [right, left] + } + } + + // /// Copies elements from this array into + // pub fn copy_to_slice(&self, dest: &mut [T]) -> usize { + // if self.is_empty() { + // return 0; + // } + + // let slices = self.array + // .wrapping_range(self.mask(self.head), self.mask(self.tail)); + + // let mut copied = arrays::copy(slices.0, dest); + // copied += arrays::copy(slices.1, &mut dest[copied..]); + + // copied + // } + + /// Get the internal index the given virtual index is mapped to. + #[inline] + fn internal_index(&self, virtual_index: usize) -> usize { + virtual_index & self.mask + } +} + +impl AsRef<[T]> for CircularArray { + fn as_ref(&self) -> &[T] { + &self.array + } +} + +impl AsMut<[T]> for CircularArray { + fn as_mut(&mut self) -> &mut [T] { + &mut self.array + } +} + +impl Index for CircularArray { + type Output = T; + + fn index(&self, index: usize) -> &T { + self.array.index(self.internal_index(index)) + } +} + +impl IndexMut for CircularArray { + fn index_mut(&mut self, index: usize) -> &mut T { + let index = self.internal_index(index); + self.array.index_mut(index) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn copy_seq_with_less_elements() { + let chunks: [&[i32]; 3] = [&[], &[1, 2], &[3]]; + let mut dest = [0; 6]; + + assert_eq!(copy_seq(&chunks, &mut dest), 3); + assert_eq!(&dest, &[1, 2, 3, 0, 0, 0]); + } + + #[test] + fn copy_seq_with_more_elements() { + let chunks: [&[i32]; 5] = [&[], &[1, 2], &[], &[3], &[4, 5, 6]]; + let mut dest = [0; 4]; + + assert_eq!(copy_seq(&chunks, &mut dest), 4); + assert_eq!(&dest, &[1, 2, 3, 4]); + } +} diff --git a/src/buffer.rs b/src/buffer.rs deleted file mode 100644 index 
ed70897..0000000 --- a/src/buffer.rs +++ /dev/null @@ -1,249 +0,0 @@ -use arrays::{self, WrappingSlice}; -use std::io::{self, Read, Write}; - -/// Growable ring buffer. -/// -/// Optimized for repeated appending of bytes to the end and removing bytes from the front of the buffer. -#[derive(Clone)] -pub struct Buffer { - /// Backing array where elements are stored. Size is always a power of two. - array: Box<[T]>, - - /// The "head" index into the backing array that marks the start of the buffer elements. - /// - /// This index may exceed the length of the backing array during the lifetime of the buffer, and is only ever - /// incremented. - head: usize, - - /// The "tail" index into the backing array that marks the end of the buffer elements. - /// - /// Same as `head`, this is incremented unbounded. - tail: usize, -} - -impl Default for Buffer { - fn default() -> Buffer { - Buffer::new() - } -} - -impl Buffer { - pub const DEFAULT_CAPACITY: usize = 4096; - - /// Create a new buffer with the default capacity. - pub fn new() -> Self { - Self::with_capacity(Self::DEFAULT_CAPACITY) - } - - /// Create a new buffer with a given minimum capacity pre-allocated. - pub fn with_capacity(capacity: usize) -> Self { - let capacity = capacity.next_power_of_two(); - Self { - array: unsafe { arrays::allocate(capacity) }, - head: 0, - tail: 0, - } - } - - /// Returns `true` if the buffer is empty. - #[inline] - pub fn is_empty(&self) -> bool { - // The head and tail can only be equal to each other if: (a) the number of inserted elements over time is equal - // to the number of removed elements over time, and is thus empty; or (b) exactly `usize::max_value()` elements - // were inserted without being removed such that `tail` overflowed and wrapped around to equal `head`. This is - // improbable since the buffer would have to be at least the size of max pointer value. If the OS does let you - // allocate more memory than fits in a pointer, you have bigger problems. 
- self.head == self.tail - } - - /// Returns the number of elements in the buffer. - #[inline] - pub fn len(&self) -> usize { - // Even if `tail` overflows and becomes less than `head`, subtracting will underflow and result in the correct - // length. - self.tail.wrapping_sub(self.head) - } - - /// Returns the current capacity of the buffer. - #[inline] - pub fn capacity(&self) -> usize { - self.array.len() - } - - /// Copy the given elements and insert them into the back of the buffer. - /// - /// Returns the number of elements pushed. - pub fn push(&mut self, src: &[T]) -> usize { - // If the number of bytes to add would exceed the capacity, grow the internal array first. - let new_len = self.len() + src.len(); - if new_len > self.capacity() { - self.resize(new_len); - } - - let head_index = self.mask(self.head); - let tail_index = self.mask(self.tail); - - let slices = self.array.wrapping_range_mut(tail_index, head_index); - - let mut pushed = arrays::copy(src, slices.0); - pushed += arrays::copy(&src[pushed..], slices.1); - - self.tail = self.tail.wrapping_add(pushed); - pushed - } - - /// Pull bytes from the front of the buffer into the given location, up to the length of the destination buffer. - /// - /// Returns the number of elements pulled. - pub fn pull(&mut self, dest: &mut [T]) -> usize { - let count = self.copy_to(dest); - self.consume(count) - } - - /// Copy elements from the front of the buffer into the given slice. - /// - /// Returns the number of elements copied. If there are less elements in the buffer than the length of `dest`, then - /// only part of `dest` will be written to. 
- pub fn copy_to(&self, dest: &mut [T]) -> usize { - if self.is_empty() { - return 0; - } - - let slices = self.array - .wrapping_range(self.mask(self.head), self.mask(self.tail)); - - let mut copied = arrays::copy(slices.0, dest); - copied += arrays::copy(slices.1, &mut dest[copied..]); - - copied - } - - /// Consume up to `count` elements from the front of the buffer and discards them. - /// - /// Returns the number of elements consumed, which may be less than `count` if `count` was greater than the number - /// of elements in the buffer. - /// - /// This operation has a runtime cost of `O(1)`. - pub fn consume(&mut self, count: usize) -> usize { - // We can only consume as many elements as are in the buffer. - let count = count.min(self.len()); - self.head = self.head.wrapping_add(count); - count - } - - /// Remove all elements from the buffer. - pub fn clear(&mut self) { - self.head = 0; - self.tail = 0; - } - - fn resize(&mut self, size: usize) { - // Size must always be a power of 2. 
- let size = size.next_power_of_two(); - - let mut array = unsafe { arrays::allocate(size) }; - - self.tail = self.copy_to(&mut array); - self.head = 0; - self.array = array; - } - - #[inline] - fn mask(&self, index: usize) -> usize { - index & (self.capacity() - 1) - } -} - -impl Read for Buffer { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - Ok(self.pull(buf)) - } -} - -impl Write for Buffer { - fn write(&mut self, buf: &[u8]) -> io::Result { - Ok(self.push(buf)) - } - - fn flush(&mut self) -> io::Result<()> { - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::Buffer; - - #[test] - fn test_capacity() { - let buffer = Buffer::::with_capacity(16); - assert!(buffer.capacity() == 16); - } - - #[test] - fn test_push() { - let mut buffer = Buffer::new(); - - assert!(buffer.is_empty()); - - let bytes = b"hello world"; - buffer.push(bytes); - - assert!(!buffer.is_empty()); - assert!(buffer.len() == bytes.len()); - } - - #[test] - fn test_push_and_consume() { - let mut buffer = Buffer::with_capacity(12); - - buffer.push(b"hello world"); - - assert!(buffer.consume(6) == 6); - assert!(buffer.len() == 5); - - buffer.push(b" hello"); - - assert!(buffer.len() == 11); - } - - #[test] - fn test_pull_more_than_buffer() { - let mut buffer = Buffer::new(); - let bytes = b"hello world"; - buffer.push(bytes); - - let mut dst = [0; 1024]; - assert!(buffer.pull(&mut dst) == bytes.len()); - assert!(&dst[0..bytes.len()] == bytes); - assert!(buffer.is_empty()); - } - - #[test] - fn test_pull_less_than_buffer() { - let mut buffer = Buffer::new(); - let bytes = b"hello world"; - buffer.push(bytes); - - let mut dst = [0; 4]; - assert!(buffer.pull(&mut dst) == dst.len()); - assert!(&dst == &bytes[0..4]); - assert!(!buffer.is_empty()); - assert!(buffer.len() == bytes.len() - dst.len()); - } - - #[test] - fn test_force_resize() { - let mut buffer = Buffer::with_capacity(8); - - buffer.push(b"hello"); - assert!(buffer.capacity() == 8); - - buffer.push(b" world"); - 
assert!(buffer.capacity() > 8); - - let mut out = [0; 11]; - buffer.copy_to(&mut out); - assert!(&out == b"hello world"); - } -} diff --git a/src/buffers/atomic.rs b/src/buffers/atomic.rs new file mode 100644 index 0000000..d9dc05e --- /dev/null +++ b/src/buffers/atomic.rs @@ -0,0 +1,213 @@ +//! Atomic buffers useful for producer-consumer problems. +use arrays; +use arrays::CircularArray; +use buffers::{Buffer, ReadableBuffer, WritableBuffer}; +use std::sync::Arc; +use std::sync::atomic::*; + +/// Create a new atomic buffer with a given fixed capacity. +pub fn bounded(capacity: usize) -> (Reader, Writer) { + let inner = Arc::new(Inner::new(capacity)); + + ( + Reader { + inner: inner.clone(), + }, + Writer { + inner: inner, + }, + ) +} + +/// Reading half of an atomic buffer. +pub struct Reader { + inner: Arc>, +} + +impl Buffer for Reader { + #[inline] + fn len(&self) -> usize { + self.inner.len() + } + + #[inline] + fn capacity(&self) -> usize { + self.inner.capacity() + } + + fn clear(&mut self) { + let tail = self.inner.tail.load(Ordering::SeqCst); + self.inner.head.store(tail, Ordering::SeqCst); + } +} + +impl ReadableBuffer for Reader { + fn copy_to(&self, dest: &mut [T]) -> usize { + let head = self.inner.head.load(Ordering::SeqCst); + let tail = self.inner.tail.load(Ordering::SeqCst); + + let slices = self.inner.array.as_slices(head..tail); + arrays::copy_seq(&slices, dest) + } + + fn consume(&mut self, count: usize) -> usize { + // We can only consume as many elements as are in the buffer. + let count = count.min(self.len()); + self.inner.head.fetch_add(count, Ordering::SeqCst); + + count + } +} + +/// Writing half of an atomic buffer. 
+pub struct Writer { + inner: Arc>, +} + +impl Buffer for Writer { + #[inline] + fn len(&self) -> usize { + self.inner.len() + } + + #[inline] + fn capacity(&self) -> usize { + self.inner.capacity() + } + + fn clear(&mut self) { + let tail = self.inner.tail.load(Ordering::SeqCst); + self.inner.head.store(tail, Ordering::SeqCst); + } +} + +impl WritableBuffer for Writer { + fn push(&mut self, src: &[T]) -> usize { + let head = self.inner.head.load(Ordering::SeqCst); + let tail = self.inner.tail.load(Ordering::SeqCst); + + unsafe { + let array_mut = &self.inner.array as *const _ as *mut CircularArray; + let slices = (&mut *array_mut).as_slices_mut(tail..head); + + let mut pushed = arrays::copy(src, slices[0]); + pushed += arrays::copy(&src[pushed..], slices[1]); + + self.inner.tail.fetch_add(pushed, Ordering::SeqCst); + pushed + } + } +} + +/// Contains the shared data between the reader and writer. +struct Inner { + array: CircularArray, + head: AtomicUsize, + tail: AtomicUsize, +} + +impl Inner { + fn new(capacity: usize) -> Self { + Self { + array: unsafe { + CircularArray::uninitialized(capacity) + }, + head: AtomicUsize::new(0), + tail: AtomicUsize::new(0), + } + } +} + +impl Buffer for Inner { + fn len(&self) -> usize { + let head = self.head.load(Ordering::SeqCst); + let tail = self.tail.load(Ordering::SeqCst); + + // Even if `tail` overflows and becomes less than `head`, subtracting will underflow and result in the + // correct length. 
+ tail.wrapping_sub(head) + } + + #[inline] + fn capacity(&self) -> usize { + self.array.len() + } + + fn clear(&mut self) { + let tail = self.tail.load(Ordering::SeqCst); + self.head.store(tail, Ordering::SeqCst); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_capacity() { + let buffer = bounded::(16); + + assert_eq!(buffer.0.capacity(), 16); + assert_eq!(buffer.1.capacity(), 16); + } + + #[test] + fn test_push() { + let mut buffer = bounded::(16); + + assert!(buffer.0.is_empty()); + assert!(buffer.1.is_empty()); + + let bytes = b"hello world"; + assert_eq!(buffer.1.push(bytes), bytes.len()); + + assert!(!buffer.0.is_empty()); + assert!(!buffer.1.is_empty()); + + assert_eq!(buffer.0.len(), bytes.len()); + assert_eq!(buffer.1.len(), bytes.len()); + } + + #[test] + fn test_push_and_consume() { + let mut buffer = bounded::(12); + + assert_eq!(buffer.1.push(b"hello world"), 11); + + assert_eq!(buffer.0.consume(6), 6); + assert_eq!(buffer.0.len(), 5); + + assert_eq!(buffer.1.push(b" hello"), 6); + + assert_eq!(buffer.0.len(), 11); + + let mut dest = [0; 11]; + assert_eq!(buffer.0.copy_to(&mut dest), 11); + assert_eq!(&dest, b"world hello"); + } + + #[test] + fn test_pull_more_than_buffer() { + let mut buffer = bounded(32); + let bytes = b"hello world"; + buffer.1.push(bytes); + + let mut dst = [0; 1024]; + assert_eq!(buffer.0.pull(&mut dst), bytes.len()); + assert_eq!(&dst[0..bytes.len()], bytes); + assert!(buffer.0.is_empty()); + } + + #[test] + fn test_pull_less_than_buffer() { + let mut buffer = bounded(32); + let bytes = b"hello world"; + buffer.1.push(bytes); + + let mut dst = [0; 4]; + assert_eq!(buffer.0.pull(&mut dst), dst.len()); + assert_eq!(&dst, &bytes[0..4]); + assert!(!buffer.0.is_empty()); + assert_eq!(buffer.0.len(), bytes.len() - dst.len()); + } +} diff --git a/src/buffers/mod.rs b/src/buffers/mod.rs new file mode 100644 index 0000000..f8f07ac --- /dev/null +++ b/src/buffers/mod.rs @@ -0,0 +1,54 @@ +//! 
Ring buffers for inserting and removing primitive types in bulk. +//! +//! Buffers are designed for reading and writing bytes in memory, and are useful as networking buffers, audio streams, +//! or as in-memory byte pipes between threads. +pub mod atomic; +pub mod unbounded; + +/// Base trait that all buffers implement. +pub trait Buffer { + /// Returns `true` if the buffer is empty. + #[inline] + fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Returns the number of elements in the buffer. + fn len(&self) -> usize; + + /// Returns the current capacity of the buffer. + fn capacity(&self) -> usize; + + /// Clears all elements from the buffer and resets the length to zero. + fn clear(&mut self); +} + +/// A buffer that can be read from. +pub trait ReadableBuffer: Buffer { + /// Pull elements from the front of the buffer into the given location, up to the length of the destination buffer. + /// + /// Returns the number of elements pulled. + fn pull(&mut self, dest: &mut [T]) -> usize where T: Copy { + let count = self.copy_to(dest); + self.consume(count) + } + + /// Copy elements from the front of the buffer into the given slice. + /// + /// Returns the number of elements copied. If there are fewer elements in the buffer than the length of `dest`, then + /// only part of `dest` will be written to. + fn copy_to(&self, dest: &mut [T]) -> usize where T: Copy; + + /// Consume up to `count` elements from the front of the buffer and discard them. + /// + /// Returns the number of elements consumed, which may be less than `count` if `count` was greater than the number + /// of elements in the buffer. + fn consume(&mut self, count: usize) -> usize; +} + +pub trait WritableBuffer: Buffer { + /// Copy the given elements and insert them into the back of the buffer. + /// + /// Returns the number of elements pushed. 
+ fn push(&mut self, src: &[T]) -> usize; +} diff --git a/src/buffers/unbounded.rs b/src/buffers/unbounded.rs new file mode 100644 index 0000000..f6cfb4e --- /dev/null +++ b/src/buffers/unbounded.rs @@ -0,0 +1,193 @@ +use arrays::{self, CircularArray}; +use buffers::{Buffer, ReadableBuffer, WritableBuffer}; + +/// Growable ring buffer. +/// +/// Optimized for repeated appending of bytes to the end and removing bytes from the front of the buffer. +pub struct UnboundedBuffer { + /// Backing array where elements are stored. Size is always a power of two. + array: CircularArray, + + /// The "head" index into the backing array that marks the start of the buffer elements. + /// + /// This index may exceed the length of the backing array during the lifetime of the buffer, and is only ever + /// incremented. + head: usize, + + /// The "tail" index into the backing array that marks the end of the buffer elements. + /// + /// Same as `head`, this is incremented unbounded. + tail: usize, +} + +impl Default for UnboundedBuffer { + fn default() -> UnboundedBuffer { + UnboundedBuffer::new() + } +} + +impl UnboundedBuffer { + pub const DEFAULT_CAPACITY: usize = 4096; + + /// Create a new unbounded buffer with the default capacity. + pub fn new() -> Self { + Self::with_capacity(Self::DEFAULT_CAPACITY) + } + + /// Create a new unbounded buffer with a given minimum capacity pre-allocated. 
+ pub fn with_capacity(capacity: usize) -> Self { + let capacity = capacity.next_power_of_two(); + Self { + array: unsafe { CircularArray::uninitialized(capacity) }, + head: 0, + tail: 0, + } + } + + fn resize(&mut self, size: usize) { + let mut array = unsafe { CircularArray::uninitialized(size) }; + + self.tail = self.copy_to(array.as_mut()); + self.head = 0; + self.array = array; + } +} + +impl Buffer for UnboundedBuffer { + #[inline] + fn is_empty(&self) -> bool { + // The head and tail can only be equal to each other if: (a) the number of inserted elements over time is equal + // to the number of removed elements over time, and is thus empty; or (b) exactly `usize::max_value()` elements + // were inserted without being removed such that `tail` overflowed and wrapped around to equal `head`. This is + // improbable since the buffer would have to be at least the size of max pointer value. If the OS does let you + // allocate more memory than fits in a pointer, you have bigger problems. + self.head == self.tail + } + + #[inline] + fn len(&self) -> usize { + // Even if `tail` overflows and becomes less than `head`, subtracting will underflow and result in the correct + // length. + self.tail.wrapping_sub(self.head) + } + + #[inline] + fn capacity(&self) -> usize { + self.array.len() + } + + fn clear(&mut self) { + self.head = 0; + self.tail = 0; + } +} + +impl ReadableBuffer for UnboundedBuffer { + fn copy_to(&self, dest: &mut [T]) -> usize { + let slices = self.array.as_slices(self.head..self.tail); + arrays::copy_seq(&slices, dest) + } + + fn consume(&mut self, count: usize) -> usize { + // We can only consume as many elements as are in the buffer. + let count = count.min(self.len()); + self.head = self.head.wrapping_add(count); + count + } +} + +impl WritableBuffer for UnboundedBuffer { + fn push(&mut self, src: &[T]) -> usize { + // If the number of bytes to add would exceed the capacity, grow the internal array first. 
+ let new_len = self.len() + src.len(); + if new_len > self.capacity() { + self.resize(new_len); + } + + let slices = self.array.as_slices_mut(self.tail..self.head); + + let mut pushed = arrays::copy(src, slices[0]); + pushed += arrays::copy(&src[pushed..], slices[1]); + + self.tail = self.tail.wrapping_add(pushed); + pushed + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_capacity() { + let buffer = UnboundedBuffer::::with_capacity(16); + assert_eq!(buffer.capacity(), 16); + } + + #[test] + fn test_push() { + let mut buffer = UnboundedBuffer::new(); + + assert!(buffer.is_empty()); + + let bytes = b"hello world"; + buffer.push(bytes); + + assert!(!buffer.is_empty()); + assert_eq!(buffer.len(), bytes.len()); + } + + #[test] + fn test_push_and_consume() { + let mut buffer = UnboundedBuffer::with_capacity(12); + + buffer.push(b"hello world"); + + assert_eq!(buffer.consume(6), 6); + assert_eq!(buffer.len(), 5); + + buffer.push(b" hello"); + + assert_eq!(buffer.len(), 11); + } + + #[test] + fn test_pull_more_than_buffer() { + let mut buffer = UnboundedBuffer::new(); + let bytes = b"hello world"; + buffer.push(bytes); + + let mut dst = [0; 1024]; + assert_eq!(buffer.pull(&mut dst), bytes.len()); + assert_eq!(&dst[0..bytes.len()], bytes); + assert!(buffer.is_empty()); + } + + #[test] + fn test_pull_less_than_buffer() { + let mut buffer = UnboundedBuffer::new(); + let bytes = b"hello world"; + buffer.push(bytes); + + let mut dst = [0; 4]; + assert_eq!(buffer.pull(&mut dst), dst.len()); + assert_eq!(&dst, &bytes[0..4]); + assert!(!buffer.is_empty()); + assert_eq!(buffer.len(), bytes.len() - dst.len()); + } + + #[test] + fn test_force_resize() { + let mut buffer = UnboundedBuffer::with_capacity(8); + + buffer.push(b"hello"); + assert_eq!(buffer.capacity(), 8); + + buffer.push(b" world"); + assert!(buffer.capacity() > 8); + + let mut out = [0; 11]; + buffer.copy_to(&mut out); + assert_eq!(&out, b"hello world"); + } +} diff --git a/src/lib.rs 
b/src/lib.rs index a36c003..cb80465 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,13 @@ -//! Efficient ring buffer implementations for networking, audio, or queues. +//! Ringtail is a collection of buffers and queues, useful for networking, thread communication, and real-time +//! programming. +//! +//! Provided data structures are designed for efficiency first and foremost, so some common operations you might expect +//! of queues may be missing in order to allow certain optimizations. These are not general-purpose structures; several +//! versions of one structure may be provided with different trade-offs. +//! +//! ## Buffers +//! +//! In Ringtail, a _buffer_ is a queue optimized for reading and writing multiple elements in bulk, like an in-memory +//! version of an I/O stream. mod arrays; -pub mod buffer; - -/// A growable byte buffer implemented as a ring buffer. -pub type ByteBuffer = buffer::Buffer; +pub mod buffers;