Skip to content

Commit

Permalink
Switch channel implementation to async-channel (#19)
Browse files Browse the repository at this point in the history
Switch the underlying channel implementation from futures-channel to async-channel, since the former has been shown to fall behind on performance metrics. In addition, crates from the futures project tend to have slower compile times compared to some alternatives.

Fixes #18.
  • Loading branch information
sagebind authored Aug 15, 2021
1 parent 8cc16ad commit cdb8ef3
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 12 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ categories = ["asynchronous", "concurrency", "data-structures"]
license = "MIT"

[dependencies]
futures-channel = "0.3"
async-channel = "1"
futures-core = "0.3"
futures-io = "0.3"

Expand Down
22 changes: 11 additions & 11 deletions src/pipe/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
//! that happen during reads and writes are occasional reallocation for each
//! individual vector to fit larger chunks of bytes that don't already fit.
use futures_channel::mpsc;
use async_channel::{bounded, Sender, Receiver};
use futures_core::{FusedStream, Stream};
use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
use std::{
Expand All @@ -42,8 +42,8 @@ use std::{
/// reader or writer can operate on the single buffer at one time and cannot be
/// run in parallel.
pub(crate) fn new(count: usize) -> (Reader, Writer) {
let (mut buf_pool_tx, buf_pool_rx) = mpsc::channel(count);
let (buf_stream_tx, buf_stream_rx) = mpsc::channel(count);
let (buf_pool_tx, buf_pool_rx) = bounded(count);
let (buf_stream_tx, buf_stream_rx) = bounded(count);

// Fill up the buffer pool.
for _ in 0..count {
Expand All @@ -69,10 +69,10 @@ pub(crate) fn new(count: usize) -> (Reader, Writer) {
/// The reading half of a chunked pipe.
pub(crate) struct Reader {
/// A channel of incoming chunks from the writer.
buf_pool_tx: mpsc::Sender<Cursor<Vec<u8>>>,
buf_pool_tx: Sender<Cursor<Vec<u8>>>,

/// A channel of chunk buffers that have been consumed and can be reused.
buf_stream_rx: mpsc::Receiver<Cursor<Vec<u8>>>,
buf_stream_rx: Receiver<Cursor<Vec<u8>>>,

/// A chunk currently being read from.
chunk: Option<Cursor<Vec<u8>>>,
Expand Down Expand Up @@ -123,7 +123,7 @@ impl AsyncBufRead for Reader {
// If the writer disconnects, then we'll just discard this
// buffer and any subsequent buffers until we've read
// everything still in the pipe.
else if e.is_disconnected() {
else if e.is_closed() {
// Nothing!
}
// Some other error occurred.
Expand Down Expand Up @@ -175,17 +175,17 @@ impl Drop for Reader {
// that the writer knows the pipe is closed before trying to poll the
// pool channel.
self.buf_stream_rx.close();
self.buf_pool_tx.close_channel();
self.buf_pool_tx.close();
}
}

/// Writing half of a chunked pipe.
pub(crate) struct Writer {
/// A channel of chunks to send to the reader.
buf_pool_rx: mpsc::Receiver<Cursor<Vec<u8>>>,
buf_pool_rx: Receiver<Cursor<Vec<u8>>>,

/// A channel of incoming buffers to write chunks to.
buf_stream_tx: mpsc::Sender<Cursor<Vec<u8>>>,
buf_stream_tx: Sender<Cursor<Vec<u8>>>,
}

impl AsyncWrite for Writer {
Expand Down Expand Up @@ -239,8 +239,8 @@ impl AsyncWrite for Writer {
Poll::Ready(Ok(()))
}

fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
self.buf_stream_tx.close_channel();
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
self.buf_stream_tx.close();
Poll::Ready(Ok(()))
}
}

0 comments on commit cdb8ef3

Please sign in to comment.