From cdb8ef3593cf9fc502ee3bb4c9ef406593457add Mon Sep 17 00:00:00 2001 From: "Stephen M. Coakley" Date: Sun, 15 Aug 2021 13:49:23 -0500 Subject: [PATCH] Switch channel implementation to async-channel (#19) Switch the underlying channel implementation from futures-channel to async-channel, since the former has been shown to fall behind on performance metrics. In addition, crates from the futures project tend to have slower compile times compared to some alternatives. Fixes #18. --- Cargo.toml | 2 +- src/pipe/chunked.rs | 22 +++++++++++----------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 713276a..dde51ca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ categories = ["asynchronous", "concurrency", "data-structures"] license = "MIT" [dependencies] -futures-channel = "0.3" +async-channel = "1" futures-core = "0.3" futures-io = "0.3" diff --git a/src/pipe/chunked.rs b/src/pipe/chunked.rs index bff7257..c08dba7 100644 --- a/src/pipe/chunked.rs +++ b/src/pipe/chunked.rs @@ -20,7 +20,7 @@ //! that happen during reads and writes are occasional reallocation for each //! individual vector to fit larger chunks of bytes that don't already fit. -use futures_channel::mpsc; +use async_channel::{bounded, Sender, Receiver}; use futures_core::{FusedStream, Stream}; use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite}; use std::{ @@ -42,8 +42,8 @@ use std::{ /// reader or writer can operate on the single buffer at one time and cannot be /// run in parallel. pub(crate) fn new(count: usize) -> (Reader, Writer) { - let (mut buf_pool_tx, buf_pool_rx) = mpsc::channel(count); - let (buf_stream_tx, buf_stream_rx) = mpsc::channel(count); + let (buf_pool_tx, buf_pool_rx) = bounded(count); + let (buf_stream_tx, buf_stream_rx) = bounded(count); // Fill up the buffer pool. for _ in 0..count { @@ -69,10 +69,10 @@ pub(crate) fn new(count: usize) -> (Reader, Writer) { /// The reading half of a chunked pipe. pub(crate) struct Reader { /// A channel of incoming chunks from the writer. - buf_pool_tx: mpsc::Sender>>, + buf_pool_tx: Sender>>, /// A channel of chunk buffers that have been consumed and can be reused. - buf_stream_rx: mpsc::Receiver>>, + buf_stream_rx: Receiver>>, /// A chunk currently being read from. chunk: Option>>, @@ -123,7 +123,7 @@ impl AsyncBufRead for Reader { // If the writer disconnects, then we'll just discard this // buffer and any subsequent buffers until we've read // everything still in the pipe. - else if e.is_disconnected() { + else if e.is_closed() { // Nothing! } // Some other error occurred. @@ -175,17 +175,17 @@ impl Drop for Reader { // that the writer knows the pipe is closed before trying to poll the // pool channel. self.buf_stream_rx.close(); - self.buf_pool_tx.close_channel(); + self.buf_pool_tx.close(); } } /// Writing half of a chunked pipe. pub(crate) struct Writer { /// A channel of chunks to send to the reader. - buf_pool_rx: mpsc::Receiver>>, + buf_pool_rx: Receiver>>, /// A channel of incoming buffers to write chunks to. - buf_stream_tx: mpsc::Sender>>, + buf_stream_tx: Sender>>, } impl AsyncWrite for Writer { @@ -239,8 +239,8 @@ impl AsyncWrite for Writer { Poll::Ready(Ok(())) } - fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - self.buf_stream_tx.close_channel(); + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + self.buf_stream_tx.close(); Poll::Ready(Ok(())) } }