Skip to content

Commit

Permalink
Fix panic caused by race condition when writing while dropping reader (
Browse files Browse the repository at this point in the history
…#16)

`Writer` has a race condition that might cause a panic if `poll_write` is called more than once while the `Reader` is in the processed of being dropped on another thread.

The race condition is caused by the drop order of `Reader`: Since fields are dropped in the order they are defined in, the buffer pool channel is dropped _before_ the buffer stream channel. If `poll_write` is called more than once between these two drop calls, then the underlying channel will panic because we've polled it after already returning `None`.

The fix is to ensure the buffer stream channel is closed _first_ before the buffer pool channel, since that is the channel we use to detect when the reader is closed. One way of doing this would be to just reverse the order these two fields are defined in, but it is better to be clear that the drop order matters by using an explicit drop handler.

With this fix, we know that the second channel will be polled _at most_ once when closed, because subsequent calls to `poll_write` will check the first channel before polling the second.

See also sagebind/isahc#295.
  • Loading branch information
sagebind authored Jan 30, 2021
1 parent 3a12de6 commit 8ca411f
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 10 deletions.
25 changes: 23 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,33 @@ jobs:
env:
RUST_BACKTRACE: 1
steps:
- uses: actions/checkout@master
- uses: actions/checkout@v2

- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: "1.39.0"
toolchain: stable
default: true

- run: cargo test

check:
strategy:
matrix:
platform:
- ubuntu-latest
- macos-latest
- windows-latest
runs-on: ${{ matrix.platform }}
env:
RUST_BACKTRACE: 1
steps:
- uses: actions/checkout@v2

- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: "1.41.1"
default: true

- run: cargo check
24 changes: 17 additions & 7 deletions src/pipe/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,16 @@ impl AsyncBufRead for Reader {
}
}

impl Drop for Reader {
fn drop(&mut self) {
// Ensure we close the primary stream first before the pool stream so
// that the writer knows the pipe is closed before trying to poll the
// pool channel.
self.buf_stream_rx.close();
self.buf_pool_tx.close_channel();
}
}

/// Writing half of a chunked pipe.
pub(crate) struct Writer {
/// A channel of chunks to send to the reader.
Expand All @@ -184,18 +194,18 @@ impl AsyncWrite for Writer {
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
// If the reading end of the pipe is closed then return an error now,
// otherwise we'd be spending time writing the entire buffer only to
// discover that it is closed afterward.
if self.buf_stream_tx.is_closed() {
return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
}

// Do not send empty buffers through the rotation.
if buf.is_empty() {
return Poll::Ready(Ok(0));
}

// If the pipe is closed then return prematurely, otherwise we'd be
// spending time writing the entire buffer only to discover that it is
// closed afterward.
if self.buf_stream_tx.is_closed() {
return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
}

// Attempt to grab an available buffer to write the chunk to.
match Pin::new(&mut self.buf_pool_rx).poll_next(cx) {
// Wait for the reader to finish reading a chunk.
Expand Down
2 changes: 1 addition & 1 deletion tests/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ fn pipe_lots_of_data() {
}

#[quickcheck]
fn read_write_chunks_random(chunks: u16) {
fn read_write_chunks_random(chunks: u8) {
block_on(async {
let data = [0; 8192];
let (mut reader, mut writer) = pipe();
Expand Down

0 comments on commit 8ca411f

Please sign in to comment.