diff --git a/core-client/Cargo.toml b/core-client/Cargo.toml index 7bf00a1aa..a0e7953fe 100644 --- a/core-client/Cargo.toml +++ b/core-client/Cargo.toml @@ -19,16 +19,14 @@ categories = [ ] [features] -tls = ["jsonrpc-client-transports/tls", "futures01"] -http = ["jsonrpc-client-transports/http", "futures01"] -ws = ["jsonrpc-client-transports/ws", "futures01"] -ipc = ["jsonrpc-client-transports/ipc", "futures01"] +tls = ["jsonrpc-client-transports/tls"] +http = ["jsonrpc-client-transports/http"] +ws = ["jsonrpc-client-transports/ws"] +ipc = ["jsonrpc-client-transports/ipc"] arbitrary_precision = ["jsonrpc-client-transports/arbitrary_precision"] [dependencies] jsonrpc-client-transports = { version = "16.0", path = "./transports", default-features = false } -# Only for client transports, should be removed when we fully transition to futures=0.3 -futures01 = { version = "0.1", package = "futures", optional = true } futures = { version = "0.3", features = [ "compat" ] } [badges] diff --git a/core-client/src/lib.rs b/core-client/src/lib.rs index c7294c871..81b8b462e 100644 --- a/core-client/src/lib.rs +++ b/core-client/src/lib.rs @@ -9,6 +9,3 @@ pub use futures; pub use jsonrpc_client_transports::*; - -#[cfg(feature = "futures01")] -pub use futures01; diff --git a/core-client/transports/Cargo.toml b/core-client/transports/Cargo.toml index a1804b8c1..2cad8f8ad 100644 --- a/core-client/transports/Cargo.toml +++ b/core-client/transports/Cargo.toml @@ -21,23 +21,22 @@ categories = [ [features] default = ["http", "tls", "ws"] tls = ["hyper-tls", "http"] -http = ["hyper", "futures01"] +http = ["hyper", "tokio/full"] ws = [ "websocket", "tokio", - "futures01", + "futures/compat" ] ipc = [ "parity-tokio-ipc", "jsonrpc-server-utils", "tokio", - "futures01", ] arbitrary_precision = ["serde_json/arbitrary_precision", "jsonrpc-core/arbitrary_precision"] [dependencies] derive_more = "0.99" -futures = { version = "0.3", features = [ "compat" ] } +futures = "0.3" jsonrpc-core = { version = "16.0", path = "../../core" } jsonrpc-pubsub = { version = "16.0", path = "../../pubsub" } log = "0.4" @@ -45,12 +44,11 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" url = "1.7" -futures01 = { version = "0.1.26", package = "futures", optional = true } -hyper = { version = "0.12", optional = true } -hyper-tls = { version = "0.3.2", optional = true } +hyper = { version = "0.13", optional = true } +hyper-tls = { version = "0.4", optional = true } jsonrpc-server-utils = { version = "16.0", path = "../../server-utils", optional = true } -parity-tokio-ipc = { version = "0.2", optional = true } -tokio = { version = "0.1", optional = true } +parity-tokio-ipc = { version = "0.8", optional = true } +tokio = { version = "0.2", optional = true } websocket = { version = "0.24", optional = true } [dev-dependencies] diff --git a/core-client/transports/src/transports/http.rs b/core-client/transports/src/transports/http.rs index b4b7cb9ae..df518340a 100644 --- a/core-client/transports/src/transports/http.rs +++ b/core-client/transports/src/transports/http.rs @@ -4,75 +4,52 @@ use super::RequestBuilder; use crate::{RpcChannel, RpcError, RpcMessage, RpcResult}; -use futures::{Future, FutureExt, StreamExt, TryFutureExt, TryStreamExt}; -use hyper::{http, rt, Client, Request, Uri}; +use futures::{future, Future, FutureExt, StreamExt, TryFutureExt}; +use hyper::{http, Client, Request, Uri}; /// Create a HTTP Client -pub fn connect(url: &str) -> impl Future> +pub async fn connect(url: &str) -> RpcResult where TClient: 
From<RpcChannel>, { - let (sender, receiver) = futures::channel::oneshot::channel(); - let url = url.to_owned(); - - std::thread::spawn(move || { - let connect = rt::lazy(move || { - do_connect(&url) - .map(|client| { - if sender.send(client).is_err() { - panic!("The caller did not wait for the server."); - } - Ok(()) - }) - .compat() - }); - rt::run(connect); - }); + let url: Uri = url.parse().map_err(|e| RpcError::Other(Box::new(e)))?; - receiver.map(|res| res.expect("Server closed prematurely.").map(TClient::from)) -} + let (client_api, client_worker) = do_connect(url).await; + tokio::spawn(client_worker); -fn do_connect(url: &str) -> impl Future> { - use futures::future::ready; + Ok(TClient::from(client_api)) +} +async fn do_connect(url: Uri) -> (RpcChannel, impl Future) { let max_parallel = 8; - let url: Uri = match url.parse() { - Ok(url) => url, - Err(e) => return ready(Err(RpcError::Other(Box::new(e)))), - }; #[cfg(feature = "tls")] - let connector = match hyper_tls::HttpsConnector::new(4) { - Ok(connector) => connector, - Err(e) => return ready(Err(RpcError::Other(Box::new(e)))), - }; + let connector = hyper_tls::HttpsConnector::new(); #[cfg(feature = "tls")] let client = Client::builder().build::<_, hyper::Body>(connector); #[cfg(not(feature = "tls"))] let client = Client::new(); - + // Keep track of internal request IDs when building subsequent requests let mut request_builder = RequestBuilder::new(); let (sender, receiver) = futures::channel::mpsc::unbounded(); - use futures01::{Future, Stream}; let fut = receiver - .map(Ok) - .compat() .filter_map(move |msg: RpcMessage| { - let (request, sender) = match msg { + future::ready(match msg { RpcMessage::Call(call) => { let (_, request) = request_builder.call_request(&call); - (request, Some(call.sender)) + Some((request, Some(call.sender))) } - RpcMessage::Notify(notify) => (request_builder.notification(&notify), None), + RpcMessage::Notify(notify) => Some((request_builder.notification(&notify), None)), RpcMessage::Subscribe(_) => { log::warn!("Unsupported `RpcMessage` type `Subscribe`."); - return None; + None } - }; - + }) + }) + .map(move |(request, sender)| { let request = Request::post(&url) .header( http::header::CONTENT_TYPE, @@ -85,46 +62,42 @@ fn do_connect(url: &str) -> impl Future> { .body(request.into()) .expect("Uri and request headers are valid; qed"); - Some(client.request(request).then(move |response| Ok((response, sender)))) + client + .request(request) + .then(|response| async move { (response, sender) }) }) .buffer_unordered(max_parallel) - .for_each(|(result, sender)| { - use futures01::future::{ - self, - Either::{A, B}, - }; - let future = match result { + .for_each(|(response, sender)| async { + let result = match response { Ok(ref res) if !res.status().is_success() => { log::trace!("http result status {}", res.status()); - A(future::err(RpcError::Client(format!( + Err(RpcError::Client(format!( "Unexpected response status code: {}", res.status() - )))) + ))) + } + Err(err) => Err(RpcError::Other(Box::new(err))), + Ok(res) => { + hyper::body::to_bytes(res.into_body()) + .map_err(|e| RpcError::ParseError(e.to_string(), Box::new(e))) + .await } - Ok(res) => B(res - .into_body() - .map_err(|e| RpcError::ParseError(e.to_string(), Box::new(e))) - .concat2()), - Err(err) => A(future::err(RpcError::Other(Box::new(err)))), }; - future.then(|result| { - if let Some(sender) = sender { - let response = result - .and_then(|response| { - let response_str = String::from_utf8_lossy(response.as_ref()).into_owned(); 
super::parse_response(&response_str) - }) - .and_then(|r| r.1); - if let Err(err) = sender.send(response) { - log::warn!("Error resuming asynchronous request: {:?}", err); - } + + if let Some(sender) = sender { + let response = result + .and_then(|response| { + let response_str = String::from_utf8_lossy(response.as_ref()).into_owned(); + super::parse_response(&response_str) + }) + .and_then(|r| r.1); + if let Err(err) = sender.send(response) { + log::warn!("Error resuming asynchronous request: {:?}", err); } - Ok(()) - }) + } }); - rt::spawn(fut.map_err(|e: RpcError| log::error!("RPC Client error: {:?}", e))); - ready(Ok(sender.into())) + (sender.into(), fut) } #[cfg(test)] @@ -218,7 +191,7 @@ mod tests { Ok(()) as RpcResult<_> }; - futures::executor::block_on(run).unwrap(); + tokio::runtime::Runtime::new().unwrap().block_on(run).unwrap(); } #[test] @@ -227,18 +200,16 @@ mod tests { // given let server = TestServer::serve(id); - let (tx, rx) = std::sync::mpsc::channel(); // when - let run = async move { + let run = async { let client: TestClient = connect(&server.uri).await.unwrap(); client.notify(12).unwrap(); - tx.send(()).unwrap(); }; - let pool = futures::executor::ThreadPool::builder().pool_size(1).create().unwrap(); - pool.spawn_ok(run); - rx.recv().unwrap(); + tokio::runtime::Runtime::new().unwrap().block_on(run); + // Ensure that server has not been moved into runtime + drop(server); } #[test] @@ -249,7 +220,8 @@ mod tests { let invalid_uri = "invalid uri"; // when - let res: RpcResult = futures::executor::block_on(connect(invalid_uri)); + let fut = connect(invalid_uri); + let res: RpcResult = tokio::runtime::Runtime::new().unwrap().block_on(fut); // then assert_matches!( @@ -271,7 +243,7 @@ mod tests { let client: TestClient = connect(&server.uri).await?; client.fail().await }; - let res = futures::executor::block_on(run); + let res = tokio::runtime::Runtime::new().unwrap().block_on(run); // then if let Err(RpcError::JsonRpcError(err)) = res { @@ -312,6 +284,6 @@ mod tests { Ok(()) as RpcResult<_> }; - futures::executor::block_on(run).unwrap(); + tokio::runtime::Runtime::new().unwrap().block_on(run).unwrap(); } } diff --git a/core-client/transports/src/transports/ipc.rs b/core-client/transports/src/transports/ipc.rs index 4ad2ff429..d50022597 100644 --- a/core-client/transports/src/transports/ipc.rs +++ b/core-client/transports/src/transports/ipc.rs @@ -3,42 +3,34 @@ use crate::transports::duplex::duplex; use crate::{RpcChannel, RpcError}; -use futures::compat::{Sink01CompatExt, Stream01CompatExt}; -use futures::StreamExt; -use futures01::prelude::*; +use futures::{SinkExt, StreamExt, TryStreamExt}; use jsonrpc_server_utils::codecs::StreamCodec; -use parity_tokio_ipc::IpcConnection; +use jsonrpc_server_utils::tokio; +use jsonrpc_server_utils::tokio_util::codec::Decoder as _; +use parity_tokio_ipc::Endpoint; use std::path::Path; -use tokio::codec::Decoder; -use tokio::runtime::Runtime; /// Connect to a JSON-RPC IPC server. 
-pub fn connect, Client: From>( - path: P, -) -> impl futures::Future> { - let rt = Runtime::new().unwrap(); - #[allow(deprecated)] - let reactor = rt.reactor().clone(); - async move { - let connection = IpcConnection::connect(path, &reactor).map_err(|e| RpcError::Other(e.into()))?; - let (sink, stream) = StreamCodec::stream_incoming().framed(connection).split(); - let sink = sink.sink_map_err(|e| RpcError::Other(e.into())); - let stream = stream.map_err(|e| log::error!("IPC stream error: {}", e)); - - let (client, sender) = duplex( - Box::pin(sink.sink_compat()), - Box::pin( - stream - .compat() - .take_while(|x| futures::future::ready(x.is_ok())) - .map(|x| x.expect("Stream is closed upon first error.")), - ), - ); - - tokio::spawn(futures::compat::Compat::new(client).map_err(|_| unreachable!())); - - Ok(sender.into()) - } +pub async fn connect, Client: From>(path: P) -> Result { + let connection = Endpoint::connect(path) + .await + .map_err(|e| RpcError::Other(Box::new(e)))?; + let (sink, stream) = StreamCodec::stream_incoming().framed(connection).split(); + let sink = sink.sink_map_err(|e| RpcError::Other(Box::new(e))); + let stream = stream.map_err(|e| log::error!("IPC stream error: {}", e)); + + let (client, sender) = duplex( + Box::pin(sink), + Box::pin( + stream + .take_while(|x| futures::future::ready(x.is_ok())) + .map(|x| x.expect("Stream is closed upon first error.")), + ), + ); + + tokio::spawn(client); + + Ok(sender.into()) } #[cfg(test)] @@ -52,13 +44,10 @@ mod tests { #[test] fn should_call_one() { - let mut rt = Runtime::new().unwrap(); - #[allow(deprecated)] - let reactor = rt.reactor().clone(); let sock_path = dummy_endpoint(); let mut io = IoHandler::new(); - io.add_method("greeting", |params| { + io.add_method("greeting", |params| async { let map_obj = match params { Params::Map(obj) => obj, _ => return Err(Error::invalid_params("missing object")), @@ -69,66 +58,57 @@ mod tests { }; Ok(Value::String(format!("Hello {}!", name))) }); - let builder = ServerBuilder::new(io).event_loop_executor(rt.executor()); - let server = builder.start(&sock_path).expect("Couldn't open socket"); - - let client: RawClient = rt.block_on(connect(sock_path, &reactor).unwrap()).unwrap(); - let mut map = Map::new(); - map.insert("name".to_string(), "Jeffry".into()); - let fut = client.call_method("greeting", Params::Map(map)); - - // FIXME: it seems that IPC server on Windows won't be polled with - // default I/O reactor, work around with sending stop signal which polls - // the server (https://github.com/paritytech/jsonrpc/pull/459) - server.close(); - - match rt.block_on(fut) { - Ok(val) => assert_eq!(&val, "Hello Jeffry!"), - Err(err) => panic!("IPC RPC call failed: {}", err), - } - rt.shutdown_now().wait().unwrap(); + let builder = ServerBuilder::new(io); + let _server = builder.start(&sock_path).expect("Couldn't open socket"); + + let client_fut = async move { + let client: RawClient = connect(sock_path).await.unwrap(); + let mut map = Map::new(); + map.insert("name".to_string(), "Jeffry".into()); + let fut = client.call_method("greeting", Params::Map(map)); + + match fut.await { + Ok(val) => assert_eq!(&val, "Hello Jeffry!"), + Err(err) => panic!("IPC RPC call failed: {}", err), + } + }; + tokio::runtime::Runtime::new().unwrap().block_on(client_fut); } #[test] fn should_fail_without_server() { - let rt = Runtime::new().unwrap(); - #[allow(deprecated)] - let reactor = rt.reactor(); - - match connect::<_, RawClient>(dummy_endpoint(), reactor) { - Err(..) => {} - Ok(..) 
=> panic!("Should not be able to connect to an IPC socket that's not open"), - } - rt.shutdown_now().wait().unwrap(); + let test_fut = async move { + match connect::<_, RawClient>(dummy_endpoint()).await { + Err(..) => {} + Ok(..) => panic!("Should not be able to connect to an IPC socket that's not open"), + } + }; + + tokio::runtime::Runtime::new().unwrap().block_on(test_fut); } #[test] fn should_handle_server_error() { - let mut rt = Runtime::new().unwrap(); - #[allow(deprecated)] - let reactor = rt.reactor().clone(); let sock_path = dummy_endpoint(); let mut io = IoHandler::new(); - io.add_method("greeting", |_params| Err(Error::invalid_params("test error"))); - let builder = ServerBuilder::new(io).event_loop_executor(rt.executor()); - let server = builder.start(&sock_path).expect("Couldn't open socket"); - - let client: RawClient = rt.block_on(connect(sock_path, &reactor).unwrap()).unwrap(); - let mut map = Map::new(); - map.insert("name".to_string(), "Jeffry".into()); - let fut = client.call_method("greeting", Params::Map(map)); - - // FIXME: it seems that IPC server on Windows won't be polled with - // default I/O reactor, work around with sending stop signal which polls - // the server (https://github.com/paritytech/jsonrpc/pull/459) - server.close(); - - match rt.block_on(fut) { - Err(RpcError::JsonRpcError(err)) => assert_eq!(err.code, ErrorCode::InvalidParams), - Ok(_) => panic!("Expected the call to fail"), - _ => panic!("Unexpected error type"), - } - rt.shutdown_now().wait().unwrap(); + io.add_method("greeting", |_params| async { Err(Error::invalid_params("test error")) }); + let builder = ServerBuilder::new(io); + let _server = builder.start(&sock_path).expect("Couldn't open socket"); + + let client_fut = async move { + let client: RawClient = connect(sock_path).await.unwrap(); + let mut map = Map::new(); + map.insert("name".to_string(), "Jeffry".into()); + let fut = client.call_method("greeting", Params::Map(map)); + + match fut.await { + Err(RpcError::JsonRpcError(err)) => assert_eq!(err.code, ErrorCode::InvalidParams), + Ok(_) => panic!("Expected the call to fail"), + _ => panic!("Unexpected error type"), + } + }; + + tokio::runtime::Runtime::new().unwrap().block_on(client_fut); } } diff --git a/core-client/transports/src/transports/ws.rs b/core-client/transports/src/transports/ws.rs index d74d5df44..976d48748 100644 --- a/core-client/transports/src/transports/ws.rs +++ b/core-client/transports/src/transports/ws.rs @@ -1,16 +1,18 @@ //! JSON-RPC websocket client implementation. -use crate::{RpcChannel, RpcError}; -use futures01::prelude::*; -use log::info; use std::collections::VecDeque; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use crate::{RpcChannel, RpcError}; use websocket::{ClientBuilder, OwnedMessage}; /// Connect to a JSON-RPC websocket server. /// -/// Uses an unbuffered channel to queue outgoing rpc messages. +/// Uses an unbounded channel to queue outgoing rpc messages. /// /// Returns `Err` if the `url` is invalid. -pub fn try_connect(url: &str) -> Result, RpcError> +pub fn try_connect(url: &str) -> Result>, RpcError> where T: From, { @@ -20,40 +22,47 @@ where /// Connect to a JSON-RPC websocket server. /// -/// Uses an unbuffered channel to queue outgoing rpc messages. -pub fn connect(url: &url::Url) -> impl futures::Future> +/// Uses an unbounded channel to queue outgoing rpc messages. 
+pub fn connect(url: &url::Url) -> impl Future> where T: From, { let client_builder = ClientBuilder::from_url(url); - let fut = do_connect(client_builder); - futures::compat::Compat01As03::new(fut) + do_connect(client_builder) } -fn do_connect(client_builder: ClientBuilder) -> impl Future +fn do_connect(client_builder: ClientBuilder) -> impl Future> where T: From, { + use futures::compat::{Future01CompatExt, Sink01CompatExt, Stream01CompatExt}; + use futures::{SinkExt, StreamExt, TryFutureExt, TryStreamExt}; + use websocket::futures::Stream; + client_builder .async_connect(None) - .map(|(client, _)| { - use futures::{StreamExt, TryFutureExt}; + .compat() + .map_err(|error| RpcError::Other(Box::new(error))) + .map_ok(|(client, _)| { let (sink, stream) = client.split(); + + let sink = sink.sink_compat().sink_map_err(|e| RpcError::Other(Box::new(e))); + let stream = stream.compat().map_err(|e| RpcError::Other(Box::new(e))); let (sink, stream) = WebsocketClient::new(sink, stream).split(); let (sink, stream) = ( - Box::pin(futures::compat::Compat01As03Sink::new(sink)), + Box::pin(sink), Box::pin( - futures::compat::Compat01As03::new(stream) + stream .take_while(|x| futures::future::ready(x.is_ok())) .map(|x| x.expect("Stream is closed upon first error.")), ), ); let (rpc_client, sender) = super::duplex(sink, stream); - let rpc_client = rpc_client.compat().map_err(|error| log::error!("{:?}", error)); + let rpc_client = rpc_client.map_err(|error| log::error!("{:?}", error)); tokio::spawn(rpc_client); + sender.into() }) - .map_err(|error| RpcError::Other(Box::new(error))) } struct WebsocketClient { @@ -64,8 +73,8 @@ struct WebsocketClient { impl WebsocketClient where - TSink: Sink, - TStream: Stream, + TSink: futures::Sink + Unpin, + TStream: futures::Stream> + Unpin, TError: std::error::Error + Send + 'static, { pub fn new(sink: TSink, stream: TStream) -> Self { @@ -75,67 +84,116 @@ where queue: VecDeque::new(), } } + + // Drains the internal buffer and attempts to forward as much of the items + // as possible to the underlying sink + fn try_empty_buffer(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = Pin::into_inner(self); + + match Pin::new(&mut this.sink).poll_ready(cx) { + Poll::Ready(value) => value?, + Poll::Pending => return Poll::Pending, + } + + while let Some(item) = this.queue.pop_front() { + Pin::new(&mut this.sink).start_send(item)?; + + if !this.queue.is_empty() { + match Pin::new(&mut this.sink).poll_ready(cx) { + Poll::Ready(value) => value?, + Poll::Pending => return Poll::Pending, + } + } + } + + Poll::Ready(Ok(())) + } } -impl Sink for WebsocketClient +// This mostly forwards to the underlying sink but also adds an unbounded queue +// for when the underlying sink is incapable of receiving more items. +// See https://docs.rs/futures-util/0.3.8/futures_util/sink/struct.Buffer.html +// for the variant with a fixed-size buffer. 
+impl futures::Sink for WebsocketClient where - TSink: Sink, - TStream: Stream, - TError: std::error::Error + Send + 'static, + TSink: futures::Sink + Unpin, + TStream: futures::Stream> + Unpin, { - type SinkItem = String; - type SinkError = RpcError; + type Error = RpcError; - fn start_send(&mut self, request: Self::SinkItem) -> Result, Self::SinkError> { - self.queue.push_back(OwnedMessage::Text(request)); - Ok(AsyncSink::Ready) + fn start_send(mut self: Pin<&mut Self>, request: String) -> Result<(), Self::Error> { + let request = OwnedMessage::Text(request); + + if self.queue.is_empty() { + let this = Pin::into_inner(self); + Pin::new(&mut this.sink).start_send(request) + } else { + self.queue.push_back(request); + Ok(()) + } } - fn poll_complete(&mut self) -> Result, Self::SinkError> { - loop { - match self.queue.pop_front() { - Some(request) => match self.sink.start_send(request) { - Ok(AsyncSink::Ready) => continue, - Ok(AsyncSink::NotReady(request)) => { - self.queue.push_front(request); - break; - } - Err(error) => return Err(RpcError::Other(Box::new(error))), - }, - None => break, - } + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = Pin::into_inner(self); + + if this.queue.is_empty() { + return Pin::new(&mut this.sink).poll_ready(cx); + } + + let _ = Pin::new(this).try_empty_buffer(cx)?; + + Poll::Ready(Ok(())) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = Pin::into_inner(self); + + match Pin::new(&mut *this).try_empty_buffer(cx) { + Poll::Ready(value) => value?, + Poll::Pending => return Poll::Pending, + } + debug_assert!(this.queue.is_empty()); + + Pin::new(&mut this.sink).poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = Pin::into_inner(self); + + match Pin::new(&mut *this).try_empty_buffer(cx) { + Poll::Ready(value) => value?, + Poll::Pending => return Poll::Pending, } - self.sink - .poll_complete() - .map_err(|error| RpcError::Other(Box::new(error))) + debug_assert!(this.queue.is_empty()); + + Pin::new(&mut this.sink).poll_close(cx) } } -impl Stream for WebsocketClient +impl futures::Stream for WebsocketClient where - TSink: Sink, - TStream: Stream, - TError: std::error::Error + Send + 'static, + TSink: futures::Sink + Unpin, + TStream: futures::Stream> + Unpin, { - type Item = String; - type Error = RpcError; + type Item = Result; - fn poll(&mut self) -> Result>, Self::Error> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = Pin::into_inner(self); loop { - match self.stream.poll() { - Ok(Async::Ready(Some(message))) => match message { - OwnedMessage::Text(data) => return Ok(Async::Ready(Some(data))), - OwnedMessage::Binary(data) => info!("server sent binary data {:?}", data), - OwnedMessage::Ping(p) => self.queue.push_front(OwnedMessage::Pong(p)), + match Pin::new(&mut this.stream).poll_next(cx) { + Poll::Ready(Some(Ok(message))) => match message { + OwnedMessage::Text(data) => return Poll::Ready(Some(Ok(data))), + OwnedMessage::Binary(data) => log::info!("server sent binary data {:?}", data), + OwnedMessage::Ping(p) => this.queue.push_front(OwnedMessage::Pong(p)), OwnedMessage::Pong(_) => {} - OwnedMessage::Close(c) => self.queue.push_front(OwnedMessage::Close(c)), + OwnedMessage::Close(c) => this.queue.push_front(OwnedMessage::Close(c)), }, - Ok(Async::Ready(None)) => { + Poll::Ready(None) => { // TODO try to reconnect (#411). 
- return Ok(Async::Ready(None)); + return Poll::Ready(None); } - Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(error) => return Err(RpcError::Other(Box::new(error))), + Poll::Pending => return Poll::Pending, + Poll::Ready(Some(Err(error))) => return Poll::Ready(Some(Err(RpcError::Other(Box::new(error))))), } } } diff --git a/derive/examples/client-local.rs b/derive/examples/client-local.rs index 09a8c0148..a0fdf6fe1 100644 --- a/derive/examples/client-local.rs +++ b/derive/examples/client-local.rs @@ -50,8 +50,8 @@ fn main() { futures::pin_mut!(server); futures::select! { - server = server => {}, - client = client => {}, + _server = server => {}, + _client = client => {}, } }); } diff --git a/http/Cargo.toml b/http/Cargo.toml index fdccef242..e4356f294 100644 --- a/http/Cargo.toml +++ b/http/Cargo.toml @@ -11,9 +11,8 @@ repository = "https://github.com/paritytech/jsonrpc" version = "16.0.0" [dependencies] -futures01 = { version = "0.1", package = "futures" } -futures03 = { version = "0.3", package = "futures", features = ["compat"] } -hyper = "0.12" +futures = "0.3" +hyper = "0.13" jsonrpc-core = { version = "16.0", path = "../core" } jsonrpc-server-utils = { version = "16.0", path = "../server-utils" } log = "0.4" diff --git a/http/src/handler.rs b/http/src/handler.rs index b6abb796f..0466ee844 100644 --- a/http/src/handler.rs +++ b/http/src/handler.rs @@ -1,6 +1,9 @@ use crate::WeakRpc; +use std::future::Future; +use std::pin::Pin; use std::sync::Arc; +use std::task::{self, Poll}; use std::{fmt, mem, str}; use hyper::header::{self, HeaderMap, HeaderValue}; @@ -10,7 +13,6 @@ use crate::jsonrpc::serde_json; use crate::jsonrpc::{self as core, middleware, Metadata, Middleware}; use crate::response::Response; use crate::server_utils::cors; -use futures01::{Async, Future, Poll, Stream}; use crate::{utils, AllowedHosts, CorsDomains, RequestMiddleware, RequestMiddlewareAction, RestApi}; @@ -57,17 +59,21 @@ impl> ServerHandler { } } -impl> Service for ServerHandler +impl> Service> for ServerHandler where S::Future: Unpin, S::CallFuture: Unpin, + M: Unpin, { - type ReqBody = Body; - type ResBody = Body; + type Response = hyper::Response; type Error = hyper::Error; type Future = Handler; - fn call(&mut self, request: hyper::Request) -> Self::Future { + fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> task::Poll> { + task::Poll::Ready(Ok(())) + } + + fn call(&mut self, request: hyper::Request) -> Self::Future { let is_host_allowed = utils::is_host_allowed(&request, &self.allowed_hosts); let action = self.middleware.on_request(request); @@ -118,27 +124,25 @@ where pub enum Handler> { Rpc(RpcHandler), Err(Option), - Middleware(Box, Error = hyper::Error> + Send>), + Middleware(Pin>> + Send>>), } impl> Future for Handler where S::Future: Unpin, S::CallFuture: Unpin, + M: Unpin, { - type Item = hyper::Response; - type Error = hyper::Error; - - fn poll(&mut self) -> Poll { - match *self { - Handler::Rpc(ref mut handler) => handler.poll(), - Handler::Middleware(ref mut middleware) => middleware.poll(), - Handler::Err(ref mut response) => Ok(Async::Ready( - response - .take() - .expect("Response always Some initialy. 
Returning `Ready` so will never be polled again; qed") - .into(), - )), + type Output = hyper::Result>; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + match Pin::into_inner(self) { + Handler::Rpc(ref mut handler) => Pin::new(handler).poll(cx), + Handler::Middleware(ref mut middleware) => Pin::new(middleware).poll(cx), + Handler::Err(ref mut response) => Poll::Ready(Ok(response + .take() + .expect("Response always Some initialy. Returning `Ready` so will never be polled again; qed") + .into())), } } } @@ -181,8 +185,8 @@ enum RpcHandlerState { metadata: M, }, Writing(Response), - Waiting(Box, Error = ()> + Send>), - WaitingForResponse(Box + Send>), + Waiting(Pin> + Send>>), + WaitingForResponse(Pin + Send>>), Done, } @@ -220,12 +224,14 @@ impl> Future for RpcHandler where S::Future: Unpin, S::CallFuture: Unpin, + M: Unpin, { - type Item = hyper::Response; - type Error = hyper::Error; + type Output = hyper::Result>; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + let this = Pin::into_inner(self); - fn poll(&mut self) -> Poll { - let new_state = match mem::replace(&mut self.state, RpcHandlerState::Done) { + let new_state = match mem::replace(&mut this.state, RpcHandlerState::Done) { RpcHandlerState::ReadingHeaders { request, cors_domains, @@ -234,19 +240,19 @@ where keep_alive, } => { // Read cors header - self.cors_allow_origin = utils::cors_allow_origin(&request, &cors_domains); - self.cors_allow_headers = utils::cors_allow_headers(&request, &cors_headers); - self.keep_alive = utils::keep_alive(&request, keep_alive); - self.is_options = *request.method() == Method::OPTIONS; + this.cors_allow_origin = utils::cors_allow_origin(&request, &cors_domains); + this.cors_allow_headers = utils::cors_allow_headers(&request, &cors_headers); + this.keep_alive = utils::keep_alive(&request, keep_alive); + this.is_options = *request.method() == Method::OPTIONS; // Read other headers - RpcPollState::Ready(self.read_headers(request, continue_on_invalid_cors)) + RpcPollState::Ready(this.read_headers(request, continue_on_invalid_cors)) } RpcHandlerState::ReadingBody { body, request, metadata, uri, - } => match self.process_body(body, request, uri, metadata) { + } => match this.process_body(body, request, uri, metadata, cx) { Err(BodyError::Utf8(ref e)) => { let mesg = format!("utf-8 encoding error at byte {} in request body", e.valid_up_to()); let resp = Response::bad_request(mesg); @@ -256,19 +262,18 @@ where let resp = Response::too_large("request body size exceeds allowed maximum"); RpcPollState::Ready(RpcHandlerState::Writing(resp)) } - Err(BodyError::Hyper(e)) => return Err(e), + Err(BodyError::Hyper(e)) => return Poll::Ready(Err(e)), Ok(state) => state, }, - RpcHandlerState::ProcessRest { uri, metadata } => self.process_rest(uri, metadata)?, - RpcHandlerState::ProcessHealth { method, metadata } => self.process_health(method, metadata)?, - RpcHandlerState::WaitingForResponse(mut waiting) => match waiting.poll() { - Ok(Async::Ready(response)) => RpcPollState::Ready(RpcHandlerState::Writing(response)), - Ok(Async::NotReady) => RpcPollState::NotReady(RpcHandlerState::WaitingForResponse(waiting)), - Err(e) => RpcPollState::Ready(RpcHandlerState::Writing(Response::internal_error(format!("{:?}", e)))), + RpcHandlerState::ProcessRest { uri, metadata } => this.process_rest(uri, metadata)?, + RpcHandlerState::ProcessHealth { method, metadata } => this.process_health(method, metadata)?, + RpcHandlerState::WaitingForResponse(mut waiting) => match Pin::new(&mut 
waiting).poll(cx) { + Poll::Ready(response) => RpcPollState::Ready(RpcHandlerState::Writing(response)), + Poll::Pending => RpcPollState::NotReady(RpcHandlerState::WaitingForResponse(waiting)), }, RpcHandlerState::Waiting(mut waiting) => { - match waiting.poll() { - Ok(Async::Ready(response)) => { + match Pin::new(&mut waiting).poll(cx) { + Poll::Ready(response) => { RpcPollState::Ready(RpcHandlerState::Writing(match response { // Notification, just return empty response. None => Response::ok(String::new()), @@ -276,10 +281,7 @@ where Some(result) => Response::ok(format!("{}\n", result)), })) } - Ok(Async::NotReady) => RpcPollState::NotReady(RpcHandlerState::Waiting(waiting)), - Err(e) => { - RpcPollState::Ready(RpcHandlerState::Writing(Response::internal_error(format!("{:?}", e)))) - } + Poll::Pending => RpcPollState::NotReady(RpcHandlerState::Waiting(waiting)), } } state => RpcPollState::NotReady(state), @@ -289,25 +291,25 @@ where match new_state { RpcHandlerState::Writing(res) => { let mut response: hyper::Response = res.into(); - let cors_allow_origin = mem::replace(&mut self.cors_allow_origin, cors::AllowCors::Invalid); - let cors_allow_headers = mem::replace(&mut self.cors_allow_headers, cors::AllowCors::Invalid); + let cors_allow_origin = mem::replace(&mut this.cors_allow_origin, cors::AllowCors::Invalid); + let cors_allow_headers = mem::replace(&mut this.cors_allow_headers, cors::AllowCors::Invalid); Self::set_response_headers( response.headers_mut(), - self.is_options, - self.cors_max_age, + this.is_options, + this.cors_max_age, cors_allow_origin.into(), cors_allow_headers.into(), - self.keep_alive, + this.keep_alive, ); - Ok(Async::Ready(response)) + Poll::Ready(Ok(response)) } state => { - self.state = state; + this.state = state; if is_ready { - self.poll() + Pin::new(this).poll(cx) } else { - Ok(Async::NotReady) + Poll::Pending } } } @@ -394,7 +396,6 @@ where fn process_health(&self, method: String, metadata: M) -> Result, hyper::Error> { use self::core::types::{Call, Failure, Id, MethodCall, Output, Params, Request, Success, Version}; - use futures03::{FutureExt, TryFutureExt}; // Create a request let call = Request::Single(Call::MethodCall(MethodCall { @@ -408,28 +409,28 @@ where Some(h) => h.handler.handle_rpc_request(call, metadata), None => return Ok(RpcPollState::Ready(RpcHandlerState::Writing(Response::closing()))), }; - let response = response.map(Ok).compat(); - Ok(RpcPollState::Ready(RpcHandlerState::WaitingForResponse(Box::new( - response.map(|res| match res { - Some(core::Response::Single(Output::Success(Success { result, .. }))) => { - let result = serde_json::to_string(&result).expect("Serialization of result is infallible;qed"); + Ok(RpcPollState::Ready(RpcHandlerState::WaitingForResponse(Box::pin( + async { + match response.await { + Some(core::Response::Single(Output::Success(Success { result, .. }))) => { + let result = serde_json::to_string(&result).expect("Serialization of result is infallible;qed"); - Response::ok(result) - } - Some(core::Response::Single(Output::Failure(Failure { error, .. }))) => { - let result = serde_json::to_string(&error).expect("Serialization of error is infallible;qed"); + Response::ok(result) + } + Some(core::Response::Single(Output::Failure(Failure { error, .. 
}))) => { + let result = serde_json::to_string(&error).expect("Serialization of error is infallible;qed"); - Response::service_unavailable(result) + Response::service_unavailable(result) + } + e => Response::internal_error(format!("Invalid response for health request: {:?}", e)), } - e => Response::internal_error(format!("Invalid response for health request: {:?}", e)), - }), + }, )))) } fn process_rest(&self, uri: hyper::Uri, metadata: M) -> Result, hyper::Error> { use self::core::types::{Call, Id, MethodCall, Params, Request, Value, Version}; - use futures03::{FutureExt, TryFutureExt}; // skip the initial / let mut it = uri.path().split('/').skip(1); @@ -456,11 +457,12 @@ where Some(h) => h.handler.handle_rpc_request(call, metadata), None => return Ok(RpcPollState::Ready(RpcHandlerState::Writing(Response::closing()))), }; - let response = response.map(Ok).compat(); - Ok(RpcPollState::Ready(RpcHandlerState::Waiting(Box::new(response.map( - |res| res.map(|x| serde_json::to_string(&x).expect("Serialization of response is infallible;qed")), - ))))) + Ok(RpcPollState::Ready(RpcHandlerState::Waiting(Box::pin(async { + response + .await + .map(|x| serde_json::to_string(&x).expect("Serialization of response is infallible;qed")) + })))) } fn process_body( @@ -469,10 +471,14 @@ where mut request: Vec, uri: Option, metadata: M, + cx: &mut task::Context<'_>, ) -> Result, BodyError> { + use futures::Stream; + loop { - match body.poll()? { - Async::Ready(Some(chunk)) => { + let pinned_body = Pin::new(&mut body); + match pinned_body.poll_next(cx)? { + Poll::Ready(Some(chunk)) => { if request .len() .checked_add(chunk.len()) @@ -483,8 +489,7 @@ where } request.extend_from_slice(&*chunk) } - Async::Ready(None) => { - use futures03::{FutureExt, TryFutureExt}; + Poll::Ready(None) => { if let (Some(uri), true) = (uri, request.is_empty()) { return Ok(RpcPollState::Ready(RpcHandlerState::ProcessRest { uri, metadata })); } @@ -503,10 +508,9 @@ where }; // Content is ready - let response = response.map(Ok).compat(); - return Ok(RpcPollState::Ready(RpcHandlerState::Waiting(Box::new(response)))); + return Ok(RpcPollState::Ready(RpcHandlerState::Waiting(Box::pin(response)))); } - Async::NotReady => { + Poll::Pending => { return Ok(RpcPollState::NotReady(RpcHandlerState::ReadingBody { body, request, diff --git a/http/src/lib.rs b/http/src/lib.rs index 9fc4a2a7c..a229ae509 100644 --- a/http/src/lib.rs +++ b/http/src/lib.rs @@ -35,8 +35,11 @@ mod response; mod tests; mod utils; +use std::convert::Infallible; +use std::future::Future; use std::io; use std::net::SocketAddr; +use std::pin::Pin; use std::sync::{mpsc, Arc, Weak}; use std::thread; @@ -44,15 +47,15 @@ use parking_lot::Mutex; use crate::jsonrpc::MetaIoHandler; use crate::server_utils::reactor::{Executor, UninitializedExecutor}; -use futures01::sync::oneshot; -use futures01::{future, Future, Stream}; -use hyper::{server, Body}; +use futures::{channel::oneshot, future}; +use hyper::Body; use jsonrpc_core as jsonrpc; pub use crate::handler::ServerHandler; pub use crate::response::Response; pub use crate::server_utils::cors::{self, AccessControlAllowOrigin, AllowCors, Origin}; pub use crate::server_utils::hosts::{DomainsValidation, Host}; +pub use crate::server_utils::reactor::TaskExecutor; pub use crate::server_utils::{tokio, SuspendableStream}; pub use crate::utils::{cors_allow_headers, cors_allow_origin, is_host_allowed}; @@ -71,7 +74,7 @@ pub enum RequestMiddlewareAction { /// Should standard hosts validation be performed? 
should_validate_hosts: bool, /// a future for server response - response: Box, Error = hyper::Error> + Send>, + response: Pin>> + Send>>, }, } @@ -79,7 +82,7 @@ impl From for RequestMiddlewareAction { fn from(o: Response) -> Self { RequestMiddlewareAction::Respond { should_validate_hosts: true, - response: Box::new(future::ok(o.into())), + response: Box::pin(async { Ok(o.into()) }), } } } @@ -88,7 +91,7 @@ impl From> for RequestMiddlewareAction { fn from(response: hyper::Response) -> Self { RequestMiddlewareAction::Respond { should_validate_hosts: true, - response: Box::new(future::ok(response)), + response: Box::pin(async { Ok(response) }), } } } @@ -251,6 +254,7 @@ impl> ServerBuilder> ServerBuilder where S::Future: Unpin, S::CallFuture: Unpin, + M: Unpin, { /// Creates new `ServerBuilder` for given `IoHandler`. /// @@ -300,7 +305,7 @@ where /// Utilize existing event loop executor to poll RPC results. /// /// Applies only to 1 of the threads. Other threads will spawn their own Event Loops. - pub fn event_loop_executor(mut self, executor: tokio::runtime::TaskExecutor) -> Self { + pub fn event_loop_executor(mut self, executor: TaskExecutor) -> Self { self.executor = UninitializedExecutor::Shared(executor); self } @@ -519,7 +524,7 @@ fn serve>( mpsc::Sender>, oneshot::Sender<()>, ), - executor: tokio::runtime::TaskExecutor, + executor: TaskExecutor, addr: SocketAddr, cors_domains: CorsDomains, cors_max_age: Option, @@ -535,11 +540,10 @@ fn serve>( ) where S::Future: Unpin, S::CallFuture: Unpin, + M: Unpin, { let (shutdown_signal, local_addr_tx, done_tx) = signals; - executor.spawn({ - let handle = tokio::reactor::Handle::default(); - + executor.spawn(async move { let bind = move || { let listener = match addr { SocketAddr::V4(_) => net2::TcpBuilder::new_v4()?, @@ -549,26 +553,37 @@ fn serve>( listener.reuse_address(true)?; listener.bind(&addr)?; let listener = listener.listen(1024)?; - let listener = tokio::net::TcpListener::from_std(listener, &handle)?; + let local_addr = listener.local_addr()?; + + // NOTE: Future-proof by explicitly setting the listener socket to + // non-blocking mode of operation (future Tokio/Hyper versions + // require for the callers to do that manually) + listener.set_nonblocking(true)?; + // HACK: See below. + #[cfg(windows)] + let raw_socket = std::os::windows::io::AsRawSocket::as_raw_socket(&listener); + #[cfg(not(windows))] + let raw_socket = (); + + let server_builder = + hyper::Server::from_tcp(listener).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; // Add current host to allowed headers. // NOTE: we need to use `l.local_addr()` instead of `addr` // it might be different! 
- let local_addr = listener.local_addr()?; - - Ok((listener, local_addr)) + Ok((server_builder, local_addr, raw_socket)) }; let bind_result = match bind() { - Ok((listener, local_addr)) => { + Ok((server_builder, local_addr, raw_socket)) => { // Send local address match local_addr_tx.send(Ok(local_addr)) { - Ok(_) => future::ok((listener, local_addr)), + Ok(_) => Ok((server_builder, local_addr, raw_socket)), Err(_) => { warn!( "Thread {:?} unable to reach receiver, closing server", thread::current().name() ); - future::err(()) + Err(()) } } } @@ -576,54 +591,55 @@ fn serve>( // Send error let _send_result = local_addr_tx.send(Err(err)); - future::err(()) + Err(()) } }; - bind_result - .and_then(move |(listener, local_addr)| { - let allowed_hosts = server_utils::hosts::update(allowed_hosts, &local_addr); - - let mut http = server::conn::Http::new(); - http.keep_alive(keep_alive); - let tcp_stream = SuspendableStream::new(listener.incoming()); - - tcp_stream - .map(move |socket| { - let service = ServerHandler::new( - jsonrpc_handler.downgrade(), - cors_domains.clone(), - cors_max_age, - allowed_headers.clone(), - allowed_hosts.clone(), - request_middleware.clone(), - rest_api, - health_api.clone(), - max_request_body_size, - keep_alive, - ); + let (server_builder, local_addr, _raw_socket) = bind_result?; + + let allowed_hosts = server_utils::hosts::update(allowed_hosts, &local_addr); + + let server_builder = server_builder + .http1_keepalive(keep_alive) + .tcp_nodelay(true) + // Explicitly attempt to recover from accept errors (e.g. too many + // files opened) instead of erroring out the entire server. + .tcp_sleep_on_accept_errors(true); + + let service_fn = hyper::service::make_service_fn(move |_addr_stream| { + let service = ServerHandler::new( + jsonrpc_handler.downgrade(), + cors_domains.clone(), + cors_max_age, + allowed_headers.clone(), + allowed_hosts.clone(), + request_middleware.clone(), + rest_api, + health_api.clone(), + max_request_body_size, + keep_alive, + ); + async { Ok::<_, Infallible>(service) } + }); + + let server = server_builder.serve(service_fn).with_graceful_shutdown(async { + if let Err(err) = shutdown_signal.await { + debug!("Shutdown signaller dropped, closing server: {:?}", err); + } + }); - tokio::spawn( - http.serve_connection(socket, service) - .map_err(|e| error!("Error serving connection: {:?}", e)) - .then(|_| Ok(())), - ) - }) - .for_each(|_| Ok(())) - .map_err(|e| { - warn!("Incoming streams error, closing sever: {:?}", e); - }) - .select(shutdown_signal.map_err(|e| { - debug!("Shutdown signaller dropped, closing server: {:?}", e); - })) - .map_err(|_| ()) - }) - .and_then(|(_, server)| { - // We drop the server first to prevent a situation where main thread terminates - // before the server is properly dropped (see #504 for more details) - drop(server); - done_tx.send(()) - }) + if let Err(err) = server.await { + error!("Error running HTTP server: {:?}", err); + } + + // FIXME: Work around TCP listener socket not being properly closed + // in mio v0.6. This runs the std::net::TcpListener's destructor, + // which closes the underlying OS socket. + // Remove this once we migrate to Tokio 1.0. 
+ #[cfg(windows)] + let _: std::net::TcpListener = unsafe { std::os::windows::io::FromRawSocket::from_raw_socket(_raw_socket) }; + + done_tx.send(()) }); } @@ -654,8 +670,9 @@ impl CloseHandle { pub fn close(self) { if let Some(executors) = self.0.lock().take() { for (executor, closer) in executors { - executor.close(); + // First send shutdown signal so we can proceed with underlying select let _ = closer.send(()); + executor.close(); } } } @@ -692,9 +709,9 @@ impl Server { fn wait_internal(&mut self) { if let Some(receivers) = self.done.take() { - for receiver in receivers { - let _ = receiver.wait(); - } + // NOTE: Gracefully handle the case where we may wait on a *nested* + // local task pool (for now, wait on a dedicated, spawned thread) + let _ = std::thread::spawn(move || futures::executor::block_on(future::try_join_all(receivers))).join(); } } } diff --git a/ipc/Cargo.toml b/ipc/Cargo.toml index ff38dee14..63678c33c 100644 --- a/ipc/Cargo.toml +++ b/ipc/Cargo.toml @@ -10,13 +10,12 @@ repository = "https://github.com/paritytech/jsonrpc" version = "16.0.0" [dependencies] -futures01 = { version = "0.1", package = "futures" } -futures03 = { version = "0.3", package = "futures", features = [ "compat" ] } +futures = "0.3" log = "0.4" -tokio-service = "0.1" +tower-service = "0.3" jsonrpc-core = { version = "16.0", path = "../core" } -jsonrpc-server-utils = { version = "16.0", path = "../server-utils" } -parity-tokio-ipc = "0.4" +jsonrpc-server-utils = { version = "16.0", path = "../server-utils", default-features = false } +parity-tokio-ipc = "0.8" parking_lot = "0.11.0" [dev-dependencies] @@ -24,7 +23,7 @@ env_logger = "0.7" lazy_static = "1.0" [target.'cfg(not(windows))'.dev-dependencies] -tokio-uds = "0.2" +tokio = { version = "0.2", default-features = false, features = ["uds", "time", "rt-threaded", "io-driver"] } [badges] travis-ci = { repository = "paritytech/jsonrpc", branch = "master"} diff --git a/ipc/src/meta.rs b/ipc/src/meta.rs index a1ae788d4..497eaf086 100644 --- a/ipc/src/meta.rs +++ b/ipc/src/meta.rs @@ -1,3 +1,5 @@ +use std::path::Path; + use crate::jsonrpc::futures::channel::mpsc; use crate::jsonrpc::Metadata; use crate::server_utils::session; @@ -7,7 +9,7 @@ pub struct RequestContext<'a> { /// Session ID pub session_id: session::SessionId, /// Remote UDS endpoint - pub endpoint_addr: &'a ::parity_tokio_ipc::RemoteId, + pub endpoint_addr: &'a Path, /// Direct pipe sender pub sender: mpsc::UnboundedSender, } diff --git a/ipc/src/select_with_weak.rs b/ipc/src/select_with_weak.rs index 409aa46cd..5a68aa258 100644 --- a/ipc/src/select_with_weak.rs +++ b/ipc/src/select_with_weak.rs @@ -1,10 +1,13 @@ -use futures01::stream::{Fuse, Stream}; -use futures01::{Async, Poll}; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +use futures::stream::{Fuse, Stream}; pub trait SelectWithWeakExt: Stream { fn select_with_weak(self, other: S) -> SelectWithWeak where - S: Stream, + S: Stream, Self: Sized; } @@ -14,7 +17,7 @@ where { fn select_with_weak(self, other: S) -> SelectWithWeak where - S: Stream, + S: Stream, Self: Sized, { new(self, other) @@ -39,8 +42,9 @@ pub struct SelectWithWeak { fn new(stream1: S1, stream2: S2) -> SelectWithWeak where S1: Stream, - S2: Stream, + S2: Stream, { + use futures::StreamExt; SelectWithWeak { strong: stream1.fuse(), weak: stream2.fuse(), @@ -50,36 +54,36 @@ where impl Stream for SelectWithWeak where - S1: Stream, - S2: Stream, + S1: Stream + Unpin, + S2: Stream + Unpin, { type Item = S1::Item; - type Error = S1::Error; - fn 
poll(&mut self) -> Poll, S1::Error> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = Pin::into_inner(self); let mut checked_strong = false; loop { - if self.use_strong { - match self.strong.poll()? { - Async::Ready(Some(item)) => { - self.use_strong = false; - return Ok(Some(item).into()); + if this.use_strong { + match Pin::new(&mut this.strong).poll_next(cx) { + Poll::Ready(Some(item)) => { + this.use_strong = false; + return Poll::Ready(Some(item)); } - Async::Ready(None) => return Ok(None.into()), - Async::NotReady => { + Poll::Ready(None) => return Poll::Ready(None), + Poll::Pending => { if !checked_strong { - self.use_strong = false; + this.use_strong = false; } else { - return Ok(Async::NotReady); + return Poll::Pending; } } } checked_strong = true; } else { - self.use_strong = true; - match self.weak.poll()? { - Async::Ready(Some(item)) => return Ok(Some(item).into()), - Async::Ready(None) | Async::NotReady => (), + this.use_strong = true; + match Pin::new(&mut this.weak).poll_next(cx) { + Poll::Ready(Some(item)) => return Poll::Ready(Some(item)), + Poll::Ready(None) | Poll::Pending => (), } } } } diff --git a/ipc/src/server.rs b/ipc/src/server.rs index 47aaa9f29..107150304 100644 --- a/ipc/src/server.rs +++ b/ipc/src/server.rs @@ -1,19 +1,19 @@ +use std::future::Future; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; use crate::jsonrpc::futures::channel::mpsc; use crate::jsonrpc::{middleware, MetaIoHandler, Metadata, Middleware}; use crate::meta::{MetaExtractor, NoopExtractor, RequestContext}; use crate::select_with_weak::SelectWithWeakExt; -use futures01::{future, sync::oneshot, Future, Sink, Stream}; +use futures::channel::oneshot; +use futures::StreamExt; use parity_tokio_ipc::Endpoint; use parking_lot::Mutex; -use tokio_service::{self, Service as TokioService}; +use tower_service::Service as _; -use crate::server_utils::{ - codecs, reactor, session, - tokio::{reactor::Handle, runtime::TaskExecutor}, - tokio_codec::Framed, -}; +use crate::server_utils::{codecs, reactor, reactor::TaskExecutor, session, tokio_util}; pub use parity_tokio_ipc::SecurityAttributes; @@ -30,22 +30,24 @@ impl> Service { } } -impl> tokio_service::Service for Service +impl> tower_service::Service for Service where S::Future: Unpin, S::CallFuture: Unpin, { - type Request = String; type Response = Option; - type Error = (); - type Future = Box, Error = ()> + Send>; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } - fn call(&self, req: Self::Request) -> Self::Future { - use futures03::{FutureExt, TryFutureExt}; + fn call(&mut self, req: String) -> Self::Future { + use futures::FutureExt; trace!(target: "ipc", "Received request: {}", req); - Box::new(self.handler.handle_request(&req, self.meta.clone()).map(Ok).compat()) + Box::pin(self.handler.handle_request(&req, self.meta.clone()).map(Ok)) } } @@ -55,7 +57,6 @@ pub struct ServerBuilder = middleware::Noop> meta_extractor: Arc>, session_stats: Option>, executor: reactor::UninitializedExecutor, - reactor: Option, incoming_separator: codecs::Separator, outgoing_separator: codecs::Separator, security_attributes: SecurityAttributes, @@ 
-106,12 +106,6 @@ where self } - /// Sets different event loop I/O reactor. - pub fn event_loop_reactor(mut self, reactor: Handle) -> Self { - self.reactor = Some(reactor); - self - } - /// Sets session metadata extractor. pub fn session_meta_extractor(mut self, meta_extractor: X) -> Self where @@ -149,7 +143,6 @@ where /// Creates a new server from the given endpoint. pub fn start(self, path: &str) -> std::io::Result { let executor = self.executor.initialize()?; - let reactor = self.reactor; let rpc_handler = self.handler; let endpoint_addr = path.to_owned(); let meta_extractor = self.meta_extractor; @@ -157,12 +150,13 @@ where let incoming_separator = self.incoming_separator; let outgoing_separator = self.outgoing_separator; let (stop_signal, stop_receiver) = oneshot::channel(); - let (start_signal, start_receiver) = oneshot::channel(); - let (wait_signal, wait_receiver) = oneshot::channel(); + // NOTE: These channels are only waited upon in synchronous fashion + let (start_signal, start_receiver) = std::sync::mpsc::channel(); + let (wait_signal, wait_receiver) = std::sync::mpsc::channel(); let security_attributes = self.security_attributes; let client_buffer_size = self.client_buffer_size; - executor.spawn(future::lazy(move || { + let fut = async move { let mut endpoint = Endpoint::new(endpoint_addr); endpoint.set_security_attributes(security_attributes); @@ -173,27 +167,21 @@ where } } - // Make sure to construct Handle::default() inside Tokio runtime - let reactor = if cfg!(windows) { - #[allow(deprecated)] - reactor.unwrap_or_else(Handle::current) - } else { - reactor.unwrap_or_else(Handle::default) - }; - - let connections = match endpoint.incoming(&reactor) { + let endpoint_addr = endpoint.path().to_owned(); + let connections = match endpoint.incoming() { Ok(connections) => connections, Err(e) => { start_signal .send(Err(e)) .expect("Cannot fail since receiver never dropped before receiving"); - return future::Either::A(future::ok(())); + return; } }; let mut id = 0u64; - let server = connections.map(move |(io_stream, remote_id)| { + use futures::TryStreamExt; + let server = connections.map_ok(move |io_stream| { id = id.wrapping_add(1); let session_id = id; let session_stats = session_stats.clone(); @@ -204,61 +192,56 @@ where let (sender, receiver) = mpsc::unbounded(); let meta = meta_extractor.extract(&RequestContext { - endpoint_addr: &remote_id, + endpoint_addr: endpoint_addr.as_ref(), session_id, sender, }); - let service = Service::new(rpc_handler.clone(), meta); - let (writer, reader) = Framed::new( - io_stream, - codecs::StreamCodec::new(incoming_separator.clone(), outgoing_separator.clone()), - ) - .split(); + let mut service = Service::new(rpc_handler.clone(), meta); + let codec = codecs::StreamCodec::new(incoming_separator.clone(), outgoing_separator.clone()); + let framed = tokio_util::codec::Decoder::framed(codec, io_stream); + let (writer, reader) = futures::StreamExt::split(framed); + let responses = reader - .map(move |req| { + .map_ok(move |req| { service .call(req) - .then(|result| match result { - Err(_) => future::ok(None), - Ok(some_result) => future::ok(some_result), - }) - .map_err(|_: ()| std::io::ErrorKind::Other.into()) + // Ignore service errors + .map(|x| Ok(x.ok().flatten())) }) - .buffer_unordered(client_buffer_size) - .filter_map(|x| x) + .try_buffer_unordered(client_buffer_size) + // Filter out previously ignored service errors as `None`s + .try_filter_map(|x| futures::future::ok(x)) // we use `select_with_weak` here, instead of `select`, to close 
the stream // as soon as the ipc pipe is closed - .select_with_weak(futures03::TryStreamExt::compat(futures03::StreamExt::map(receiver, Ok))); + .select_with_weak(receiver.map(Ok)); - let writer = writer.send_all(responses).then(move |_| { + responses.forward(writer).then(move |_| { trace!(target: "ipc", "Peer: service finished"); if let Some(stats) = session_stats.as_ref() { stats.close_session(session_id) } - Ok(()) - }); - writer + async { Ok(()) } + }) }); start_signal .send(Ok(())) .expect("Cannot fail since receiver never dropped before receiving"); + let stop = stop_receiver.map_err(|_| std::io::ErrorKind::Interrupted); + let stop = Box::pin(stop); - let stop = stop_receiver.map_err(|_| std::io::ErrorKind::Interrupted.into()); - future::Either::B( - server - .buffer_unordered(1024) - .for_each(|_| Ok(())) - .select(stop) - .map(|(_, server)| { - // We drop the server first to prevent a situation where main thread terminates - // before the server is properly dropped (see #504 for more details) - drop(server); - let _ = wait_signal.send(()); - }) - .map_err(|_| ()), - ) - })); + let server = server.try_buffer_unordered(1024).for_each(|_| async {}); + + let result = futures::future::select(Box::pin(server), stop).await; + // We drop the server first to prevent a situation where main thread terminates + // before the server is properly dropped (see #504 for more details) + drop(result); + let _ = wait_signal.send(()); + }; + + use futures::FutureExt; + let fut = Box::pin(fut.map(drop)); + executor.executor().spawn(fut); let handle = InnerHandles { executor: Some(executor), @@ -266,7 +249,8 @@ where path: path.to_owned(), }; - match start_receiver.wait().expect("Message should always be sent") { + use futures::TryFutureExt; + match start_receiver.recv().expect("Message should always be sent") { Ok(()) => Ok(Server { handles: Arc::new(Mutex::new(handle)), wait_handle: Some(wait_receiver), @@ -280,7 +264,7 @@ where #[derive(Debug)] pub struct Server { handles: Arc>, - wait_handle: Option>, + wait_handle: Option>, } impl Server { @@ -298,7 +282,9 @@ impl Server { /// Wait for the server to finish pub fn wait(mut self) { - self.wait_handle.take().map(|wait_receiver| wait_receiver.wait()); + if let Some(wait_receiver) = self.wait_handle.take() { + let _ = wait_receiver.recv(); + } } } @@ -342,13 +328,10 @@ impl CloseHandle { mod tests { use super::*; - use futures01::{Future, Sink, Stream}; use jsonrpc_core::Value; - use jsonrpc_server_utils::tokio::{self, timer::Delay}; - use jsonrpc_server_utils::tokio_codec::Decoder; + use std::os::unix::net::UnixStream; use std::thread; - use std::time::{self, Duration, Instant}; - use tokio_uds::UnixStream; + use std::time::{self, Duration}; fn server_builder() -> ServerBuilder { let mut io = MetaIoHandler::<()>::default(); @@ -363,17 +346,22 @@ mod tests { } fn dummy_request_str(path: &str, data: &str) -> String { - let stream_future = UnixStream::connect(path); - let reply = stream_future.and_then(|stream| { - let stream = codecs::StreamCodec::stream_incoming().framed(stream); - let reply = stream - .send(data.to_owned()) - .and_then(move |stream| stream.into_future().map_err(|(err, _)| err)) - .and_then(|(reply, _)| future::ok(reply.expect("there should be one reply"))); - reply - }); + use futures::SinkExt; + + let reply = async move { + use tokio::net::UnixStream; - reply.wait().expect("wait for reply") + let stream: UnixStream = UnixStream::connect(path).await?; + let codec = codecs::StreamCodec::stream_incoming(); + let mut stream = 
tokio_util::codec::Decoder::framed(codec, stream); + stream.send(data.to_owned()).await?; + let (reply, _) = stream.into_future().await; + + reply.expect("there should be one reply") + }; + + let mut rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(reply).expect("wait for reply") } #[test] @@ -395,7 +383,7 @@ mod tests { let path = "/tmp/test-ipc-30000"; let _server = run(path); - UnixStream::connect(path).wait().expect("Socket should connect"); + UnixStream::connect(path).expect("Socket should connect"); } #[test] @@ -403,7 +391,7 @@ mod tests { crate::logger::init_log(); let path = "/tmp/test-ipc-40000"; let server = run(path); - let (stop_signal, stop_receiver) = oneshot::channel(); + let (stop_signal, stop_receiver) = std::sync::mpsc::channel(); let t = thread::spawn(move || { let result = dummy_request_str( @@ -414,15 +402,13 @@ mod tests { }); t.join().unwrap(); - let _ = stop_receiver - .map(move |result: String| { - assert_eq!( - result, "{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}", - "Response does not exactly match the expected response", - ); - server.close(); - }) - .wait(); + let result = stop_receiver.recv().unwrap(); + + assert_eq!( + result, "{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}", + "Response does not exactly match the expected response", + ); + server.close(); } #[test] @@ -430,7 +416,7 @@ mod tests { crate::logger::init_log(); let path = "/tmp/test-ipc-45000"; let server = run(path); - let (stop_signal, stop_receiver) = futures01::sync::mpsc::channel(400); + let (stop_signal, stop_receiver) = futures::channel::mpsc::channel(400); let mut handles = Vec::new(); for _ in 0..4 { @@ -451,16 +437,20 @@ mod tests { handle.join().unwrap(); } - let _ = stop_receiver - .map(|result| { - assert_eq!( - result, "{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}", - "Response does not exactly match the expected response", - ); - }) - .take(400) - .collect() - .wait(); + thread::spawn(move || { + let fut = stop_receiver + .map(|result| { + assert_eq!( + result, "{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}", + "Response does not exactly match the expected response", + ); + }) + .take(400) + .for_each(|_| async {}); + futures::executor::block_on(fut); + }) + .join() + .unwrap(); server.close(); } @@ -476,7 +466,7 @@ mod tests { "There should be no socket file left" ); assert!( - UnixStream::connect(path).wait().is_err(), + UnixStream::connect(path).is_err(), "Connection to the closed socket should fail" ); } @@ -521,16 +511,19 @@ mod tests { }); t.join().unwrap(); - let _ = stop_receiver - .map(move |result: String| { + thread::spawn(move || { + futures::executor::block_on(async move { + let result = stop_receiver.await.unwrap(); assert_eq!( result, huge_response_test_json(), "Response does not exactly match the expected response", ); server.close(); - }) - .wait(); + }); + }) + .join() + .unwrap(); } #[test] @@ -547,7 +540,7 @@ mod tests { } struct SessionEndExtractor { - drop_receivers: Arc>>>, + drop_receivers: Arc>>>, } impl MetaExtractor> for SessionEndExtractor { @@ -563,7 +556,7 @@ mod tests { crate::logger::init_log(); let path = "/tmp/test-ipc-30009"; - let (signal, receiver) = futures01::sync::mpsc::channel(16); + let (signal, receiver) = futures::channel::mpsc::channel(16); let session_metadata_extractor = SessionEndExtractor { drop_receivers: Arc::new(Mutex::new(signal)), }; @@ -572,15 +565,17 @@ mod tests { let builder = ServerBuilder::with_meta_extractor(io, session_metadata_extractor); let server = 
builder.start(path).expect("Server must run with no issues"); { - let _ = UnixStream::connect(path).wait().expect("Socket should connect"); + let _ = UnixStream::connect(path).expect("Socket should connect"); } - receiver - .into_future() - .map_err(|_| ()) - .and_then(|drop_receiver| drop_receiver.0.unwrap().map_err(|_| ())) - .wait() - .unwrap(); + thread::spawn(move || { + futures::executor::block_on(async move { + let (drop_receiver, ..) = receiver.into_future().await; + drop_receiver.unwrap().await.unwrap(); + }); + }) + .join() + .unwrap(); server.close(); } @@ -592,7 +587,7 @@ mod tests { let handle = server.close_handle(); handle.close(); assert!( - UnixStream::connect(path).wait().is_err(), + UnixStream::connect(path).is_err(), "Connection to the closed socket should fail" ); } @@ -614,23 +609,24 @@ mod tests { tx.send(true).expect("failed to report that the server has stopped"); }); - let delay = Delay::new(Instant::now() + Duration::from_millis(500)) - .map(|_| false) - .map_err(|err| panic!("{:?}", err)); - - let result_fut = rx.map_err(|_| ()).select(delay).then(move |result| match result { - Ok((result, _)) => { - assert_eq!(result, true, "Wait timeout exceeded"); - assert!( - UnixStream::connect(path).wait().is_err(), - "Connection to the closed socket should fail" - ); - Ok(()) + let mut rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async move { + let timeout = tokio::time::delay_for(Duration::from_millis(500)); + + match futures::future::select(rx, timeout).await { + futures::future::Either::Left((result, _)) => { + assert!(result.is_ok(), "Rx failed"); + assert_eq!(result, Ok(true), "Wait timeout exceeded"); + assert!( + UnixStream::connect(path).is_err(), + "Connection to the closed socket should fail" + ); + Ok(()) + } + futures::future::Either::Right(_) => Err("timed out"), } - Err(_) => Err(()), - }); - - tokio::run(result_fut); + }) + .unwrap(); } #[test] diff --git a/server-utils/Cargo.toml b/server-utils/Cargo.toml index 4324500ab..e40481835 100644 --- a/server-utils/Cargo.toml +++ b/server-utils/Cargo.toml @@ -11,14 +11,15 @@ repository = "https://github.com/paritytech/jsonrpc" version = "16.0.0" [dependencies] -bytes = "0.4" -futures01 = { version = "0.1", package = "futures" } +bytes = "0.5" +futures = "0.3" globset = "0.4" jsonrpc-core = { version = "16.0", path = "../core" } lazy_static = "1.1.0" log = "0.4" -tokio = { version = "0.1.15" } -tokio-codec = { version = "0.1" } +tokio = { version = "0.2", features = ["rt-threaded", "io-driver", "io-util", "time", "tcp"] } +tokio-util = { version = "0.3", features = ["codec"] } + unicase = "2.0" [badges] diff --git a/server-utils/src/lib.rs b/server-utils/src/lib.rs index e13342007..5c9f52d1a 100644 --- a/server-utils/src/lib.rs +++ b/server-utils/src/lib.rs @@ -9,7 +9,7 @@ extern crate log; extern crate lazy_static; pub use tokio; -pub use tokio_codec; +pub use tokio_util; pub mod cors; pub mod hosts; diff --git a/server-utils/src/reactor.rs b/server-utils/src/reactor.rs index 1d1917db3..df8afd408 100644 --- a/server-utils/src/reactor.rs +++ b/server-utils/src/reactor.rs @@ -6,15 +6,16 @@ //! that `tokio::runtime` can be multi-threaded. use std::io; -use tokio; -use futures01::Future; +use tokio::runtime; +/// Task executor for Tokio 0.2 runtime. +pub type TaskExecutor = tokio::runtime::Handle; /// Possibly uninitialized event loop executor. #[derive(Debug)] pub enum UninitializedExecutor { /// Shared instance of executor. 
- Shared(tokio::runtime::TaskExecutor), + Shared(TaskExecutor), /// Event Loop should be spawned by the transport. Unspawned, } @@ -42,28 +43,20 @@ impl UninitializedExecutor { #[derive(Debug)] pub enum Executor { /// Shared instance - Shared(tokio::runtime::TaskExecutor), + Shared(TaskExecutor), /// Spawned Event Loop Spawned(RpcEventLoop), } impl Executor { /// Get tokio executor associated with this event loop. - pub fn executor(&self) -> tokio::runtime::TaskExecutor { - match *self { + pub fn executor(&self) -> TaskExecutor { + match self { Executor::Shared(ref executor) => executor.clone(), Executor::Spawned(ref eloop) => eloop.executor(), } } - /// Spawn a future onto the Tokio runtime. - pub fn spawn(&self, future: F) - where - F: Future + Send + 'static, - { - self.executor().spawn(future) - } - /// Closes underlying event loop (if any!). pub fn close(self) { if let Executor::Spawned(eloop) = self { @@ -82,9 +75,9 @@ impl Executor { /// A handle to running event loop. Dropping the handle will cause event loop to finish. #[derive(Debug)] pub struct RpcEventLoop { - executor: tokio::runtime::TaskExecutor, - close: Option>, - handle: Option, + executor: TaskExecutor, + close: Option>, + runtime: Option, } impl Drop for RpcEventLoop { @@ -101,36 +94,46 @@ impl RpcEventLoop { /// Spawns a new named thread with the `EventLoop`. pub fn with_name(name: Option) -> io::Result { - let (stop, stopped) = futures01::oneshot(); + let (stop, stopped) = futures::channel::oneshot::channel(); - let mut tb = tokio::runtime::Builder::new(); + let mut tb = runtime::Builder::new(); tb.core_threads(1); + tb.threaded_scheduler(); + tb.enable_all(); if let Some(name) = name { - tb.name_prefix(name); + tb.thread_name(name); } - let mut runtime = tb.build()?; - let executor = runtime.executor(); - let terminate = futures01::empty().select(stopped).map(|_| ()).map_err(|_| ()); - runtime.spawn(terminate); - let handle = runtime.shutdown_on_idle(); + let runtime = tb.build()?; + let executor = runtime.handle().to_owned(); + + runtime.spawn(async { + let _ = stopped.await; + }); Ok(RpcEventLoop { executor, close: Some(stop), - handle: Some(handle), + runtime: Some(runtime), }) } /// Get executor for this event loop. - pub fn executor(&self) -> tokio::runtime::TaskExecutor { - self.executor.clone() + pub fn executor(&self) -> runtime::Handle { + self.runtime + .as_ref() + .expect("Runtime is only None if we're being dropped; qed") + .handle() + .clone() } /// Blocks current thread and waits until the event loop is finished. pub fn wait(mut self) -> Result<(), ()> { - self.handle.take().ok_or(())?.wait() + // Dropping Tokio 0.2 runtime waits for all spawned tasks to terminate + let runtime = self.runtime.take().ok_or(())?; + drop(runtime); + Ok(()) } /// Finishes this event loop. 
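// Illustrative aside (not part of the patch): after this change, `TaskExecutor` is a
// tokio 0.2 `runtime::Handle`, and transports call `executor.executor().spawn(fut)`
// instead of the removed `Executor::spawn`. A minimal, self-contained sketch of the
// same pattern built directly on tokio 0.2 (no jsonrpc types; names are hypothetical):
use std::time::Duration;

fn handle_spawn_sketch() -> std::io::Result<()> {
    // Mirrors `RpcEventLoop::with_name`: single-core threaded scheduler with all drivers.
    let mut builder = tokio::runtime::Builder::new();
    builder.threaded_scheduler().core_threads(1).enable_all();
    let runtime = builder.build()?;

    // A `Handle` is `Clone + Send`, so it can be handed to transports and used to
    // spawn work onto the shared runtime, much like the old 0.1 `TaskExecutor`.
    let handle = runtime.handle().clone();
    let (tx, rx) = std::sync::mpsc::channel();
    handle.spawn(async move {
        let _ = tx.send("spawned on the shared runtime");
    });

    let msg = rx.recv_timeout(Duration::from_secs(1)).expect("task should run");
    assert_eq!(msg, "spawned on the shared runtime");
    // Dropping the runtime shuts it down; the patch relies on this in `RpcEventLoop::wait`.
    drop(runtime);
    Ok(())
}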
diff --git a/server-utils/src/stream_codec.rs b/server-utils/src/stream_codec.rs index d7cb268b9..4edef5add 100644 --- a/server-utils/src/stream_codec.rs +++ b/server-utils/src/stream_codec.rs @@ -1,6 +1,5 @@ use bytes::BytesMut; use std::{io, str}; -use tokio_codec::{Decoder, Encoder}; /// Separator for enveloping messages in streaming codecs #[derive(Debug, Clone)] @@ -48,7 +47,7 @@ fn is_whitespace(byte: u8) -> bool { } } -impl Decoder for StreamCodec { +impl tokio_util::codec::Decoder for StreamCodec { type Item = String; type Error = io::Error; @@ -56,7 +55,7 @@ impl Decoder for StreamCodec { if let Separator::Byte(separator) = self.incoming_separator { if let Some(i) = buf.as_ref().iter().position(|&b| b == separator) { let line = buf.split_to(i); - buf.split_to(1); + let _ = buf.split_to(1); match str::from_utf8(&line.as_ref()) { Ok(s) => Ok(Some(s.to_string())), @@ -108,8 +107,7 @@ impl Decoder for StreamCodec { } } -impl Encoder for StreamCodec { - type Item = String; +impl tokio_util::codec::Encoder for StreamCodec { type Error = io::Error; fn encode(&mut self, msg: String, buf: &mut BytesMut) -> io::Result<()> { @@ -127,7 +125,7 @@ mod tests { use super::StreamCodec; use bytes::{BufMut, BytesMut}; - use tokio_codec::Decoder; + use tokio_util::codec::Decoder; #[test] fn simple_encode() { diff --git a/server-utils/src/suspendable_stream.rs b/server-utils/src/suspendable_stream.rs index f563cdebe..8d3179da9 100644 --- a/server-utils/src/suspendable_stream.rs +++ b/server-utils/src/suspendable_stream.rs @@ -1,7 +1,10 @@ +use std::future::Future; use std::io; -use std::time::{Duration, Instant}; -use tokio::prelude::*; -use tokio::timer::Delay; +use std::pin::Pin; +use std::task::Poll; +use std::time::Duration; + +use tokio::time::Delay; /// `Incoming` is a stream of incoming sockets /// Polling the stream may return a temporary io::Error (for instance if we can't open the connection because of "too many open files" limit) @@ -33,38 +36,37 @@ impl SuspendableStream { } } -impl Stream for SuspendableStream +impl futures::Stream for SuspendableStream where - S: Stream, + S: futures::Stream> + Unpin, { type Item = I; - type Error = (); - fn poll(&mut self) -> Result>, ()> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { loop { - if let Some(mut timeout) = self.timeout.take() { - match timeout.poll() { - Ok(Async::Ready(_)) => {} - Ok(Async::NotReady) => { - self.timeout = Some(timeout); - return Ok(Async::NotReady); - } - Err(err) => { - warn!("Timeout error {:?}", err); - task::current().notify(); - return Ok(Async::NotReady); - } + if let Some(timeout) = self.timeout.as_mut() { + match Pin::new(timeout).poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(()) => {} } } - match self.stream.poll() { - Ok(item) => { + match Pin::new(&mut self.stream).poll_next(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => { + if self.next_delay > self.initial_delay { + self.next_delay = self.initial_delay; + } + return Poll::Ready(None); + } + Poll::Ready(Some(Ok(item))) => { if self.next_delay > self.initial_delay { self.next_delay = self.initial_delay; } - return Ok(item); + + return Poll::Ready(Some(item)); } - Err(ref err) => { + Poll::Ready(Some(Err(ref err))) => { if connection_error(err) { warn!("Connection Error: {:?}", err); continue; @@ -76,7 +78,7 @@ where }; debug!("Error accepting connection: {}", err); debug!("The server will stop accepting connections for {:?}", self.next_delay); - self.timeout = 
Some(Delay::new(Instant::now() + self.next_delay)); + self.timeout = Some(tokio::time::delay_for(self.next_delay)); } } } diff --git a/stdio/Cargo.toml b/stdio/Cargo.toml index ef90c2093..44fcf9592 100644 --- a/stdio/Cargo.toml +++ b/stdio/Cargo.toml @@ -10,15 +10,14 @@ repository = "https://github.com/paritytech/jsonrpc" version = "16.0.0" [dependencies] -futures = { version = "0.3", features = [ "compat" ] } +futures = "0.3" jsonrpc-core = { version = "16.0", path = "../core" } log = "0.4" -tokio = "0.1.7" -tokio-codec = "0.1.0" -tokio-io = "0.1.7" -tokio-stdin-stdout = "0.1.4" +tokio = { version = "0.2", features = ["io-std", "io-driver", "io-util"] } +tokio-util = { version = "0.3", features = ["codec"] } [dev-dependencies] +tokio = { version = "0.2", features = ["rt-core", "macros"] } lazy_static = "1.0" env_logger = "0.7" diff --git a/stdio/examples/stdio.rs b/stdio/examples/stdio.rs index 974ea9df4..bd2bc2caa 100644 --- a/stdio/examples/stdio.rs +++ b/stdio/examples/stdio.rs @@ -1,9 +1,11 @@ use jsonrpc_stdio_server::jsonrpc_core::*; use jsonrpc_stdio_server::ServerBuilder; -fn main() { +#[tokio::main] +async fn main() { let mut io = IoHandler::default(); io.add_sync_method("say_hello", |_params| Ok(Value::String("hello".to_owned()))); - ServerBuilder::new(io).build(); + let server = ServerBuilder::new(io).build(); + server.await; } diff --git a/stdio/src/lib.rs b/stdio/src/lib.rs index 9d118f563..79918c44a 100644 --- a/stdio/src/lib.rs +++ b/stdio/src/lib.rs @@ -5,29 +5,31 @@ //! use jsonrpc_stdio_server::ServerBuilder; //! use jsonrpc_stdio_server::jsonrpc_core::*; //! -//! fn main() { +//! #[tokio::main] +//! async fn main() { //! let mut io = IoHandler::default(); //! io.add_sync_method("say_hello", |_params| { //! Ok(Value::String("hello".to_owned())) //! }); //! -//! ServerBuilder::new(io).build(); +//! let server = ServerBuilder::new(io).build(); +//! server.await; //! } //! ``` #![deny(missing_docs)] -use tokio; -use tokio_stdin_stdout; +use std::future::Future; +use std::sync::Arc; + #[macro_use] extern crate log; pub use jsonrpc_core; +pub use tokio; use jsonrpc_core::{MetaIoHandler, Metadata, Middleware}; -use std::sync::Arc; -use tokio::prelude::{Future, Stream}; -use tokio_codec::{FramedRead, FramedWrite, LinesCodec}; +use tokio_util::codec::{FramedRead, LinesCodec}; /// Stdio server builder pub struct ServerBuilder = jsonrpc_core::NoopMiddleware> { @@ -47,31 +49,45 @@ where } } + /// Returns a server future that needs to be polled in order to make progress. + /// /// Will block until EOF is read or until an error occurs. /// The server reads from STDIN line-by-line, one request is taken /// per line and each response is written to STDOUT on a new line. 
- pub fn build(&self) { - let stdin = tokio_stdin_stdout::stdin(0); - let stdout = tokio_stdin_stdout::stdout(0).make_sendable(); + pub fn build(&self) -> impl Future + 'static { + let handler = self.handler.clone(); - let framed_stdin = FramedRead::new(stdin, LinesCodec::new()); - let framed_stdout = FramedWrite::new(stdout, LinesCodec::new()); + async move { + let stdin = tokio::io::stdin(); + let mut stdout = tokio::io::stdout(); - let handler = self.handler.clone(); - let future = framed_stdin - .and_then(move |line| Self::process(&handler, line).map_err(|_| unreachable!())) - .forward(framed_stdout) - .map(|_| ()) - .map_err(|e| panic!("{:?}", e)); + let mut framed_stdin = FramedRead::new(stdin, LinesCodec::new()); - tokio::run(future); + use futures::StreamExt; + while let Some(request) = framed_stdin.next().await { + match request { + Ok(line) => { + let res = Self::process(&handler, line).await; + let mut sanitized = res.replace('\n', ""); + sanitized.push('\n'); + use tokio::io::AsyncWriteExt; + if let Err(e) = stdout.write_all(sanitized.as_bytes()).await { + log::warn!("Error writing response: {:?}", e); + } + } + Err(e) => { + log::warn!("Error reading line: {:?}", e); + } + } + } + } } /// Process a request asynchronously - fn process(io: &Arc>, input: String) -> impl Future + Send { - use jsonrpc_core::futures::{FutureExt, TryFutureExt}; + fn process(io: &Arc>, input: String) -> impl Future + Send { + use jsonrpc_core::futures::FutureExt; let f = io.handle_request(&input, Default::default()); - f.map(Ok).compat().map(move |result| match result { + f.map(move |result| match result { Some(res) => res, None => { info!("JSON RPC request produced no response: {:?}", input); diff --git a/tcp/Cargo.toml b/tcp/Cargo.toml index 365d8667a..cda8542d2 100644 --- a/tcp/Cargo.toml +++ b/tcp/Cargo.toml @@ -10,14 +10,11 @@ repository = "https://github.com/paritytech/jsonrpc" version = "16.0.0" [dependencies] -futures01 = { version = "0.1", package = "futures" } -# TODO remove when we no longer need compat (use jsonrpc-core re-export instead) -futures03 = { version = "0.3", features = ["compat"], package = "futures" } jsonrpc-core = { version = "16.0", path = "../core" } jsonrpc-server-utils = { version = "16.0", path = "../server-utils" } log = "0.4" parking_lot = "0.11.0" -tokio-service = "0.1" +tower-service = "0.3" [dev-dependencies] lazy_static = "1.0" diff --git a/tcp/src/dispatch.rs b/tcp/src/dispatch.rs index f12121a51..664e97a33 100644 --- a/tcp/src/dispatch.rs +++ b/tcp/src/dispatch.rs @@ -1,27 +1,26 @@ -use std; use std::collections::HashMap; use std::net::SocketAddr; +use std::pin::Pin; use std::sync::Arc; +use std::task::Poll; -use crate::jsonrpc::futures::{self as futures03, channel::mpsc, StreamExt}; -use futures01::{Async, Poll, Stream}; +use crate::futures::{channel::mpsc, Stream}; use parking_lot::Mutex; pub type SenderChannels = Mutex>>; -pub struct PeerMessageQueue { +pub struct PeerMessageQueue { up: S, - receiver: Option + Send>>, + receiver: Option>, _addr: SocketAddr, } -impl PeerMessageQueue { +impl PeerMessageQueue { pub fn new(response_stream: S, receiver: mpsc::UnboundedReceiver, addr: SocketAddr) -> Self { - let receiver = futures03::compat::Compat::new(receiver.map(|v| Ok(v))); PeerMessageQueue { up: response_stream, - receiver: Some(Box::new(receiver)), + receiver: Some(receiver), _addr: addr, } } @@ -78,9 +77,8 @@ impl Dispatcher { } } -impl> Stream for PeerMessageQueue { - type Item = String; - type Error = std::io::Error; +impl> + Unpin> Stream for 
PeerMessageQueue { + type Item = std::io::Result; // The receiver will never return `Ok(Async::Ready(None))` // Because the sender is kept in `SenderChannels` and it will never be dropped until `the stream` is resolved. @@ -90,32 +88,32 @@ impl> Stream for PeerMessageQue // However, it is possible to have a race between `poll` and `push_work` if the connection is dropped. // Therefore, the receiver is then dropped when the connection is dropped and an error is propagated when // a `send` attempt is made on that channel. - fn poll(&mut self) -> Poll, std::io::Error> { + fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { // check if we have response pending + let this = Pin::into_inner(self); - let up_closed = match self.up.poll() { - Ok(Async::Ready(Some(item))) => return Ok(Async::Ready(Some(item))), - Ok(Async::Ready(None)) => true, - Ok(Async::NotReady) => false, - err => return err, + let up_closed = match Pin::new(&mut this.up).poll_next(cx) { + Poll::Ready(Some(Ok(item))) => return Poll::Ready(Some(Ok(item))), + Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err))), + Poll::Ready(None) => true, + Poll::Pending => false, }; - let rx = match &mut self.receiver { + let mut rx = match &mut this.receiver { None => { debug_assert!(up_closed); - return Ok(Async::Ready(None)); + return Poll::Ready(None); } Some(rx) => rx, }; - match rx.poll() { - Ok(Async::Ready(Some(item))) => Ok(Async::Ready(Some(item))), - Ok(Async::Ready(None)) | Ok(Async::NotReady) if up_closed => { - self.receiver = None; - Ok(Async::Ready(None)) + match Pin::new(&mut rx).poll_next(cx) { + Poll::Ready(Some(item)) => Poll::Ready(Some(Ok(item))), + Poll::Ready(None) | Poll::Pending if up_closed => { + this.receiver = None; + Poll::Ready(None) } - Ok(Async::Ready(None)) | Ok(Async::NotReady) => Ok(Async::NotReady), - Err(_) => Err(std::io::Error::new(std::io::ErrorKind::Other, "MPSC error")), + Poll::Ready(None) | Poll::Pending => Poll::Pending, } } } diff --git a/tcp/src/lib.rs b/tcp/src/lib.rs index 5eda8e7e1..b78e4be54 100644 --- a/tcp/src/lib.rs +++ b/tcp/src/lib.rs @@ -42,6 +42,8 @@ mod tests; use jsonrpc_core as jsonrpc; +pub(crate) use crate::jsonrpc::futures; + pub use self::server_utils::{codecs::Separator, tokio}; pub use crate::dispatch::{Dispatcher, PushMessageError}; pub use crate::meta::{MetaExtractor, RequestContext}; diff --git a/tcp/src/server.rs b/tcp/src/server.rs index 6e7d3693d..55a2b8d4a 100644 --- a/tcp/src/server.rs +++ b/tcp/src/server.rs @@ -1,14 +1,13 @@ -use std; +use std::io; use std::net::SocketAddr; +use std::pin::Pin; use std::sync::Arc; -use tokio_service::Service as TokioService; - -use futures01::sync::oneshot; -use futures01::{future, Future, Sink, Stream}; +use tower_service::Service as _; +use crate::futures::{self, future}; use crate::jsonrpc::{middleware, MetaIoHandler, Metadata, Middleware}; -use crate::server_utils::{codecs, reactor, tokio, tokio_codec::Framed, SuspendableStream}; +use crate::server_utils::{codecs, reactor, tokio, tokio_util::codec::Framed, SuspendableStream}; use crate::dispatch::{Dispatcher, PeerMessageQueue, SenderChannels}; use crate::meta::{MetaExtractor, NoopExtractor, RequestContext}; @@ -60,7 +59,7 @@ where } /// Utilize existing event loop executor. 
- pub fn event_loop_executor(mut self, handle: tokio::runtime::TaskExecutor) -> Self { + pub fn event_loop_executor(mut self, handle: reactor::TaskExecutor) -> Self { self.executor = reactor::UninitializedExecutor::Shared(handle); self } @@ -79,7 +78,7 @@ where } /// Starts a new server - pub fn start(self, addr: &SocketAddr) -> std::io::Result { + pub fn start(self, addr: &SocketAddr) -> io::Result { let meta_extractor = self.meta_extractor.clone(); let rpc_handler = self.handler.clone(); let channels = self.channels.clone(); @@ -87,25 +86,26 @@ where let outgoing_separator = self.outgoing_separator; let address = addr.to_owned(); let (tx, rx) = std::sync::mpsc::channel(); - let (stop_tx, stop_rx) = oneshot::channel(); + let (stop_tx, stop_rx) = futures::channel::oneshot::channel(); let executor = self.executor.initialize()?; - executor.spawn(future::lazy(move || { - let start = move || { - let listener = tokio::net::TcpListener::bind(&address)?; - let connections = SuspendableStream::new(listener.incoming()); + use futures::{FutureExt, SinkExt, StreamExt, TryStreamExt}; + executor.executor().spawn(async move { + let start = async { + let listener = tokio::net::TcpListener::bind(&address).await?; + let connections = SuspendableStream::new(listener); - let server = connections.map(move |socket| { + let server = connections.map(|socket| { let peer_addr = match socket.peer_addr() { Ok(addr) => addr, Err(e) => { warn!(target: "tcp", "Unable to determine socket peer address, ignoring connection {}", e); - return future::Either::A(future::ok(())); + return future::Either::Left(async { io::Result::Ok(()) }); } }; trace!(target: "tcp", "Accepted incoming connection from {}", &peer_addr); - let (sender, receiver) = crate::jsonrpc::futures::channel::mpsc::unbounded(); + let (sender, receiver) = futures::channel::mpsc::unbounded(); let context = RequestContext { peer_addr, @@ -113,31 +113,33 @@ where }; let meta = meta_extractor.extract(&context); - let service = Service::new(peer_addr, rpc_handler.clone(), meta); - let (writer, reader) = Framed::new( + let mut service = Service::new(peer_addr, rpc_handler.clone(), meta); + let (mut writer, reader) = Framed::new( socket, codecs::StreamCodec::new(incoming_separator.clone(), outgoing_separator.clone()), ) .split(); - let responses = reader.and_then(move |req| { - service.call(req).then(|response| match response { - Err(e) => { - warn!(target: "tcp", "Error while processing request: {:?}", e); - future::ok(String::new()) - } - Ok(None) => { - trace!(target: "tcp", "JSON RPC request produced no response"); - future::ok(String::new()) - } - Ok(Some(response_data)) => { - trace!(target: "tcp", "Sent response: {}", &response_data); - future::ok(response_data) - } - }) - }); - - let peer_message_queue = { + // Work around https://github.com/rust-lang/rust/issues/64552 by boxing the stream type + let responses: Pin> + Send>> = + Box::pin(reader.and_then(move |req| { + service.call(req).then(|response| match response { + Err(e) => { + warn!(target: "tcp", "Error while processing request: {:?}", e); + future::ok(String::new()) + } + Ok(None) => { + trace!(target: "tcp", "JSON RPC request produced no response"); + future::ok(String::new()) + } + Ok(Some(response_data)) => { + trace!(target: "tcp", "Sent response: {}", &response_data); + future::ok(response_data) + } + }) + })); + + let mut peer_message_queue = { let mut channels = channels.lock(); channels.insert(peer_addr, sender.clone()); @@ -145,42 +147,33 @@ where }; let shared_channels = channels.clone(); 
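// Illustrative aside (not part of the patch): the writer below switches from the
// futures 0.1 `send_all` (which consumed sink and stream) to the futures 0.3
// `SinkExt::send_all`, which borrows the sink and drives a stream of `Result` items
// into it. A minimal sketch of that pattern on a plain channel (names are illustrative):
use futures::{channel::mpsc, executor, stream, SinkExt, StreamExt};

fn send_all_sketch() {
    executor::block_on(async {
        let (mut tx, rx) = mpsc::unbounded::<String>();
        // `send_all` expects the stream to yield `Result<Item, SinkError>`.
        let mut responses = stream::iter(vec![
            Ok::<_, mpsc::SendError>("a".to_owned()),
            Ok("b".to_owned()),
        ]);
        tx.send_all(&mut responses).await.expect("unbounded sender never errors");
        drop(tx); // close the channel so the collect below terminates

        let received: Vec<String> = rx.collect().await;
        assert_eq!(received, vec!["a".to_owned(), "b".to_owned()]);
    });
}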
- let writer = writer.send_all(peer_message_queue).then(move |_| { + let writer = async move { + writer.send_all(&mut peer_message_queue).await?; trace!(target: "tcp", "Peer {}: service finished", peer_addr); let mut channels = shared_channels.lock(); channels.remove(&peer_addr); Ok(()) - }); + }; - future::Either::B(writer) + future::Either::Right(writer) }); Ok(server) }; - let stop = stop_rx.map_err(|_| ()); - match start() { + match start.await { Ok(server) => { tx.send(Ok(())).expect("Rx is blocking parent thread."); - future::Either::A( - server - .buffer_unordered(1024) - .for_each(|_| Ok(())) - .select(stop) - .map(|_| ()) - .map_err(|(e, _)| { - error!("Error while executing the server: {:?}", e); - }), - ) + let server = server.buffer_unordered(1024).for_each(|_| async { () }); + + future::select(Box::pin(server), stop_rx).await; } Err(e) => { tx.send(Err(e)).expect("Rx is blocking parent thread."); - future::Either::B(stop.map_err(|e| { - error!("Error while executing the server: {:?}", e); - })) + let _ = stop_rx.await; } } - })); + }); let res = rx.recv().expect("Response is always sent before tx is dropped."); @@ -199,7 +192,7 @@ where /// TCP Server handle pub struct Server { executor: Option, - stop: Option>, + stop: Option>, } impl Server { diff --git a/tcp/src/service.rs b/tcp/src/service.rs index cb0f4b7b2..558b2feba 100644 --- a/tcp/src/service.rs +++ b/tcp/src/service.rs @@ -1,9 +1,11 @@ +use std::future::Future; use std::net::SocketAddr; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; -use crate::jsonrpc::futures::FutureExt; +use crate::futures; use crate::jsonrpc::{middleware, MetaIoHandler, Metadata, Middleware}; -use futures01::Future; pub struct Service = middleware::Noop> { handler: Arc>, @@ -21,26 +23,27 @@ impl> Service { } } -impl> tokio_service::Service for Service +impl> tower_service::Service for Service where S::Future: Unpin, S::CallFuture: Unpin, { // These types must match the corresponding protocol types: - type Request = String; type Response = Option; - // For non-streaming protocols, service errors are always io::Error type Error = (); // The future for computing the response; box it for simplicity. - type Future = Box, Error = ()> + Send>; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } // Produce a future for computing a response from a request. 
- fn call(&self, req: Self::Request) -> Self::Future { + fn call(&mut self, req: String) -> Self::Future { + use futures::FutureExt; trace!(target: "tcp", "Accepted request from peer {}: {}", &self.peer_addr, req); - Box::new(futures03::compat::Compat::new( - self.handler.handle_request(&req, self.meta.clone()).map(|v| Ok(v)), - )) + Box::pin(self.handler.handle_request(&req, self.meta.clone()).map(Ok)) } } diff --git a/tcp/src/tests.rs b/tcp/src/tests.rs index f06293847..b95e1b288 100644 --- a/tcp/src/tests.rs +++ b/tcp/src/tests.rs @@ -1,12 +1,14 @@ use std::net::{Shutdown, SocketAddr}; use std::str::FromStr; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Duration; -use futures01::{future, Future}; use jsonrpc_core::{MetaIoHandler, Metadata, Value}; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncWriteExt; -use crate::server_utils::tokio::{self, io, net::TcpStream, timer::Delay}; +use crate::futures; +use crate::server_utils::tokio::{self, net::TcpStream}; use parking_lot::Mutex; @@ -20,6 +22,11 @@ fn casual_server() -> ServerBuilder { ServerBuilder::new(io) } +fn run_future(fut: impl std::future::Future + Send) -> O { + let mut rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(fut) +} + #[test] fn doc_test() { crate::logger::init_log(); @@ -41,11 +48,7 @@ fn doc_test_connect() { let server = casual_server(); let _server = server.start(&addr).expect("Server must run with no issues"); - let stream = TcpStream::connect(&addr) - .and_then(move |_stream| Ok(())) - .map_err(|err| panic!("Server connection error: {:?}", err)); - - tokio::run(stream); + run_future(async move { TcpStream::connect(&addr).await }).expect("Server connection error"); } #[test] @@ -56,14 +59,11 @@ fn disconnect() { let dispatcher = server.dispatcher(); let _server = server.start(&addr).expect("Server must run with no issues"); - let stream = TcpStream::connect(&addr) - .and_then(move |stream| { - assert_eq!(stream.peer_addr().unwrap(), addr); - stream.shutdown(::std::net::Shutdown::Both) - }) - .map_err(|err| panic!("Error disconnecting: {:?}", err)); - - tokio::run(stream); + run_future(async move { + let stream = TcpStream::connect(&addr).await.unwrap(); + assert_eq!(stream.peer_addr().unwrap(), addr); + stream.shutdown(::std::net::Shutdown::Both).unwrap(); + }); ::std::thread::sleep(::std::time::Duration::from_millis(50)); @@ -71,19 +71,22 @@ fn disconnect() { } fn dummy_request(addr: &SocketAddr, data: Vec) -> Vec { - let (ret_tx, ret_rx) = futures01::sync::oneshot::channel(); - - let stream = TcpStream::connect(addr) - .and_then(move |stream| io::write_all(stream, data)) - .and_then(|(stream, _data)| { - stream.shutdown(Shutdown::Write).unwrap(); - io::read_to_end(stream, vec![]) - }) - .and_then(move |(_stream, read_buf)| ret_tx.send(read_buf).map_err(|err| panic!("Unable to send {:?}", err))) - .map_err(|err| panic!("Error connecting or closing connection: {:?}", err)); - - tokio::run(stream); - ret_rx.wait().expect("Unable to receive result") + let (ret_tx, ret_rx) = std::sync::mpsc::channel(); + + let stream = async move { + let mut stream = TcpStream::connect(addr).await?; + stream.write_all(&data).await?; + stream.shutdown(Shutdown::Write)?; + let mut read_buf = vec![]; + let _ = stream.read_to_end(&mut read_buf).await; + + let _ = ret_tx.send(read_buf).map_err(|err| panic!("Unable to send {:?}", err)); + + Ok::<(), Box>(()) + }; + + run_future(stream).unwrap(); + ret_rx.recv().expect("Unable to receive result") } fn dummy_request_str(addr: &SocketAddr, data: Vec) -> 
String { @@ -232,67 +235,62 @@ fn message() { let _server = server.start(&addr).expect("Server must run with no issues"); - let delay = Delay::new(Instant::now() + Duration::from_millis(500)).map_err(|err| panic!("{:?}", err)); - let message = "ping"; let executed_dispatch = Arc::new(Mutex::new(false)); let executed_request = Arc::new(Mutex::new(false)); let executed_dispatch_move = executed_dispatch.clone(); let executed_request_move = executed_request.clone(); - // CLIENT RUN - let stream = TcpStream::connect(&addr) - .and_then(|stream| future::ok(stream).join(delay)) - .and_then(move |stream| { - let peer_addr = peer_list.lock()[0].clone(); - dispatcher - .push_message(&peer_addr, message.to_owned()) - .expect("Should be sent with no errors"); - trace!(target: "tcp", "Dispatched message for {}", peer_addr); - future::ok(stream) - }) - .and_then(move |(stream, _)| { - // Read message plus newline appended by codec. - io::read_exact(stream, vec![0u8; message.len() + 1]) - }) - .and_then(move |(stream, read_buf)| { - trace!(target: "tcp", "Read ping message"); - let ping_signal = read_buf[..].to_vec(); - - assert_eq!( - format!("{}\n", message), - String::from_utf8(ping_signal).expect("String should be utf-8"), - "Sent request does not match received by the peer", - ); - // ensure that the above assert was actually triggered - *executed_dispatch_move.lock() = true; - - future::ok(stream) - }) - .and_then(|stream| { - // make request AFTER message dispatches - let data = b"{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}\n"; - io::write_all(stream, &data[..]) - }) - .and_then(|(stream, _)| { - stream.shutdown(Shutdown::Write).unwrap(); - io::read_to_end(stream, Vec::new()) - }) - .and_then(move |(_, read_buf)| { - trace!(target: "tcp", "Read response message"); - let response_signal = read_buf[..].to_vec(); - assert_eq!( - "{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}\n", - String::from_utf8(response_signal).expect("String should be utf-8"), - "Response does not match the expected handling", - ); - *executed_request_move.lock() = true; - - future::ok(()) - }) - .map_err(|err| panic!("Dispach message error: {:?}", err)); - - tokio::run(stream); + let client = async move { + let stream = TcpStream::connect(&addr); + let delay = tokio::time::delay_for(Duration::from_millis(500)); + let (stream, _) = futures::join!(stream, delay); + let mut stream = stream?; + + let peer_addr = peer_list.lock()[0].clone(); + dispatcher + .push_message(&peer_addr, message.to_owned()) + .expect("Should be sent with no errors"); + trace!(target: "tcp", "Dispatched message for {}", peer_addr); + + // Read message plus newline appended by codec. 
+ let mut read_buf = vec![0u8; message.len() + 1]; + let _ = stream.read_exact(&mut read_buf).await?; + + trace!(target: "tcp", "Read ping message"); + let ping_signal = read_buf[..].to_vec(); + + assert_eq!( + format!("{}\n", message), + String::from_utf8(ping_signal).expect("String should be utf-8"), + "Sent request does not match received by the peer", + ); + // ensure that the above assert was actually triggered + *executed_dispatch_move.lock() = true; + + // make request AFTER message dispatches + let data = b"{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}\n"; + stream.write_all(&data[..]).await?; + + stream.shutdown(Shutdown::Write).unwrap(); + let mut read_buf = vec![]; + let _ = stream.read_to_end(&mut read_buf).await?; + + trace!(target: "tcp", "Read response message"); + let response_signal = read_buf[..].to_vec(); + assert_eq!( + "{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}\n", + String::from_utf8(response_signal).expect("String should be utf-8"), + "Response does not match the expected handling", + ); + *executed_request_move.lock() = true; + + // delay + Ok::<(), Box>(()) + }; + + run_future(client).unwrap(); + assert!(*executed_dispatch.lock()); assert!(*executed_request.lock()); } diff --git a/ws/Cargo.toml b/ws/Cargo.toml index 2247d1098..37324fbe3 100644 --- a/ws/Cargo.toml +++ b/ws/Cargo.toml @@ -10,8 +10,7 @@ repository = "https://github.com/paritytech/jsonrpc" version = "16.0.0" [dependencies] -futures01 = { version = "0.1", package = "futures" } -futures03 = { version = "0.3", package = "futures", features = [ "compat" ] } +futures = "0.3" jsonrpc-core = { version = "16.0", path = "../core" } jsonrpc-server-utils = { version = "16.0", path = "../server-utils" } log = "0.4" diff --git a/ws/src/metadata.rs b/ws/src/metadata.rs index 624be5830..25b1bed82 100644 --- a/ws/src/metadata.rs +++ b/ws/src/metadata.rs @@ -1,9 +1,12 @@ use std::fmt; +use std::future::Future; +use std::pin::Pin; use std::sync::{atomic, Arc}; +use std::task::{Context, Poll}; use crate::core; use crate::core::futures::channel::mpsc; -use crate::server_utils::{session, tokio::runtime::TaskExecutor}; +use crate::server_utils::{reactor::TaskExecutor, session}; use crate::ws; use crate::error; @@ -79,10 +82,8 @@ impl RequestContext { /// Get this session as a `Sink` spawning a new future /// in the underlying event loop. 
pub fn sender(&self) -> mpsc::UnboundedSender { - use futures03::{StreamExt, TryStreamExt}; let out = self.out.clone(); let (sender, receiver) = mpsc::unbounded(); - let receiver = receiver.map(Ok).compat(); self.executor.spawn(SenderFuture(out, Box::new(receiver))); sender } @@ -123,27 +124,23 @@ impl MetaExtractor for NoopExtractor { } } -struct SenderFuture(Sender, Box + Send>); -impl futures01::Future for SenderFuture { - type Item = (); - type Error = (); +struct SenderFuture(Sender, Box + Send + Unpin>); - fn poll(&mut self) -> futures01::Poll { - use futures01::Stream; +impl Future for SenderFuture { + type Output = (); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + use futures::Stream; + + let this = Pin::into_inner(self); loop { - let item = self.1.poll()?; - match item { - futures01::Async::NotReady => { - return Ok(futures01::Async::NotReady); - } - futures01::Async::Ready(None) => { - return Ok(futures01::Async::Ready(())); - } - futures01::Async::Ready(Some(val)) => { - if let Err(e) = self.0.send(val) { + match Pin::new(&mut this.1).poll_next(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => return Poll::Ready(()), + Poll::Ready(Some(val)) => { + if let Err(e) = this.0.send(val) { warn!("Error sending a subscription update: {:?}", e); - return Ok(futures01::Async::Ready(())); + return Poll::Ready(()); } } } diff --git a/ws/src/server_builder.rs b/ws/src/server_builder.rs index e10978f34..2bfdcaadc 100644 --- a/ws/src/server_builder.rs +++ b/ws/src/server_builder.rs @@ -2,10 +2,9 @@ use std::net::SocketAddr; use std::sync::Arc; use crate::core; -use crate::server_utils; use crate::server_utils::cors::Origin; use crate::server_utils::hosts::{DomainsValidation, Host}; -use crate::server_utils::reactor::UninitializedExecutor; +use crate::server_utils::reactor::{self, UninitializedExecutor}; use crate::server_utils::session::SessionStats; use crate::error::Result; @@ -69,7 +68,7 @@ where } /// Utilize existing event loop executor to poll RPC results. - pub fn event_loop_executor(mut self, executor: server_utils::tokio::runtime::TaskExecutor) -> Self { + pub fn event_loop_executor(mut self, executor: reactor::TaskExecutor) -> Self { self.executor = UninitializedExecutor::Shared(executor); self } diff --git a/ws/src/session.rs b/ws/src/session.rs index 65d76fd38..eef7b0e9e 100644 --- a/ws/src/session.rs +++ b/ws/src/session.rs @@ -1,17 +1,20 @@ -use std; +use std::future::Future; +use std::pin::Pin; use std::sync::{atomic, Arc}; +use std::task::{Context, Poll}; use crate::core; -use futures01::sync::oneshot; -use futures01::{Async, Future, Poll}; +use futures::channel::oneshot; +use futures::future; +use futures::FutureExt; use parking_lot::Mutex; use slab::Slab; use crate::server_utils::cors::Origin; use crate::server_utils::hosts::Host; +use crate::server_utils::reactor::TaskExecutor; use crate::server_utils::session::{SessionId, SessionStats}; -use crate::server_utils::tokio::runtime::TaskExecutor; use crate::server_utils::Pattern; use crate::ws; @@ -123,16 +126,16 @@ impl LivenessPoll { } impl Future for LivenessPoll { - type Item = (); - type Error = (); + type Output = (); - fn poll(&mut self) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = Pin::into_inner(self); // if the future resolves ok then we've been signalled to return. // it should never be cancelled, but if it was the session definitely // isn't live. 
- match self.rx.poll() { - Ok(Async::Ready(_)) | Err(_) => Ok(Async::Ready(())), - Ok(Async::NotReady) => Ok(Async::NotReady), + match Pin::new(&mut this.rx).poll(cx) { + Poll::Ready(_) => Poll::Ready(()), + Poll::Pending => Poll::Pending, } } } @@ -270,30 +273,25 @@ where let active_lock = self.active.clone(); let response = self.handler.handle_request(req, metadata); - use futures03::{FutureExt, TryFutureExt}; - let response = response.map(Ok).compat(); - let future = response - .map(move |response| { - if !active_lock.load(atomic::Ordering::SeqCst) { - return; - } - if let Some(result) = response { - let res = out.send(result); - match res { - Err(error::Error::ConnectionClosed) => { - active_lock.store(false, atomic::Ordering::SeqCst); - } - Err(e) => { - warn!("Error while sending response: {:?}", e); - } - _ => {} + let future = response.map(move |response| { + if !active_lock.load(atomic::Ordering::SeqCst) { + return; + } + if let Some(result) = response { + let res = out.send(result); + match res { + Err(error::Error::ConnectionClosed) => { + active_lock.store(false, atomic::Ordering::SeqCst); } + Err(e) => { + warn!("Error while sending response: {:?}", e); + } + _ => {} } - }) - .select(poll_liveness) - .map(|_| ()) - .map_err(|_| ()); + } + }); + let future = future::select(future, poll_liveness); self.executor.spawn(future); Ok(()) diff --git a/ws/src/tests.rs b/ws/src/tests.rs index b416c9384..0d2928460 100644 --- a/ws/src/tests.rs +++ b/ws/src/tests.rs @@ -60,7 +60,7 @@ fn request(server: Server, request: &str) -> Response { } fn serve(port: u16) -> (Server, Arc) { - use futures03::{channel::oneshot, future, FutureExt}; + use futures::{channel::oneshot, future, FutureExt}; let pending = Arc::new(AtomicUsize::new(0)); let counter = pending.clone();
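// Illustrative aside (not part of the patch): the ws test helper keeps an
// `Arc<AtomicUsize>` of pending sessions and now imports `oneshot`/`FutureExt` from
// futures 0.3. A minimal, self-contained sketch of that counting-and-signalling style,
// with all names hypothetical:
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use futures::{channel::oneshot, executor, FutureExt};

fn pending_counter_sketch() {
    let pending = Arc::new(AtomicUsize::new(0));
    let counter = pending.clone();

    let (done_tx, done_rx) = oneshot::channel::<()>();
    pending.fetch_add(1, Ordering::SeqCst);

    // Simulate a session finishing: decrement the counter, then signal completion.
    counter.fetch_sub(1, Ordering::SeqCst);
    done_tx.send(()).expect("receiver should still be alive");

    // `oneshot::Receiver` is an ordinary futures 0.3 future and composes with `map`.
    executor::block_on(done_rx.map(|res| res.expect("sender should not be dropped")));
    assert_eq!(pending.load(Ordering::SeqCst), 0);
}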