Skip to content

Commit

Permalink
Fix panic when reading from a pipe that has already returned EOF
Browse files Browse the repository at this point in the history
Fixes #6.
  • Loading branch information
sagebind committed Sep 5, 2019
1 parent 83fb08a commit ea3cd4f
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 18 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ license = "MIT"
nightly = []

[dependencies]
futures-channel-preview = "0.3.0-alpha.17"
futures-core-preview = "0.3.0-alpha.17"
futures-io-preview = "0.3.0-alpha.17"
futures-channel-preview = "0.3.0-alpha.18"
futures-core-preview = "0.3.0-alpha.18"
futures-io-preview = "0.3.0-alpha.18"

[dev-dependencies]
criterion = "0.3"
Expand Down
45 changes: 30 additions & 15 deletions src/pipe/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@
//! individual vector to fit larger chunks of bytes that don't already fit.
use futures_channel::mpsc;
use futures_core::Stream;
use futures_core::{FusedStream, Stream};
use futures_io::{AsyncRead, AsyncWrite};
use std::io::{self, Cursor};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{
io,
io::Cursor,
pin::Pin,
task::{Context, Poll},
};

/// Create a new chunked pipe with room for a fixed number of chunks.
///
Expand Down Expand Up @@ -80,15 +83,22 @@ impl AsyncRead for Reader {
let mut chunk = match self.chunk.take() {
Some(chunk) => chunk,

None => match Pin::new(&mut self.buf_stream_rx).poll_next(cx) {
// Wait for a new chunk to be delivered.
Poll::Pending => return Poll::Pending,
None => {
// If the stream has terminated, then do not poll it again.
if self.buf_stream_rx.is_terminated() {
return Poll::Ready(Ok(0));
}

match Pin::new(&mut self.buf_stream_rx).poll_next(cx) {
// Wait for a new chunk to be delivered.
Poll::Pending => return Poll::Pending,

// Pipe has closed, so return EOF.
Poll::Ready(None) => return Poll::Ready(Ok(0)),
// Pipe has closed, so return EOF.
Poll::Ready(None) => return Poll::Ready(Ok(0)),

// Accept the new chunk.
Poll::Ready(Some(buf)) => buf,
// Accept the new chunk.
Poll::Ready(Some(buf)) => buf,
}
}
};

Expand Down Expand Up @@ -228,7 +238,10 @@ mod tests {
assert_eq!(reader.read(&mut dest).await.unwrap(), 5);
assert_eq!(&dest, b"hello");

assert_eq!(reader.read(&mut dest).await.unwrap(), 0);
// Continue returning Ok(0) forever.
for _ in 0..3 {
assert_eq!(reader.read(&mut dest).await.unwrap(), 0);
}
})
}

Expand All @@ -241,9 +254,11 @@ mod tests {

drop(reader);

match writer.write(b"hello").poll_unpin(&mut context) {
Poll::Ready(Err(e)) => assert_eq!(e.kind(), io::ErrorKind::BrokenPipe),
_ => panic!("expected poll to be ready"),
for _ in 0..3 {
match writer.write(b"hello").poll_unpin(&mut context) {
Poll::Ready(Err(e)) => assert_eq!(e.kind(), io::ErrorKind::BrokenPipe),
_ => panic!("expected poll to be ready"),
}
}
}
}

0 comments on commit ea3cd4f

Please sign in to comment.