Skip to content

Commit

Permalink
micro-optimization: reuse same cursor over and over
Browse files Browse the repository at this point in the history
  • Loading branch information
sagebind committed May 23, 2019
1 parent e0b94e9 commit 759556d
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 37 deletions.
55 changes: 27 additions & 28 deletions benches/pipe.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,36 @@
#![feature(async_await)]

#[macro_use]
extern crate criterion;
use criterion::*;

use criterion::Criterion;
use futures::executor::ThreadPool;
use futures::prelude::*;
use std::io;
fn benchmark(c: &mut Criterion) {
c.bench_function("write 100 1K chunks", |b| {
use futures::executor::ThreadPool;
use futures::prelude::*;

fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("pipe_read_write", |b| {
let mut pool = ThreadPool::new().unwrap();
let data = [1; 0x1000];

b.iter(|| {
let (mut reader, mut writer) = sluice::pipe::pipe();

let producer = async {
for _ in 0..0x10 {
writer.write_all(&data).await.unwrap();
}
writer.close().await.unwrap();
};

let consumer = async {
let mut sink = io::sink();
reader.copy_into(&mut sink).await.unwrap();
};

pool.run(future::join(producer, consumer));
})
let data = [1; 1024];

b.iter_batched(
sluice::pipe::pipe,
|(mut reader, mut writer)| {
let producer = async {
for _ in 0..100 {
writer.write_all(&data).await.unwrap();
}
writer.close().await.unwrap();
};

let consumer = async {
let mut sink = std::io::sink();
reader.copy_into(&mut sink).await.unwrap();
};

pool.run(future::join(producer, consumer));
},
BatchSize::SmallInput,
)
});
}

criterion_group!(benches, criterion_benchmark);
criterion_group!(benches, benchmark);
criterion_main!(benches);
18 changes: 9 additions & 9 deletions src/pipe/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub fn new(count: usize) -> (Reader, Writer) {

// Fill up the buffer pool.
for _ in 0..count {
buf_pool_tx.try_send(Vec::new()).expect("buffer pool overflow");
buf_pool_tx.try_send(Cursor::new(Vec::new())).expect("buffer pool overflow");
}

let reader = Reader {
Expand All @@ -64,10 +64,10 @@ pub fn new(count: usize) -> (Reader, Writer) {
/// The reading half of a chunked pipe.
pub struct Reader {
/// A channel of incoming chunks from the writer.
buf_pool_tx: mpsc::Sender<Vec<u8>>,
buf_pool_tx: mpsc::Sender<Cursor<Vec<u8>>>,

/// A channel of chunk buffers that have been consumed and can be reused.
buf_stream_rx: mpsc::Receiver<Vec<u8>>,
buf_stream_rx: mpsc::Receiver<Cursor<Vec<u8>>>,

/// A chunk currently being read from.
chunk: Option<Cursor<Vec<u8>>>,
Expand All @@ -88,7 +88,7 @@ impl AsyncRead for Reader {
Poll::Ready(None) => return Poll::Ready(Ok(0)),

// Accept the new chunk.
Poll::Ready(Some(buf)) => Cursor::new(buf),
Poll::Ready(Some(buf)) => buf,
}
};

Expand All @@ -106,8 +106,8 @@ impl AsyncRead for Reader {

// Otherwise, return it to the writer to be reused.
else {
let mut chunk = chunk.into_inner();
chunk.clear();
chunk.set_position(0);
chunk.get_mut().clear();

match self.buf_pool_tx.try_send(chunk) {
Ok(()) => {}
Expand All @@ -131,10 +131,10 @@ impl AsyncRead for Reader {
/// Writing half of a chunked pipe.
pub struct Writer {
/// A channel of chunks to send to the reader.
buf_pool_rx: mpsc::Receiver<Vec<u8>>,
buf_pool_rx: mpsc::Receiver<Cursor<Vec<u8>>>,

/// A channel of incoming buffers to write chunks to.
buf_stream_tx: mpsc::Sender<Vec<u8>>,
buf_stream_tx: mpsc::Sender<Cursor<Vec<u8>>>,
}

impl AsyncWrite for Writer {
Expand All @@ -150,7 +150,7 @@ impl AsyncWrite for Writer {
// An available buffer has been found.
Poll::Ready(Some(mut chunk)) => {
// Write the buffer to the chunk.
chunk.extend_from_slice(buf);
chunk.get_mut().extend_from_slice(buf);

// Send the chunk to the reader.
match self.buf_stream_tx.try_send(chunk) {
Expand Down

0 comments on commit 759556d

Please sign in to comment.