diff --git a/benches/pipe.rs b/benches/pipe.rs index 89dbb3f..2dffb0e 100644 --- a/benches/pipe.rs +++ b/benches/pipe.rs @@ -1,37 +1,36 @@ #![feature(async_await)] -#[macro_use] -extern crate criterion; +use criterion::*; -use criterion::Criterion; -use futures::executor::ThreadPool; -use futures::prelude::*; -use std::io; +fn benchmark(c: &mut Criterion) { + c.bench_function("write 100 1K chunks", |b| { + use futures::executor::ThreadPool; + use futures::prelude::*; -fn criterion_benchmark(c: &mut Criterion) { - c.bench_function("pipe_read_write", |b| { let mut pool = ThreadPool::new().unwrap(); - let data = [1; 0x1000]; - - b.iter(|| { - let (mut reader, mut writer) = sluice::pipe::pipe(); - - let producer = async { - for _ in 0..0x10 { - writer.write_all(&data).await.unwrap(); - } - writer.close().await.unwrap(); - }; - - let consumer = async { - let mut sink = io::sink(); - reader.copy_into(&mut sink).await.unwrap(); - }; - - pool.run(future::join(producer, consumer)); - }) + let data = [1; 1024]; + + b.iter_batched( + sluice::pipe::pipe, + |(mut reader, mut writer)| { + let producer = async { + for _ in 0..100 { + writer.write_all(&data).await.unwrap(); + } + writer.close().await.unwrap(); + }; + + let consumer = async { + let mut sink = std::io::sink(); + reader.copy_into(&mut sink).await.unwrap(); + }; + + pool.run(future::join(producer, consumer)); + }, + BatchSize::SmallInput, + ) }); } -criterion_group!(benches, criterion_benchmark); +criterion_group!(benches, benchmark); criterion_main!(benches); diff --git a/src/pipe/chunked.rs b/src/pipe/chunked.rs index 6af9214..c0ed1ad 100644 --- a/src/pipe/chunked.rs +++ b/src/pipe/chunked.rs @@ -44,7 +44,7 @@ pub fn new(count: usize) -> (Reader, Writer) { // Fill up the buffer pool. for _ in 0..count { - buf_pool_tx.try_send(Vec::new()).expect("buffer pool overflow"); + buf_pool_tx.try_send(Cursor::new(Vec::new())).expect("buffer pool overflow"); } let reader = Reader { @@ -64,10 +64,10 @@ pub fn new(count: usize) -> (Reader, Writer) { /// The reading half of a chunked pipe. pub struct Reader { /// A channel of incoming chunks from the writer. - buf_pool_tx: mpsc::Sender>, + buf_pool_tx: mpsc::Sender>>, /// A channel of chunk buffers that have been consumed and can be reused. - buf_stream_rx: mpsc::Receiver>, + buf_stream_rx: mpsc::Receiver>>, /// A chunk currently being read from. chunk: Option>>, @@ -88,7 +88,7 @@ impl AsyncRead for Reader { Poll::Ready(None) => return Poll::Ready(Ok(0)), // Accept the new chunk. - Poll::Ready(Some(buf)) => Cursor::new(buf), + Poll::Ready(Some(buf)) => buf, } }; @@ -106,8 +106,8 @@ impl AsyncRead for Reader { // Otherwise, return it to the writer to be reused. else { - let mut chunk = chunk.into_inner(); - chunk.clear(); + chunk.set_position(0); + chunk.get_mut().clear(); match self.buf_pool_tx.try_send(chunk) { Ok(()) => {} @@ -131,10 +131,10 @@ impl AsyncRead for Reader { /// Writing half of a chunked pipe. pub struct Writer { /// A channel of chunks to send to the reader. - buf_pool_rx: mpsc::Receiver>, + buf_pool_rx: mpsc::Receiver>>, /// A channel of incoming buffers to write chunks to. - buf_stream_tx: mpsc::Sender>, + buf_stream_tx: mpsc::Sender>>, } impl AsyncWrite for Writer { @@ -150,7 +150,7 @@ impl AsyncWrite for Writer { // An available buffer has been found. Poll::Ready(Some(mut chunk)) => { // Write the buffer to the chunk. - chunk.extend_from_slice(buf); + chunk.get_mut().extend_from_slice(buf); // Send the chunk to the reader. match self.buf_stream_tx.try_send(chunk) {