Skip to content

Commit

Permalink
Fix issue #1
Browse files Browse the repository at this point in the history
  • Loading branch information
sagebind committed May 19, 2018
1 parent faee6ea commit a8824c7
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 59 deletions.
93 changes: 48 additions & 45 deletions src/arrays.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ pub fn copy<T: Copy>(src: &[T], dest: &mut [T]) -> usize {
len
}

/// Copy as many elements as possible from a slice of slices to another.
/// Copy as many elements as possible from a slice of slices to a slice.
///
/// Returns the number of elements copied.
pub fn copy_seq<T: Copy>(seq: &[&[T]], dest: &mut [T]) -> usize {
pub fn copy_from_seq<T: Copy>(src_seq: &[&[T]], dest: &mut [T]) -> usize {
let mut copied = 0;

for slice in seq {
for src in src_seq {
if copied < dest.len() {
copied += copy(slice, &mut dest[copied..]);
copied += copy(src, &mut dest[copied..]);
} else {
break;
}
Expand All @@ -37,33 +37,21 @@ pub fn copy_seq<T: Copy>(seq: &[&[T]], dest: &mut [T]) -> usize {
copied
}

/// Extension trait for slices for working with wrapping ranges and indicies.
pub trait WrappingSlice<T> {
/// Gets a pair of slices in the given range, wrapping around length.
fn wrapping_range(&self, from: usize, to: usize) -> (&[T], &[T]);

/// Gets a pair of mutable slices in the given range, wrapping around length.
fn wrapping_range_mut(&mut self, from: usize, to: usize) -> (&mut [T], &mut [T]);
}
/// Copy as many elements as possible from a slice to a slice of slices.
///
/// Returns the number of elements copied.
pub fn copy_to_seq<T: Copy>(src: &[T], dest_seq: &mut [&mut [T]]) -> usize {
let mut copied = 0;

impl<T> WrappingSlice<T> for [T] {
fn wrapping_range(&self, from: usize, to: usize) -> (&[T], &[T]) {
if from < to {
(&self[from..to], &[])
for dest in dest_seq {
if copied < src.len() {
copied += copy(&src[copied..], *dest);
} else {
(&self[from..], &self[..to])
break;
}
}

fn wrapping_range_mut(&mut self, from: usize, to: usize) -> (&mut [T], &mut [T]) {
if from < to {
(&mut self[from..to], &mut [])
} else {
let (mid, right) = self.split_at_mut(from);
let left = mid.split_at_mut(to).0;
(right, left)
}
}
copied
}

/// A heap-allocated circular array, useful for implementing ring buffers.
Expand Down Expand Up @@ -118,21 +106,6 @@ impl<T> CircularArray<T> {
}
}

// /// Copies elements from this array into
// pub fn copy_to_slice(&self, dest: &mut [T]) -> usize {
// if self.is_empty() {
// return 0;
// }

// let slices = self.array
// .wrapping_range(self.mask(self.head), self.mask(self.tail));

// let mut copied = arrays::copy(slices.0, dest);
// copied += arrays::copy(slices.1, &mut dest[copied..]);

// copied
// }

/// Get the internal index the given virtual index is mapped to.
#[inline]
fn internal_index(&self, virtual_index: usize) -> usize {
Expand Down Expand Up @@ -172,20 +145,50 @@ mod tests {
use super::*;

#[test]
fn copy_seq_with_less_elements() {
fn copy_from_seq_with_less_elements() {
let chunks: [&[i32]; 3] = [&[], &[1, 2], &[3]];
let mut dest = [0; 6];

assert_eq!(copy_seq(&chunks, &mut dest), 3);
assert_eq!(copy_from_seq(&chunks, &mut dest), 3);
assert_eq!(&dest, &[1, 2, 3, 0, 0, 0]);
}

#[test]
fn copy_seq_with_more_elements() {
fn copy_from_seq_with_more_elements() {
let chunks: [&[i32]; 5] = [&[], &[1, 2], &[], &[3], &[4, 5, 6]];
let mut dest = [0; 4];

assert_eq!(copy_seq(&chunks, &mut dest), 4);
assert_eq!(copy_from_seq(&chunks, &mut dest), 4);
assert_eq!(&dest, &[1, 2, 3, 4]);
}

#[test]
fn copy_to_seq_with_less_elements() {
let src = [1, 2, 3];
let mut dest_1 = [0; 1];
let mut dest_2 = [0; 4];

{
let mut dest: [&mut [u8]; 2] = [&mut dest_1, &mut dest_2];
assert_eq!(copy_to_seq(&src, &mut dest), 3);
}

assert_eq!(&dest_1, &[1]);
assert_eq!(&dest_2, &[2, 3, 0, 0]);
}

#[test]
fn copy_to_seq_with_more_elements() {
let src = [1, 2, 3, 4];
let mut dest_1 = [0; 1];
let mut dest_2 = [0; 2];

{
let mut dest: [&mut [u8]; 2] = [&mut dest_1, &mut dest_2];
assert_eq!(copy_to_seq(&src, &mut dest), 3);
}

assert_eq!(&dest_1, &[1]);
assert_eq!(&dest_2, &[2, 3]);
}
}
45 changes: 36 additions & 9 deletions src/buffers/atomic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use arrays;
use arrays::CircularArray;
use buffers::{Buffer, ReadableBuffer, WritableBuffer};
use std::cell::UnsafeCell;
use std::sync::Arc;
use std::sync::atomic::*;

Expand Down Expand Up @@ -46,8 +47,16 @@ impl<T: Copy> ReadableBuffer<T> for Reader<T> {
let head = self.inner.head.load(Ordering::SeqCst);
let tail = self.inner.tail.load(Ordering::SeqCst);

let slices = self.inner.array.as_slices(head..tail);
arrays::copy_seq(&slices, dest)
if head == tail {
return 0;
}

unsafe {
let array = &*self.inner.array.get();

let slices = array.as_slices(head..tail);
arrays::copy_from_seq(&slices, dest)
}
}

fn consume(&mut self, count: usize) -> usize {
Expand Down Expand Up @@ -86,12 +95,15 @@ impl<T: Copy> WritableBuffer<T> for Writer<T> {
let head = self.inner.head.load(Ordering::SeqCst);
let tail = self.inner.tail.load(Ordering::SeqCst);

if tail.wrapping_sub(head) == self.capacity() {
return 0;
}

unsafe {
let array_mut = &self.inner.array as *const _ as *mut CircularArray<T>;
let slices = (&mut *array_mut).as_slices_mut(tail..head);
let array = &mut *self.inner.array.get();

let mut pushed = arrays::copy(src, slices[0]);
pushed += arrays::copy(&src[pushed..], slices[1]);
let mut slices = array.as_slices_mut(tail..head);
let pushed = arrays::copy_to_seq(src, &mut slices);

self.inner.tail.fetch_add(pushed, Ordering::SeqCst);
pushed
Expand All @@ -101,7 +113,7 @@ impl<T: Copy> WritableBuffer<T> for Writer<T> {

/// Contains the shared data between the reader and writer.
struct Inner<T> {
array: CircularArray<T>,
array: UnsafeCell<CircularArray<T>>,
head: AtomicUsize,
tail: AtomicUsize,
}
Expand All @@ -110,7 +122,7 @@ impl<T> Inner<T> {
fn new(capacity: usize) -> Self {
Self {
array: unsafe {
CircularArray::uninitialized(capacity)
UnsafeCell::new(CircularArray::uninitialized(capacity))
},
head: AtomicUsize::new(0),
tail: AtomicUsize::new(0),
Expand All @@ -130,7 +142,9 @@ impl<T> Buffer<T> for Inner<T> {

#[inline]
fn capacity(&self) -> usize {
self.array.len()
unsafe {
(&*self.array.get()).len()
}
}

fn clear(&mut self) {
Expand Down Expand Up @@ -168,6 +182,19 @@ mod tests {
assert_eq!(buffer.1.len(), bytes.len());
}

#[test]
fn test_push_more_than_buffer() {
let mut buffer = bounded::<u8>(2);
assert_eq!(buffer.0.capacity(), 2);

assert_eq!(buffer.1.push(&[100]), 1);
assert_eq!(buffer.1.push(&[200]), 1);
assert_eq!(buffer.1.push(&[300]), 0);
assert_eq!(buffer.1.push(&[400]), 0);

assert_eq!(buffer.0.len(), 2);
}

#[test]
fn test_push_and_consume() {
let mut buffer = bounded::<u8>(12);
Expand Down
8 changes: 3 additions & 5 deletions src/buffers/unbounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl<T> Buffer<T> for UnboundedBuffer<T> {
impl<T: Copy> ReadableBuffer<T> for UnboundedBuffer<T> {
fn copy_to(&self, dest: &mut [T]) -> usize {
let slices = self.array.as_slices(self.head..self.tail);
arrays::copy_seq(&slices, dest)
arrays::copy_from_seq(&slices, dest)
}

fn consume(&mut self, count: usize) -> usize {
Expand All @@ -104,10 +104,8 @@ impl<T: Copy> WritableBuffer<T> for UnboundedBuffer<T> {
self.resize(new_len);
}

let slices = self.array.as_slices_mut(self.tail..self.head);

let mut pushed = arrays::copy(src, slices[0]);
pushed += arrays::copy(&src[pushed..], slices[1]);
let mut slices = self.array.as_slices_mut(self.tail..self.head);
let pushed = arrays::copy_to_seq(src, &mut slices);

self.tail = self.tail.wrapping_add(pushed);
pushed
Expand Down

0 comments on commit a8824c7

Please sign in to comment.