diff --git a/Cargo.toml b/Cargo.toml index 919d239..b751256 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sluice" -version = "0.4.0" +version = "0.4.1" authors = ["Stephen M. Coakley "] edition = "2018" description = "Efficient ring buffer for byte buffers, FIFO queues, and SPSC channels" @@ -15,10 +15,13 @@ license = "MIT" nightly = [] [dependencies] -futures-preview = "0.3.0-alpha.17" +futures-channel-preview = "0.3.0-alpha.17" +futures-core-preview = "0.3.0-alpha.17" +futures-io-preview = "0.3.0-alpha.17" [dev-dependencies] criterion = "0.2" +futures-preview = "0.3.0-alpha.17" [[bench]] name = "pipe" diff --git a/src/pipe/chunked.rs b/src/pipe/chunked.rs index 5d34597..a1b2b09 100644 --- a/src/pipe/chunked.rs +++ b/src/pipe/chunked.rs @@ -20,11 +20,12 @@ //! that happen during reads and writes are occasional reallocation for each //! individual vector to fit larger chunks of bytes that don't already fit. -use futures::channel::mpsc; -use futures::prelude::*; -use futures::task::*; +use futures_channel::mpsc; +use futures_core::Stream; +use futures_io::{AsyncRead, AsyncWrite}; use std::io::{self, Cursor}; use std::pin::Pin; +use std::task::{Context, Poll}; /// Create a new chunked pipe with room for a fixed number of chunks. /// @@ -79,7 +80,7 @@ impl AsyncRead for Reader { let mut chunk = match self.chunk.take() { Some(chunk) => chunk, - None => match self.buf_stream_rx.poll_next_unpin(cx) { + None => match Pin::new(&mut self.buf_stream_rx).poll_next(cx) { // Wait for a new chunk to be delivered. Poll::Pending => return Poll::Pending, @@ -156,7 +157,7 @@ impl AsyncWrite for Writer { } // Attempt to grab an available buffer to write the chunk to. - match self.buf_pool_rx.poll_next_unpin(cx) { + match Pin::new(&mut self.buf_pool_rx).poll_next(cx) { // Wait for the reader to finish reading a chunk. Poll::Pending => Poll::Pending, @@ -197,6 +198,8 @@ impl AsyncWrite for Writer { #[cfg(all(test, feature = "nightly"))] mod tests { use futures::executor::block_on; + use futures::prelude::*; + use futures::task::noop_waker; use super::*; #[test] @@ -234,7 +237,7 @@ mod tests { let waker = noop_waker(); let mut context = Context::from_waker(&waker); - let (mut reader, mut writer) = new(2); + let (reader, mut writer) = new(2); drop(reader); diff --git a/src/pipe/mod.rs b/src/pipe/mod.rs index bfc89a5..10c4916 100644 --- a/src/pipe/mod.rs +++ b/src/pipe/mod.rs @@ -3,11 +3,11 @@ //! Pipes are like byte-oriented channels that implement I/O traits for reading //! and writing. -use futures::prelude::*; +use futures_io::{AsyncRead, AsyncWrite}; use std::fmt; use std::io; use std::pin::Pin; -use std::task::*; +use std::task::{Context, Poll}; mod chunked;