From a8824c79a9adb3e4c611eb7148989059ca0c50e9 Mon Sep 17 00:00:00 2001
From: "Stephen M. Coakley"
Date: Fri, 18 May 2018 19:34:03 -0500
Subject: [PATCH] Fix issue #1

---
 src/arrays.rs            | 93 +++++++++++++++++++++-------------------
 src/buffers/atomic.rs    | 45 +++++++++++++++----
 src/buffers/unbounded.rs |  8 ++--
 3 files changed, 87 insertions(+), 59 deletions(-)

diff --git a/src/arrays.rs b/src/arrays.rs
index 27b3d33..91775cc 100644
--- a/src/arrays.rs
+++ b/src/arrays.rs
@@ -20,15 +20,15 @@ pub fn copy<T: Copy>(src: &[T], dest: &mut [T]) -> usize {
     len
 }
 
-/// Copy as many elements as possible from a slice of slices to another.
+/// Copy as many elements as possible from a slice of slices to a slice.
 ///
 /// Returns the number of elements copied.
-pub fn copy_seq<T: Copy>(seq: &[&[T]], dest: &mut [T]) -> usize {
+pub fn copy_from_seq<T: Copy>(src_seq: &[&[T]], dest: &mut [T]) -> usize {
     let mut copied = 0;
 
-    for slice in seq {
+    for src in src_seq {
         if copied < dest.len() {
-            copied += copy(slice, &mut dest[copied..]);
+            copied += copy(src, &mut dest[copied..]);
         } else {
             break;
         }
@@ -37,33 +37,21 @@ pub fn copy_seq<T: Copy>(seq: &[&[T]], dest: &mut [T]) -> usize {
     copied
 }
 
-/// Extension trait for slices for working with wrapping ranges and indicies.
-pub trait WrappingSlice<T> {
-    /// Gets a pair of slices in the given range, wrapping around length.
-    fn wrapping_range(&self, from: usize, to: usize) -> (&[T], &[T]);
-
-    /// Gets a pair of mutable slices in the given range, wrapping around length.
-    fn wrapping_range_mut(&mut self, from: usize, to: usize) -> (&mut [T], &mut [T]);
-}
+/// Copy as many elements as possible from a slice to a slice of slices.
+///
+/// Returns the number of elements copied.
+pub fn copy_to_seq<T: Copy>(src: &[T], dest_seq: &mut [&mut [T]]) -> usize {
+    let mut copied = 0;
 
-impl<T> WrappingSlice<T> for [T] {
-    fn wrapping_range(&self, from: usize, to: usize) -> (&[T], &[T]) {
-        if from < to {
-            (&self[from..to], &[])
+    for dest in dest_seq {
+        if copied < src.len() {
+            copied += copy(&src[copied..], *dest);
         } else {
-            (&self[from..], &self[..to])
+            break;
         }
     }
 
-    fn wrapping_range_mut(&mut self, from: usize, to: usize) -> (&mut [T], &mut [T]) {
-        if from < to {
-            (&mut self[from..to], &mut [])
-        } else {
-            let (mid, right) = self.split_at_mut(from);
-            let left = mid.split_at_mut(to).0;
-            (right, left)
-        }
-    }
+    copied
 }
 
 /// A heap-allocated circular array, useful for implementing ring buffers.
@@ -118,21 +106,6 @@ impl<T> CircularArray<T> {
         }
     }
 
-    // /// Copies elements from this array into
-    // pub fn copy_to_slice(&self, dest: &mut [T]) -> usize {
-    //     if self.is_empty() {
-    //         return 0;
-    //     }
-
-    //     let slices = self.array
-    //         .wrapping_range(self.mask(self.head), self.mask(self.tail));
-
-    //     let mut copied = arrays::copy(slices.0, dest);
-    //     copied += arrays::copy(slices.1, &mut dest[copied..]);
-
-    //     copied
-    // }
-
     /// Get the internal index the given virtual index is mapped to.
     #[inline]
     fn internal_index(&self, virtual_index: usize) -> usize {
@@ -172,20 +145,50 @@ mod tests {
     use super::*;
 
     #[test]
-    fn copy_seq_with_less_elements() {
+    fn copy_from_seq_with_less_elements() {
         let chunks: [&[i32]; 3] = [&[], &[1, 2], &[3]];
         let mut dest = [0; 6];
 
-        assert_eq!(copy_seq(&chunks, &mut dest), 3);
+        assert_eq!(copy_from_seq(&chunks, &mut dest), 3);
         assert_eq!(&dest, &[1, 2, 3, 0, 0, 0]);
     }
 
     #[test]
-    fn copy_seq_with_more_elements() {
+    fn copy_from_seq_with_more_elements() {
         let chunks: [&[i32]; 5] = [&[], &[1, 2], &[], &[3], &[4, 5, 6]];
         let mut dest = [0; 4];
 
-        assert_eq!(copy_seq(&chunks, &mut dest), 4);
+        assert_eq!(copy_from_seq(&chunks, &mut dest), 4);
         assert_eq!(&dest, &[1, 2, 3, 4]);
     }
+
+    #[test]
+    fn copy_to_seq_with_less_elements() {
+        let src = [1, 2, 3];
+        let mut dest_1 = [0; 1];
+        let mut dest_2 = [0; 4];
+
+        {
+            let mut dest: [&mut [u8]; 2] = [&mut dest_1, &mut dest_2];
+            assert_eq!(copy_to_seq(&src, &mut dest), 3);
+        }
+
+        assert_eq!(&dest_1, &[1]);
+        assert_eq!(&dest_2, &[2, 3, 0, 0]);
+    }
+
+    #[test]
+    fn copy_to_seq_with_more_elements() {
+        let src = [1, 2, 3, 4];
+        let mut dest_1 = [0; 1];
+        let mut dest_2 = [0; 2];
+
+        {
+            let mut dest: [&mut [u8]; 2] = [&mut dest_1, &mut dest_2];
+            assert_eq!(copy_to_seq(&src, &mut dest), 3);
+        }
+
+        assert_eq!(&dest_1, &[1]);
+        assert_eq!(&dest_2, &[2, 3]);
+    }
 }
diff --git a/src/buffers/atomic.rs b/src/buffers/atomic.rs
index d9dc05e..4275552 100644
--- a/src/buffers/atomic.rs
+++ b/src/buffers/atomic.rs
@@ -2,6 +2,7 @@
 use arrays;
 use arrays::CircularArray;
 use buffers::{Buffer, ReadableBuffer, WritableBuffer};
+use std::cell::UnsafeCell;
 use std::sync::Arc;
 use std::sync::atomic::*;
 
@@ -46,8 +47,16 @@ impl<T: Copy> ReadableBuffer<T> for Reader<T> {
         let head = self.inner.head.load(Ordering::SeqCst);
         let tail = self.inner.tail.load(Ordering::SeqCst);
 
-        let slices = self.inner.array.as_slices(head..tail);
-        arrays::copy_seq(&slices, dest)
+        if head == tail {
+            return 0;
+        }
+
+        unsafe {
+            let array = &*self.inner.array.get();
+
+            let slices = array.as_slices(head..tail);
+            arrays::copy_from_seq(&slices, dest)
+        }
     }
 
     fn consume(&mut self, count: usize) -> usize {
@@ -86,12 +95,15 @@ impl<T: Copy> WritableBuffer<T> for Writer<T> {
         let head = self.inner.head.load(Ordering::SeqCst);
         let tail = self.inner.tail.load(Ordering::SeqCst);
 
+        if tail.wrapping_sub(head) == self.capacity() {
+            return 0;
+        }
+
         unsafe {
-            let array_mut = &self.inner.array as *const _ as *mut CircularArray<T>;
-            let slices = (&mut *array_mut).as_slices_mut(tail..head);
+            let array = &mut *self.inner.array.get();
 
-            let mut pushed = arrays::copy(src, slices[0]);
-            pushed += arrays::copy(&src[pushed..], slices[1]);
+            let mut slices = array.as_slices_mut(tail..head);
+            let pushed = arrays::copy_to_seq(src, &mut slices);
 
             self.inner.tail.fetch_add(pushed, Ordering::SeqCst);
             pushed
@@ -101,7 +113,7 @@ impl<T: Copy> WritableBuffer<T> for Writer<T> {
 }
 /// Contains the shared data between the reader and writer.
 struct Inner<T> {
-    array: CircularArray<T>,
+    array: UnsafeCell<CircularArray<T>>,
     head: AtomicUsize,
     tail: AtomicUsize,
 }
@@ -110,7 +122,7 @@ impl<T> Inner<T> {
     fn new(capacity: usize) -> Self {
         Self {
             array: unsafe {
-                CircularArray::uninitialized(capacity)
+                UnsafeCell::new(CircularArray::uninitialized(capacity))
             },
             head: AtomicUsize::new(0),
             tail: AtomicUsize::new(0),
@@ -130,7 +142,9 @@ impl<T> Buffer<T> for Inner<T> {
 
     #[inline]
     fn capacity(&self) -> usize {
-        self.array.len()
+        unsafe {
+            (&*self.array.get()).len()
+        }
     }
 
     fn clear(&mut self) {
@@ -168,6 +182,19 @@ mod tests {
         assert_eq!(buffer.1.len(), bytes.len());
     }
 
+    #[test]
+    fn test_push_more_than_buffer() {
+        let mut buffer = bounded::<u16>(2);
+        assert_eq!(buffer.0.capacity(), 2);
+
+        assert_eq!(buffer.1.push(&[100]), 1);
+        assert_eq!(buffer.1.push(&[200]), 1);
+        assert_eq!(buffer.1.push(&[300]), 0);
+        assert_eq!(buffer.1.push(&[400]), 0);
+
+        assert_eq!(buffer.0.len(), 2);
+    }
+
     #[test]
     fn test_push_and_consume() {
         let mut buffer = bounded::<u8>(12);
diff --git a/src/buffers/unbounded.rs b/src/buffers/unbounded.rs
index f6cfb4e..5d4503c 100644
--- a/src/buffers/unbounded.rs
+++ b/src/buffers/unbounded.rs
@@ -85,7 +85,7 @@ impl<T> Buffer<T> for UnboundedBuffer<T> {
 impl<T: Copy> ReadableBuffer<T> for UnboundedBuffer<T> {
     fn copy_to(&self, dest: &mut [T]) -> usize {
         let slices = self.array.as_slices(self.head..self.tail);
-        arrays::copy_seq(&slices, dest)
+        arrays::copy_from_seq(&slices, dest)
     }
 
     fn consume(&mut self, count: usize) -> usize {
@@ -104,10 +104,8 @@ impl<T: Copy> WritableBuffer<T> for UnboundedBuffer<T> {
             self.resize(new_len);
        }
 
-        let slices = self.array.as_slices_mut(self.tail..self.head);
-
-        let mut pushed = arrays::copy(src, slices[0]);
-        pushed += arrays::copy(&src[pushed..], slices[1]);
+        let mut slices = self.array.as_slices_mut(self.tail..self.head);
+        let pushed = arrays::copy_to_seq(src, &mut slices);
 
         self.tail = self.tail.wrapping_add(pushed);
         pushed
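
Note (illustrative, not part of the patch): a minimal usage sketch of the renamed
copy_from_seq and the new copy_to_seq helpers from src/arrays.rs. The two sequence
helpers below restate the patch; the body of the private copy helper is an
assumption (truncate to the shorter slice, then copy_from_slice), and main is
hypothetical example code, not a test from the crate.

    /// Copy as many elements as possible from `src` to `dest`.
    fn copy<T: Copy>(src: &[T], dest: &mut [T]) -> usize {
        let len = src.len().min(dest.len());
        dest[..len].copy_from_slice(&src[..len]);
        len
    }

    /// Gather: copy from a sequence of source slices into one destination slice.
    fn copy_from_seq<T: Copy>(src_seq: &[&[T]], dest: &mut [T]) -> usize {
        let mut copied = 0;
        for src in src_seq {
            if copied < dest.len() {
                copied += copy(src, &mut dest[copied..]);
            } else {
                break;
            }
        }
        copied
    }

    /// Scatter: copy from one source slice into a sequence of destination slices.
    fn copy_to_seq<T: Copy>(src: &[T], dest_seq: &mut [&mut [T]]) -> usize {
        let mut copied = 0;
        for dest in dest_seq {
            if copied < src.len() {
                copied += copy(&src[copied..], *dest);
            } else {
                break;
            }
        }
        copied
    }

    fn main() {
        // Gather the two wrapped halves of a ring buffer into one contiguous slice.
        let halves: [&[u8]; 2] = [&[1, 2], &[3]];
        let mut out = [0u8; 6];
        assert_eq!(copy_from_seq(&halves, &mut out), 3);
        assert_eq!(out, [1, 2, 3, 0, 0, 0]);

        // Scatter one slice into two wrapped halves; input that does not fit is dropped.
        let src = [1u8, 2, 3, 4];
        let (mut a, mut b) = ([0u8; 1], [0u8; 2]);
        let mut halves_mut: [&mut [u8]; 2] = [&mut a, &mut b];
        assert_eq!(copy_to_seq(&src, &mut halves_mut), 3);
        assert_eq!(a, [1]);
        assert_eq!(b, [2, 3]);
    }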
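
Note (illustrative, not part of the patch): the atomic buffer change does two
things: it rejects writes once the buffer is full instead of overwriting unread
data, and it reaches the shared storage through UnsafeCell::get() rather than
casting a shared reference to a mutable pointer. The sketch below shows that
interior-mutability pattern with a hypothetical Shared type standing in for the
crate's Inner; it is a simplified append-only buffer, not the actual ring buffer.

    use std::cell::UnsafeCell;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    // Hypothetical stand-in for `Inner`: the storage lives in an UnsafeCell so a
    // writer can mutate it through a shared reference, while the length stays atomic.
    struct Shared {
        storage: UnsafeCell<[u8; 2]>,
        len: AtomicUsize,
    }

    // SAFETY: assumes a single-writer design (mirroring the crate's one Reader /
    // one Writer pair), so only one thread ever takes the &mut below at a time.
    unsafe impl Sync for Shared {}

    struct Writer(Arc<Shared>);

    impl Writer {
        fn push(&self, value: u8) -> bool {
            let shared = &*self.0;
            let len = shared.len.load(Ordering::SeqCst);
            // UnsafeCell::get() yields *mut [u8; 2]; dereferencing it inside `unsafe`
            // is the defined way to mutate behind a shared reference.
            let storage = unsafe { &mut *shared.storage.get() };
            // Like the patched `push`, refuse to write once the buffer is full.
            if len == storage.len() {
                return false;
            }
            storage[len] = value;
            shared.len.store(len + 1, Ordering::SeqCst);
            true
        }
    }

    fn main() {
        let shared = Arc::new(Shared {
            storage: UnsafeCell::new([0u8; 2]),
            len: AtomicUsize::new(0),
        });
        let writer = Writer(Arc::clone(&shared));

        assert!(writer.push(10));
        assert!(writer.push(20));
        assert!(!writer.push(30)); // full: rejected rather than overwritten
    }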