Skip to content

Commit

Permalink
Add new I/O module with pipes (#2)
Browse files Browse the repository at this point in the history
Add a new io module for data structures specifically optimized for working with I/O and bytes.

Include "pipes" in this new module, which is simply a wrapper around an atomic byte buffer with blocking features.

Also move any internal types into an internal module.
  • Loading branch information
sagebind authored Aug 10, 2018
1 parent abebdf6 commit 3a5d58a
Show file tree
Hide file tree
Showing 9 changed files with 398 additions and 4 deletions.
7 changes: 7 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,10 @@ license = "MIT"
keywords = ["buffer", "fifo", "queue"]
categories = ["data-structures", "concurrency"]
repository = "https://github.com/sagebind/ringtail"

[dev-dependencies]
criterion = "0.2"

[[bench]]
name = "pipes"
harness = false
30 changes: 30 additions & 0 deletions benches/pipes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#[macro_use]
extern crate criterion;
extern crate ringtail;

use criterion::Criterion;
use std::io::{self, Write};
use std::thread;

fn pipe_read_write_benchmark(c: &mut Criterion) {
c.bench_function("pipe_read_write", |b| {
let data = [1; 0x1000];

b.iter(move || {
let (mut r, mut w) = ringtail::io::pipe();

let guard = thread::spawn(move || {
for _ in 0..0x10 {
w.write_all(&data).unwrap();
}
});

io::copy(&mut r, &mut io::sink()).unwrap();

guard.join().unwrap();
})
});
}

criterion_group!(benches, pipe_read_write_benchmark);
criterion_main!(benches);
19 changes: 17 additions & 2 deletions src/buffers/atomic.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Atomic buffers useful for producer-consumer problems.
use arrays;
use arrays::CircularArray;
use buffers::{Buffer, ReadableBuffer, WritableBuffer};
use internal::arrays;
use internal::arrays::CircularArray;
use std::cell::UnsafeCell;
use std::sync::Arc;
use std::sync::atomic::*;
Expand Down Expand Up @@ -68,6 +68,8 @@ impl<T: Copy> ReadableBuffer<T> for Reader<T> {
}
}

unsafe impl<T: Copy> Send for Reader<T> {}

/// Writing half of an atomic buffer.
pub struct Writer<T> {
inner: Arc<Inner<T>>,
Expand Down Expand Up @@ -111,6 +113,8 @@ impl<T: Copy> WritableBuffer<T> for Writer<T> {
}
}

unsafe impl<T: Copy> Send for Writer<T> {}

/// Contains the shared data between the reader and writer.
struct Inner<T> {
array: UnsafeCell<CircularArray<T>>,
Expand Down Expand Up @@ -182,6 +186,17 @@ mod tests {
assert_eq!(buffer.1.len(), bytes.len());
}

#[test]
fn test_push_capacity() {
let (mut r, mut w) = bounded::<u8>(8192);

let mut bytes = [1; 4096];
assert_eq!(w.push(&bytes), bytes.len());
assert_eq!(w.push(&bytes), bytes.len());
assert_eq!(r.len(), 8192);
assert_eq!(r.pull(&mut bytes), bytes.len());
}

#[test]
fn test_push_more_than_buffer() {
let mut buffer = bounded::<u8>(2);
Expand Down
2 changes: 1 addition & 1 deletion src/buffers/unbounded.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use arrays::{self, CircularArray};
use buffers::{Buffer, ReadableBuffer, WritableBuffer};
use internal::arrays::{self, CircularArray};

/// Growable ring buffer.
///
Expand Down
File renamed without changes.
4 changes: 4 additions & 0 deletions src/internal/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
//! Internal supporting structures.
pub mod arrays;
pub mod sync;
59 changes: 59 additions & 0 deletions src/internal/sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
//! Synchronization primitives.
use std::sync::atomic::*;
use std::sync::{Condvar, Mutex};

/// A synchronization primitive used to put threads to sleep until another thread wakes it up.
pub struct Signal {
lock: Mutex<()>,
condvar: Condvar,
notified: AtomicBool,
waiting: AtomicUsize,
}

impl Default for Signal {
fn default() -> Self {
Self {
lock: Mutex::new(()),
condvar: Condvar::new(),
notified: AtomicBool::default(),
waiting: AtomicUsize::default(),
}
}
}

impl Signal {
pub fn notify(&self) {
// Set the notify flag.
self.notified.store(true, Ordering::SeqCst);

// If any threads are waiting, wake one up.
if self.waiting.load(Ordering::SeqCst) > 0 {
// Acquire the mutex to coordinate waking up a thread.
let _guard = self.lock.lock().unwrap();
self.condvar.notify_one();
}
}

pub fn wait(&self) {
// Fast path.
if self.notified.swap(false, Ordering::SeqCst) {
return;
}

// Indicate we have begun waiting.
self.waiting.fetch_add(1, Ordering::SeqCst);

// Acquire the mutex to coordinate waiting.
let mut guard = self.lock.lock().unwrap();

// Ensure the notify flag was not just set, then wait loop to ignore spurious wake-ups.
while !self.notified.swap(false, Ordering::SeqCst) {
guard = self.condvar.wait(guard).unwrap();
}

// We're finished waiting.
drop(guard);
self.waiting.fetch_sub(1, Ordering::SeqCst);
}
}
Loading

0 comments on commit 3a5d58a

Please sign in to comment.