diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index d8a611b..bc14d3d 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -13,12 +13,33 @@ jobs: env: RUST_BACKTRACE: 1 steps: - - uses: actions/checkout@master + - uses: actions/checkout@v2 - uses: actions-rs/toolchain@v1 with: profile: minimal - toolchain: "1.39.0" + toolchain: stable default: true - run: cargo test + + check: + strategy: + matrix: + platform: + - ubuntu-latest + - macos-latest + - windows-latest + runs-on: ${{ matrix.platform }} + env: + RUST_BACKTRACE: 1 + steps: + - uses: actions/checkout@v2 + + - uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: "1.41.1" + default: true + + - run: cargo check diff --git a/src/pipe/chunked.rs b/src/pipe/chunked.rs index de2c0bd..bff7257 100644 --- a/src/pipe/chunked.rs +++ b/src/pipe/chunked.rs @@ -169,6 +169,16 @@ impl AsyncBufRead for Reader { } } +impl Drop for Reader { + fn drop(&mut self) { + // Ensure we close the primary stream first before the pool stream so + // that the writer knows the pipe is closed before trying to poll the + // pool channel. + self.buf_stream_rx.close(); + self.buf_pool_tx.close_channel(); + } +} + /// Writing half of a chunked pipe. pub(crate) struct Writer { /// A channel of chunks to send to the reader. @@ -184,18 +194,18 @@ impl AsyncWrite for Writer { cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { + // If the reading end of the pipe is closed then return an error now, + // otherwise we'd be spending time writing the entire buffer only to + // discover that it is closed afterward. + if self.buf_stream_tx.is_closed() { + return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())); + } + // Do not send empty buffers through the rotation. if buf.is_empty() { return Poll::Ready(Ok(0)); } - // If the pipe is closed then return prematurely, otherwise we'd be - // spending time writing the entire buffer only to discover that it is - // closed afterward. - if self.buf_stream_tx.is_closed() { - return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())); - } - // Attempt to grab an available buffer to write the chunk to. match Pin::new(&mut self.buf_pool_rx).poll_next(cx) { // Wait for the reader to finish reading a chunk. diff --git a/tests/pipe.rs b/tests/pipe.rs index c51b2d7..dd925fb 100644 --- a/tests/pipe.rs +++ b/tests/pipe.rs @@ -90,7 +90,7 @@ fn pipe_lots_of_data() { } #[quickcheck] -fn read_write_chunks_random(chunks: u16) { +fn read_write_chunks_random(chunks: u8) { block_on(async { let data = [0; 8192]; let (mut reader, mut writer) = pipe();