diff --git a/Cargo.toml b/Cargo.toml index d155f64..2dabafe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,9 +15,9 @@ license = "MIT" nightly = [] [dependencies] -futures-channel-preview = "0.3.0-alpha.17" -futures-core-preview = "0.3.0-alpha.17" -futures-io-preview = "0.3.0-alpha.17" +futures-channel-preview = "0.3.0-alpha.18" +futures-core-preview = "0.3.0-alpha.18" +futures-io-preview = "0.3.0-alpha.18" [dev-dependencies] criterion = "0.3" diff --git a/src/pipe/chunked.rs b/src/pipe/chunked.rs index a1b2b09..5701aec 100644 --- a/src/pipe/chunked.rs +++ b/src/pipe/chunked.rs @@ -21,11 +21,14 @@ //! individual vector to fit larger chunks of bytes that don't already fit. use futures_channel::mpsc; -use futures_core::Stream; +use futures_core::{FusedStream, Stream}; use futures_io::{AsyncRead, AsyncWrite}; -use std::io::{self, Cursor}; -use std::pin::Pin; -use std::task::{Context, Poll}; +use std::{ + io, + io::Cursor, + pin::Pin, + task::{Context, Poll}, +}; /// Create a new chunked pipe with room for a fixed number of chunks. /// @@ -80,15 +83,22 @@ impl AsyncRead for Reader { let mut chunk = match self.chunk.take() { Some(chunk) => chunk, - None => match Pin::new(&mut self.buf_stream_rx).poll_next(cx) { - // Wait for a new chunk to be delivered. - Poll::Pending => return Poll::Pending, + None => { + // If the stream has terminated, then do not poll it again. + if self.buf_stream_rx.is_terminated() { + return Poll::Ready(Ok(0)); + } + + match Pin::new(&mut self.buf_stream_rx).poll_next(cx) { + // Wait for a new chunk to be delivered. + Poll::Pending => return Poll::Pending, - // Pipe has closed, so return EOF. - Poll::Ready(None) => return Poll::Ready(Ok(0)), + // Pipe has closed, so return EOF. + Poll::Ready(None) => return Poll::Ready(Ok(0)), - // Accept the new chunk. - Poll::Ready(Some(buf)) => buf, + // Accept the new chunk. + Poll::Ready(Some(buf)) => buf, + } } }; @@ -228,7 +238,10 @@ mod tests { assert_eq!(reader.read(&mut dest).await.unwrap(), 5); assert_eq!(&dest, b"hello"); - assert_eq!(reader.read(&mut dest).await.unwrap(), 0); + // Continue returning Ok(0) forever. + for _ in 0..3 { + assert_eq!(reader.read(&mut dest).await.unwrap(), 0); + } }) } @@ -241,9 +254,11 @@ mod tests { drop(reader); - match writer.write(b"hello").poll_unpin(&mut context) { - Poll::Ready(Err(e)) => assert_eq!(e.kind(), io::ErrorKind::BrokenPipe), - _ => panic!("expected poll to be ready"), + for _ in 0..3 { + match writer.write(b"hello").poll_unpin(&mut context) { + Poll::Ready(Err(e)) => assert_eq!(e.kind(), io::ErrorKind::BrokenPipe), + _ => panic!("expected poll to be ready"), + } } } }