diff --git a/Cargo.toml b/Cargo.toml index 94fca9a..a2437d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,3 +7,10 @@ license = "MIT" keywords = ["buffer", "fifo", "queue"] categories = ["data-structures", "concurrency"] repository = "https://github.com/sagebind/ringtail" + +[dev-dependencies] +criterion = "0.2" + +[[bench]] +name = "pipes" +harness = false diff --git a/benches/pipes.rs b/benches/pipes.rs new file mode 100644 index 0000000..9e708ac --- /dev/null +++ b/benches/pipes.rs @@ -0,0 +1,30 @@ +#[macro_use] +extern crate criterion; +extern crate ringtail; + +use criterion::Criterion; +use std::io::{self, Write}; +use std::thread; + +fn pipe_read_write_benchmark(c: &mut Criterion) { + c.bench_function("pipe_read_write", |b| { + let data = [1; 0x1000]; + + b.iter(move || { + let (mut r, mut w) = ringtail::io::pipe(); + + let guard = thread::spawn(move || { + for _ in 0..0x10 { + w.write_all(&data).unwrap(); + } + }); + + io::copy(&mut r, &mut io::sink()).unwrap(); + + guard.join().unwrap(); + }) + }); +} + +criterion_group!(benches, pipe_read_write_benchmark); +criterion_main!(benches); diff --git a/src/buffers/atomic.rs b/src/buffers/atomic.rs index 1f04908..e377671 100644 --- a/src/buffers/atomic.rs +++ b/src/buffers/atomic.rs @@ -1,7 +1,7 @@ //! Atomic buffers useful for producer-consumer problems. -use arrays; -use arrays::CircularArray; use buffers::{Buffer, ReadableBuffer, WritableBuffer}; +use internal::arrays; +use internal::arrays::CircularArray; use std::cell::UnsafeCell; use std::sync::Arc; use std::sync::atomic::*; @@ -68,6 +68,8 @@ impl ReadableBuffer for Reader { } } +unsafe impl Send for Reader {} + /// Writing half of an atomic buffer. pub struct Writer { inner: Arc>, @@ -111,6 +113,8 @@ impl WritableBuffer for Writer { } } +unsafe impl Send for Writer {} + /// Contains the shared data between the reader and writer. struct Inner { array: UnsafeCell>, @@ -182,6 +186,17 @@ mod tests { assert_eq!(buffer.1.len(), bytes.len()); } + #[test] + fn test_push_capacity() { + let (mut r, mut w) = bounded::(8192); + + let mut bytes = [1; 4096]; + assert_eq!(w.push(&bytes), bytes.len()); + assert_eq!(w.push(&bytes), bytes.len()); + assert_eq!(r.len(), 8192); + assert_eq!(r.pull(&mut bytes), bytes.len()); + } + #[test] fn test_push_more_than_buffer() { let mut buffer = bounded::(2); diff --git a/src/buffers/unbounded.rs b/src/buffers/unbounded.rs index f603a0e..7014322 100644 --- a/src/buffers/unbounded.rs +++ b/src/buffers/unbounded.rs @@ -1,5 +1,5 @@ -use arrays::{self, CircularArray}; use buffers::{Buffer, ReadableBuffer, WritableBuffer}; +use internal::arrays::{self, CircularArray}; /// Growable ring buffer. /// diff --git a/src/arrays.rs b/src/internal/arrays.rs similarity index 100% rename from src/arrays.rs rename to src/internal/arrays.rs diff --git a/src/internal/mod.rs b/src/internal/mod.rs new file mode 100644 index 0000000..b590c5a --- /dev/null +++ b/src/internal/mod.rs @@ -0,0 +1,4 @@ +//! Internal supporting structures. + +pub mod arrays; +pub mod sync; diff --git a/src/internal/sync.rs b/src/internal/sync.rs new file mode 100644 index 0000000..ec0ec57 --- /dev/null +++ b/src/internal/sync.rs @@ -0,0 +1,59 @@ +//! Synchronization primitives. + +use std::sync::atomic::*; +use std::sync::{Condvar, Mutex}; + +/// A synchronization primitive used to put threads to sleep until another thread wakes it up. +pub struct Signal { + lock: Mutex<()>, + condvar: Condvar, + notified: AtomicBool, + waiting: AtomicUsize, +} + +impl Default for Signal { + fn default() -> Self { + Self { + lock: Mutex::new(()), + condvar: Condvar::new(), + notified: AtomicBool::default(), + waiting: AtomicUsize::default(), + } + } +} + +impl Signal { + pub fn notify(&self) { + // Set the notify flag. + self.notified.store(true, Ordering::SeqCst); + + // If any threads are waiting, wake one up. + if self.waiting.load(Ordering::SeqCst) > 0 { + // Acquire the mutex to coordinate waking up a thread. + let _guard = self.lock.lock().unwrap(); + self.condvar.notify_one(); + } + } + + pub fn wait(&self) { + // Fast path. + if self.notified.swap(false, Ordering::SeqCst) { + return; + } + + // Indicate we have begun waiting. + self.waiting.fetch_add(1, Ordering::SeqCst); + + // Acquire the mutex to coordinate waiting. + let mut guard = self.lock.lock().unwrap(); + + // Ensure the notify flag was not just set, then wait loop to ignore spurious wake-ups. + while !self.notified.swap(false, Ordering::SeqCst) { + guard = self.condvar.wait(guard).unwrap(); + } + + // We're finished waiting. + drop(guard); + self.waiting.fetch_sub(1, Ordering::SeqCst); + } +} diff --git a/src/io.rs b/src/io.rs new file mode 100644 index 0000000..34bf52a --- /dev/null +++ b/src/io.rs @@ -0,0 +1,276 @@ +//! Provides types specifically designed for working with bytes and I/O. + +use buffers::{atomic, ReadableBuffer, WritableBuffer}; +use internal::sync::Signal; +use std::io; +use std::sync::atomic::*; +use std::sync::Arc; + +const DEFAULT_CAPACITY: usize = 8192; +const FLAG_NONBLOCKING: u8 = 0b0001; + +/// Open a new pipe and return a reader and writer pair. +/// +/// The pipe will be allocated with a default buffer size of 8 KiB. To customize the buffer size, use +/// [`PipeBuilder`](struct.PipeBuilder.html) instead. +pub fn pipe() -> (PipeReader, PipeWriter) { + PipeBuilder::default().build() +} + +/// Creates new pipes with configurable properties. +pub struct PipeBuilder { + flags: u8, + capacity: usize, +} + +impl Default for PipeBuilder { + fn default() -> Self { + Self { + flags: 0, + capacity: DEFAULT_CAPACITY, + } + } +} + +impl PipeBuilder { + /// Enable or disable non-blocking behavior. + /// + /// If non-blocking mode is enabled, any reads or writes that cannot be completed until later will return an + /// `WouldBlock` error instead of blocking the current thread. + pub fn nonblocking(&mut self, nonblocking: bool) -> &mut Self { + if nonblocking { + self.flags |= FLAG_NONBLOCKING; + } else { + self.flags &= !FLAG_NONBLOCKING; + } + self + } + + /// Set the maximum buffer capacity of the pipe in bytes. + pub fn capacity(&mut self, capacity: usize) -> &mut Self { + self.capacity = capacity; + self + } + + /// Create a new pipe using the current settings and return a reader and writer pair. + pub fn build(&self) -> (PipeReader, PipeWriter) { + let buffers = atomic::bounded(self.capacity); + let shared = Arc::::default(); + + ( + PipeReader { + flags: self.flags, + buffer: buffers.0, + shared: shared.clone(), + }, + PipeWriter { + flags: self.flags, + buffer: buffers.1, + shared: shared, + }, + ) + } +} + +/// The reading end of a pipe. +pub struct PipeReader { + flags: u8, + buffer: atomic::Reader, + shared: Arc, +} + +impl PipeReader { + /// Set the non-blocking mode for this end of the pipe. + /// + /// If non-blocking mode is enabled, attempting to read from an empty pipe will return an `WouldBlock` error instead + /// of blocking the current thread until data becomes available. + pub fn set_nonblocking(&mut self, nonblocking: bool) { + if nonblocking { + self.flags |= FLAG_NONBLOCKING; + } else { + self.flags &= !FLAG_NONBLOCKING; + } + } +} + +impl io::Read for PipeReader { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + if buf.is_empty() { + return Ok(0); + } + + loop { + let len = self.buffer.pull(buf); + + // Successful read. + if len > 0 { + self.shared.full_signal.notify(); + return Ok(len); + } + + // Pipe is empty, check if it is closed. + if self.shared.drop_flag.load(Ordering::SeqCst) { + return Ok(0); + } + + // Pipe is empty, but we don't want to block. + if self.flags & FLAG_NONBLOCKING != 0 { + return Err(io::ErrorKind::WouldBlock.into()); + } + + // Pipe is empty, and we do want to block. + self.shared.empty_signal.wait(); + } + } +} + +impl Drop for PipeReader { + fn drop(&mut self) { + self.shared.drop_flag.store(true, Ordering::SeqCst); + self.shared.full_signal.notify(); + } +} + +/// The writing end of a pipe. +pub struct PipeWriter { + flags: u8, + buffer: atomic::Writer, + shared: Arc, +} + +impl PipeWriter { + /// Set the non-blocking mode for this end of the pipe. + /// + /// If non-blocking mode is enabled, attempting to read from an empty pipe will return an `WouldBlock` error instead + /// of blocking the current thread until data becomes available. + pub fn set_nonblocking(&mut self, nonblocking: bool) { + if nonblocking { + self.flags |= FLAG_NONBLOCKING; + } else { + self.flags &= !FLAG_NONBLOCKING; + } + } +} + +impl io::Write for PipeWriter { + fn write(&mut self, buf: &[u8]) -> io::Result { + if buf.is_empty() { + return Ok(0); + } + + loop { + // Early check for closed pipe. + if self.shared.drop_flag.load(Ordering::SeqCst) { + return Err(io::ErrorKind::BrokenPipe.into()); + } + + let len = self.buffer.push(buf); + + // Successful write. + if len > 0 { + self.shared.empty_signal.notify(); + return Ok(len); + } + + // Pipe is full, but we don't want to block. + if self.flags & FLAG_NONBLOCKING != 0 { + return Err(io::ErrorKind::WouldBlock.into()); + } + + // Pipe is full, and we do want to block. + self.shared.full_signal.wait(); + } + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl Drop for PipeWriter { + fn drop(&mut self) { + self.shared.drop_flag.store(true, Ordering::SeqCst); + self.shared.empty_signal.notify(); + } +} + +/// Used to coordinate synchronization between both ends of a pipe. +#[derive(Default)] +struct PipeShared { + empty_signal: Signal, + full_signal: Signal, + drop_flag: AtomicBool, +} + +#[cfg(test)] +mod tests { + use std::io::{self, Read, Write}; + use std::thread; + use std::time::Duration; + use super::*; + + #[test] + fn read_write() { + let (mut reader, mut writer) = pipe(); + + assert_eq!(writer.write(b"hello world").unwrap(), 11); + + let mut buf = [0; 11]; + assert_eq!(reader.read(&mut buf).unwrap(), 11); + assert_eq!(&buf, b"hello world"); + } + + #[test] + fn read_empty_blocking() { + let (mut reader, mut writer) = PipeBuilder::default() + .capacity(16) + .build(); + + thread::spawn(move || { + thread::sleep(Duration::from_millis(100)); + let buf = [1; 1]; + writer.write(&buf).unwrap(); + }); + + let mut buf = [0; 1]; + assert_eq!(reader.read(&mut buf).unwrap(), 1); + assert_eq!(buf[0], 1); + } + + #[test] + fn read_nonblocking() { + let (mut reader, _writer) = pipe(); + + let mut buf = [0; 4]; + reader.set_nonblocking(true); + assert_eq!(reader.read(&mut buf).err().unwrap().kind(), io::ErrorKind::WouldBlock); + } + + #[test] + fn write_nonblocking() { + let (_reader, mut writer) = PipeBuilder::default() + .capacity(16) + .build(); + + let buf = [0; 16]; + assert_eq!(writer.write(&buf).unwrap(), buf.len()); + + writer.set_nonblocking(true); + assert_eq!(writer.write(&buf).err().unwrap().kind(), io::ErrorKind::WouldBlock); + } + + #[test] + fn read_from_closed_pipe_returns_zero() { + let (mut reader, _) = pipe(); + + let mut buf = [0; 16]; + assert_eq!(reader.read(&mut buf).unwrap(), 0); + } + + #[test] + fn write_to_closed_pipe_returns_broken_pipe() { + let (_, mut writer) = pipe(); + + assert_eq!(writer.write(b"hi").err().unwrap().kind(), io::ErrorKind::BrokenPipe); + } +} diff --git a/src/lib.rs b/src/lib.rs index cb80465..f3889ef 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,5 +9,8 @@ //! //! In Ringtail, a _buffer_ is a queue optimized for reading and writing multiple elements in bulk, like an in-memory //! version of an I/O stream. -mod arrays; + pub mod buffers; +pub mod io; + +mod internal;