diff --git a/src/pipe/chunked.rs b/src/pipe/chunked.rs index b60dbb1..5d34597 100644 --- a/src/pipe/chunked.rs +++ b/src/pipe/chunked.rs @@ -148,6 +148,13 @@ pub(crate) struct Writer { impl AsyncWrite for Writer { fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + // If the pipe is closed then return prematurely, otherwise we'd be + // spending time writing the entire buffer only to discover that it is + // closed afterward. + if self.buf_stream_tx.is_closed() { + return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())); + } + // Attempt to grab an available buffer to write the chunk to. match self.buf_pool_rx.poll_next_unpin(cx) { // Wait for the reader to finish reading a chunk. @@ -221,4 +228,19 @@ mod tests { assert_eq!(reader.read(&mut dest).await.unwrap(), 0); }) } + + #[test] + fn writer_errors_if_reader_is_dropped() { + let waker = noop_waker(); + let mut context = Context::from_waker(&waker); + + let (mut reader, mut writer) = new(2); + + drop(reader); + + match writer.write(b"hello").poll_unpin(&mut context) { + Poll::Ready(Err(e)) => assert_eq!(e.kind(), io::ErrorKind::BrokenPipe), + _ => panic!("expected poll to be ready"), + } + } }