Skip to content

Commit

Permalink
Migrate to Tokio 0.2 (#603)
Browse files Browse the repository at this point in the history
  • Loading branch information
Xanewok authored Jan 11, 2021
1 parent 61ea7c6 commit 91cf903
Show file tree
Hide file tree
Showing 33 changed files with 924 additions and 895 deletions.
10 changes: 4 additions & 6 deletions core-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,14 @@ categories = [
]

[features]
tls = ["jsonrpc-client-transports/tls", "futures01"]
http = ["jsonrpc-client-transports/http", "futures01"]
ws = ["jsonrpc-client-transports/ws", "futures01"]
ipc = ["jsonrpc-client-transports/ipc", "futures01"]
tls = ["jsonrpc-client-transports/tls"]
http = ["jsonrpc-client-transports/http"]
ws = ["jsonrpc-client-transports/ws"]
ipc = ["jsonrpc-client-transports/ipc"]
arbitrary_precision = ["jsonrpc-client-transports/arbitrary_precision"]

[dependencies]
jsonrpc-client-transports = { version = "16.0", path = "./transports", default-features = false }
# Only for client transports, should be removed when we fully transition to futures=0.3
futures01 = { version = "0.1", package = "futures", optional = true }
futures = { version = "0.3", features = [ "compat" ] }

[badges]
Expand Down
3 changes: 0 additions & 3 deletions core-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,3 @@

pub use futures;
pub use jsonrpc_client_transports::*;

#[cfg(feature = "futures01")]
pub use futures01;
16 changes: 7 additions & 9 deletions core-client/transports/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,36 +21,34 @@ categories = [
[features]
default = ["http", "tls", "ws"]
tls = ["hyper-tls", "http"]
http = ["hyper", "futures01"]
http = ["hyper", "tokio/full"]
ws = [
"websocket",
"tokio",
"futures01",
"futures/compat"
]
ipc = [
"parity-tokio-ipc",
"jsonrpc-server-utils",
"tokio",
"futures01",
]
arbitrary_precision = ["serde_json/arbitrary_precision", "jsonrpc-core/arbitrary_precision"]

[dependencies]
derive_more = "0.99"
futures = { version = "0.3", features = [ "compat" ] }
futures = "0.3"
jsonrpc-core = { version = "16.0", path = "../../core" }
jsonrpc-pubsub = { version = "16.0", path = "../../pubsub" }
log = "0.4"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
url = "1.7"

futures01 = { version = "0.1.26", package = "futures", optional = true }
hyper = { version = "0.12", optional = true }
hyper-tls = { version = "0.3.2", optional = true }
hyper = { version = "0.13", optional = true }
hyper-tls = { version = "0.4", optional = true }
jsonrpc-server-utils = { version = "16.0", path = "../../server-utils", optional = true }
parity-tokio-ipc = { version = "0.2", optional = true }
tokio = { version = "0.1", optional = true }
parity-tokio-ipc = { version = "0.8", optional = true }
tokio = { version = "0.2", optional = true }
websocket = { version = "0.24", optional = true }

[dev-dependencies]
Expand Down
132 changes: 52 additions & 80 deletions core-client/transports/src/transports/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,75 +4,52 @@
use super::RequestBuilder;
use crate::{RpcChannel, RpcError, RpcMessage, RpcResult};
use futures::{Future, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use hyper::{http, rt, Client, Request, Uri};
use futures::{future, Future, FutureExt, StreamExt, TryFutureExt};
use hyper::{http, Client, Request, Uri};

/// Create a HTTP Client
pub fn connect<TClient>(url: &str) -> impl Future<Output = RpcResult<TClient>>
pub async fn connect<TClient>(url: &str) -> RpcResult<TClient>
where
TClient: From<RpcChannel>,
{
let (sender, receiver) = futures::channel::oneshot::channel();
let url = url.to_owned();

std::thread::spawn(move || {
let connect = rt::lazy(move || {
do_connect(&url)
.map(|client| {
if sender.send(client).is_err() {
panic!("The caller did not wait for the server.");
}
Ok(())
})
.compat()
});
rt::run(connect);
});
let url: Uri = url.parse().map_err(|e| RpcError::Other(Box::new(e)))?;

receiver.map(|res| res.expect("Server closed prematurely.").map(TClient::from))
}
let (client_api, client_worker) = do_connect(url).await;
tokio::spawn(client_worker);

fn do_connect(url: &str) -> impl Future<Output = RpcResult<RpcChannel>> {
use futures::future::ready;
Ok(TClient::from(client_api))
}

async fn do_connect(url: Uri) -> (RpcChannel, impl Future<Output = ()>) {
let max_parallel = 8;
let url: Uri = match url.parse() {
Ok(url) => url,
Err(e) => return ready(Err(RpcError::Other(Box::new(e)))),
};

#[cfg(feature = "tls")]
let connector = match hyper_tls::HttpsConnector::new(4) {
Ok(connector) => connector,
Err(e) => return ready(Err(RpcError::Other(Box::new(e)))),
};
let connector = hyper_tls::HttpsConnector::new();
#[cfg(feature = "tls")]
let client = Client::builder().build::<_, hyper::Body>(connector);

#[cfg(not(feature = "tls"))]
let client = Client::new();

// Keep track of internal request IDs when building subsequent requests
let mut request_builder = RequestBuilder::new();

let (sender, receiver) = futures::channel::mpsc::unbounded();

use futures01::{Future, Stream};
let fut = receiver
.map(Ok)
.compat()
.filter_map(move |msg: RpcMessage| {
let (request, sender) = match msg {
future::ready(match msg {
RpcMessage::Call(call) => {
let (_, request) = request_builder.call_request(&call);
(request, Some(call.sender))
Some((request, Some(call.sender)))
}
RpcMessage::Notify(notify) => (request_builder.notification(&notify), None),
RpcMessage::Notify(notify) => Some((request_builder.notification(&notify), None)),
RpcMessage::Subscribe(_) => {
log::warn!("Unsupported `RpcMessage` type `Subscribe`.");
return None;
None
}
};

})
})
.map(move |(request, sender)| {
let request = Request::post(&url)
.header(
http::header::CONTENT_TYPE,
Expand All @@ -85,46 +62,42 @@ fn do_connect(url: &str) -> impl Future<Output = RpcResult<RpcChannel>> {
.body(request.into())
.expect("Uri and request headers are valid; qed");

Some(client.request(request).then(move |response| Ok((response, sender))))
client
.request(request)
.then(|response| async move { (response, sender) })
})
.buffer_unordered(max_parallel)
.for_each(|(result, sender)| {
use futures01::future::{
self,
Either::{A, B},
};
let future = match result {
.for_each(|(response, sender)| async {
let result = match response {
Ok(ref res) if !res.status().is_success() => {
log::trace!("http result status {}", res.status());
A(future::err(RpcError::Client(format!(
Err(RpcError::Client(format!(
"Unexpected response status code: {}",
res.status()
))))
)))
}
Err(err) => Err(RpcError::Other(Box::new(err))),
Ok(res) => {
hyper::body::to_bytes(res.into_body())
.map_err(|e| RpcError::ParseError(e.to_string(), Box::new(e)))
.await
}
Ok(res) => B(res
.into_body()
.map_err(|e| RpcError::ParseError(e.to_string(), Box::new(e)))
.concat2()),
Err(err) => A(future::err(RpcError::Other(Box::new(err)))),
};
future.then(|result| {
if let Some(sender) = sender {
let response = result
.and_then(|response| {
let response_str = String::from_utf8_lossy(response.as_ref()).into_owned();
super::parse_response(&response_str)
})
.and_then(|r| r.1);
if let Err(err) = sender.send(response) {
log::warn!("Error resuming asynchronous request: {:?}", err);
}

if let Some(sender) = sender {
let response = result
.and_then(|response| {
let response_str = String::from_utf8_lossy(response.as_ref()).into_owned();
super::parse_response(&response_str)
})
.and_then(|r| r.1);
if let Err(err) = sender.send(response) {
log::warn!("Error resuming asynchronous request: {:?}", err);
}
Ok(())
})
}
});

rt::spawn(fut.map_err(|e: RpcError| log::error!("RPC Client error: {:?}", e)));
ready(Ok(sender.into()))
(sender.into(), fut)
}

#[cfg(test)]
Expand Down Expand Up @@ -218,7 +191,7 @@ mod tests {
Ok(()) as RpcResult<_>
};

futures::executor::block_on(run).unwrap();
tokio::runtime::Runtime::new().unwrap().block_on(run).unwrap();
}

#[test]
Expand All @@ -227,18 +200,16 @@ mod tests {

// given
let server = TestServer::serve(id);
let (tx, rx) = std::sync::mpsc::channel();

// when
let run = async move {
let run = async {
let client: TestClient = connect(&server.uri).await.unwrap();
client.notify(12).unwrap();
tx.send(()).unwrap();
};

let pool = futures::executor::ThreadPool::builder().pool_size(1).create().unwrap();
pool.spawn_ok(run);
rx.recv().unwrap();
tokio::runtime::Runtime::new().unwrap().block_on(run);
// Ensure that server has not been moved into runtime
drop(server);
}

#[test]
Expand All @@ -249,7 +220,8 @@ mod tests {
let invalid_uri = "invalid uri";

// when
let res: RpcResult<TestClient> = futures::executor::block_on(connect(invalid_uri));
let fut = connect(invalid_uri);
let res: RpcResult<TestClient> = tokio::runtime::Runtime::new().unwrap().block_on(fut);

// then
assert_matches!(
Expand All @@ -271,7 +243,7 @@ mod tests {
let client: TestClient = connect(&server.uri).await?;
client.fail().await
};
let res = futures::executor::block_on(run);
let res = tokio::runtime::Runtime::new().unwrap().block_on(run);

// then
if let Err(RpcError::JsonRpcError(err)) = res {
Expand Down Expand Up @@ -312,6 +284,6 @@ mod tests {
Ok(()) as RpcResult<_>
};

futures::executor::block_on(run).unwrap();
tokio::runtime::Runtime::new().unwrap().block_on(run).unwrap();
}
}
Loading

0 comments on commit 91cf903

Please sign in to comment.