Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate to Tokio 0.2 #603

Merged
merged 73 commits into from
Jan 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
73 commits
Select commit Hold shift + click to select a range
a8984f0
Fix unused variable warning in client-local example
Xanewok Dec 7, 2020
907cc08
Mark tokio_service import as trait-specific
Xanewok Dec 7, 2020
867c72f
Unify parity-tokio-ipc dependencies
Xanewok Dec 7, 2020
51c2fee
WIP: Try to adapt ipc to use futures03
Xanewok Dec 7, 2020
c7f403d
WIP: Use patched parity-tokio-ipc
Xanewok Dec 8, 2020
4461c61
WIP: PoC with tokio_compat
Xanewok Dec 8, 2020
c1409c2
Fix ipc test failure
Xanewok Dec 8, 2020
224c4a0
Fix and adapt rest of IPC
Xanewok Dec 8, 2020
8ed4be7
Prefer using tokio_compat everywhere for now
Xanewok Dec 8, 2020
cab4479
Prefer tower-service 0.3 over tokio-service 0.1
Xanewok Dec 8, 2020
015e918
Make tokio-codec optional
Xanewok Dec 8, 2020
ebc3df9
Don't use tokio-codec in the ipc server
Xanewok Dec 8, 2020
22a4323
Don't explicitly depend on futures01 in server-utils
Xanewok Dec 8, 2020
53241d4
Make tokio optional in server-utils
Xanewok Dec 8, 2020
a6d31f0
WIP: Prepare to spawn a 0.3 future in IPC server
Xanewok Dec 9, 2020
ffe6da7
Simplify service.call in IPC
Xanewok Dec 9, 2020
5e2921b
WIP: Alow to opt out of Tokio 0.1 in server-utils
Xanewok Dec 9, 2020
ffc2fca
Migrate IPC server to Tokio 0.2
Xanewok Dec 9, 2020
9387f85
Adapt ipc client transport to Tokio 0.2
Xanewok Dec 16, 2020
7ceda69
Adapt ws client to Tokio 0.2
Xanewok Dec 17, 2020
6009b18
WIP: Migrate http server to Tokio 0.2 part 1
Xanewok Dec 17, 2020
fb49817
WIP: Migrate http server to Tokio 0.2 part 2
Xanewok Dec 17, 2020
68e1475
WIP: Migrate http server to Tokio 0.2 part 3: tests pass
Xanewok Dec 17, 2020
67360db
http: Silence warnings
Xanewok Dec 17, 2020
3b5d965
WIP: Adapt tcp server: part 1
Xanewok Dec 17, 2020
0be5b86
Argh yet another attempt at one type is more general
Xanewok Dec 17, 2020
91d48ae
Prune futures01 from tcp server
Xanewok Dec 17, 2020
029b2d0
Fix stuff
Xanewok Dec 18, 2020
094222e
Clean up a bit TCP server impl
Xanewok Dec 18, 2020
ab00637
Clean up TCP server a bit more
Xanewok Dec 18, 2020
0ce3ac3
Migrate to Tokio 0.2 for WS server
Xanewok Dec 18, 2020
ce0874b
WIP: Adapt HTTP client to use Tokio 0.2
Xanewok Dec 18, 2020
188f09b
Prune tokio 0.1 from server-utils entirely
Xanewok Dec 18, 2020
90d28ee
Migrate stdio server to Tokio 0.2
Xanewok Dec 18, 2020
def4a98
Remove tokio-uds from ipc server
Xanewok Dec 18, 2020
c0d80fb
Remove some Tokio 0.1-related code
Xanewok Dec 18, 2020
7f420a3
Remove some commented code
Xanewok Dec 18, 2020
9a9680c
Remove some tokio-compat-related code
Xanewok Dec 18, 2020
a9be662
Match previous poll match order in tcp server
Xanewok Dec 18, 2020
ac60b45
Bring back event_loop_executors
Xanewok Dec 18, 2020
2642ad4
Minimize differences in server-utils/src/stream_codec.rs
Xanewok Dec 18, 2020
3f90c26
Minor improvements to tcp/src/dispatch.rs
Xanewok Dec 18, 2020
3c6693c
Remove commented code in http client
Xanewok Dec 18, 2020
2e2e3d2
Polish and minimize the diff
Xanewok Dec 21, 2020
9aac40c
Polish and minimize the diff pt 2
Xanewok Dec 21, 2020
68e9be6
Fix HTTP client Tokio runtime management
Xanewok Dec 21, 2020
3859116
Prefer using hyper API for HTTP server
Xanewok Dec 26, 2020
e3b9603
Work around Windows TCP socket close bug in Tokio 0.2
Xanewok Dec 27, 2020
a13cc97
Migrate http to std::future
Xanewok Dec 27, 2020
a4cd677
Remove futures01 from http core-client/http
Xanewok Dec 28, 2020
62263d9
Migrate core-client/ws to std::future
Xanewok Dec 28, 2020
93b2887
Remove futures01 from core-client
Xanewok Dec 28, 2020
931f7ff
Prefer futures to futures03 in server-utils
Xanewok Dec 28, 2020
2bbac0c
Migrate ipc server to std::future
Xanewok Dec 28, 2020
0b8326c
Prefer futures to futures03 in ipc server
Xanewok Dec 28, 2020
52d9fa0
Prefer futures to futures03 in tcp server
Xanewok Dec 28, 2020
ffee890
Migrate ws server to std::future
Xanewok Dec 28, 2020
274e653
Prefer futures to futures03 in ws server
Xanewok Dec 28, 2020
5720506
Prefer Unpin over std::marker::Unpin
Xanewok Dec 29, 2020
714b023
Remove stray eprintlns
Xanewok Dec 29, 2020
d33c2f9
Remove unnecessary Into casts
Xanewok Dec 29, 2020
e012842
Format the world
Xanewok Dec 29, 2020
7d4faa0
Don't do redundant work when suspendable stream is pending
Xanewok Jan 4, 2021
a883edc
Return future directly instead in stdio server
Xanewok Jan 4, 2021
7039504
Simplify some imports in Cargo.tomls
Xanewok Jan 4, 2021
010701d
Format again
Xanewok Jan 4, 2021
bb42d82
Add missing feature for WS client transport
Xanewok Jan 4, 2021
dc1a814
Don't spawn a thread in HTTP client connect
Xanewok Jan 4, 2021
94a4b23
Fix ws client sink impl with an unbounded queue
Xanewok Jan 5, 2021
1bd2fc5
Format again
Xanewok Jan 5, 2021
82c5be1
Use released parity-tokio-ipc v0.8 crate
Xanewok Jan 5, 2021
35cabd1
Remove extraneous `use std;` imports
Xanewok Jan 11, 2021
a05cd4e
Re-export tokio from ipc server
Xanewok Jan 11, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions core-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,14 @@ categories = [
]

[features]
tls = ["jsonrpc-client-transports/tls", "futures01"]
http = ["jsonrpc-client-transports/http", "futures01"]
ws = ["jsonrpc-client-transports/ws", "futures01"]
ipc = ["jsonrpc-client-transports/ipc", "futures01"]
tls = ["jsonrpc-client-transports/tls"]
http = ["jsonrpc-client-transports/http"]
ws = ["jsonrpc-client-transports/ws"]
ipc = ["jsonrpc-client-transports/ipc"]
arbitrary_precision = ["jsonrpc-client-transports/arbitrary_precision"]

[dependencies]
jsonrpc-client-transports = { version = "16.0", path = "./transports", default-features = false }
# Only for client transports, should be removed when we fully transition to futures=0.3
futures01 = { version = "0.1", package = "futures", optional = true }
futures = { version = "0.3", features = [ "compat" ] }

[badges]
Expand Down
3 changes: 0 additions & 3 deletions core-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,3 @@

pub use futures;
pub use jsonrpc_client_transports::*;

#[cfg(feature = "futures01")]
pub use futures01;
16 changes: 7 additions & 9 deletions core-client/transports/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,36 +21,34 @@ categories = [
[features]
default = ["http", "tls", "ws"]
tls = ["hyper-tls", "http"]
http = ["hyper", "futures01"]
http = ["hyper", "tokio/full"]
ws = [
"websocket",
"tokio",
"futures01",
"futures/compat"
]
ipc = [
"parity-tokio-ipc",
"jsonrpc-server-utils",
"tokio",
"futures01",
]
arbitrary_precision = ["serde_json/arbitrary_precision", "jsonrpc-core/arbitrary_precision"]

[dependencies]
derive_more = "0.99"
futures = { version = "0.3", features = [ "compat" ] }
futures = "0.3"
jsonrpc-core = { version = "16.0", path = "../../core" }
jsonrpc-pubsub = { version = "16.0", path = "../../pubsub" }
log = "0.4"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
url = "1.7"

futures01 = { version = "0.1.26", package = "futures", optional = true }
hyper = { version = "0.12", optional = true }
hyper-tls = { version = "0.3.2", optional = true }
hyper = { version = "0.13", optional = true }
hyper-tls = { version = "0.4", optional = true }
jsonrpc-server-utils = { version = "16.0", path = "../../server-utils", optional = true }
parity-tokio-ipc = { version = "0.2", optional = true }
tokio = { version = "0.1", optional = true }
parity-tokio-ipc = { version = "0.8", optional = true }
tokio = { version = "0.2", optional = true }
websocket = { version = "0.24", optional = true }

[dev-dependencies]
Expand Down
132 changes: 52 additions & 80 deletions core-client/transports/src/transports/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,75 +4,52 @@

use super::RequestBuilder;
use crate::{RpcChannel, RpcError, RpcMessage, RpcResult};
use futures::{Future, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use hyper::{http, rt, Client, Request, Uri};
use futures::{future, Future, FutureExt, StreamExt, TryFutureExt};
use hyper::{http, Client, Request, Uri};

/// Create a HTTP Client
pub fn connect<TClient>(url: &str) -> impl Future<Output = RpcResult<TClient>>
pub async fn connect<TClient>(url: &str) -> RpcResult<TClient>
where
TClient: From<RpcChannel>,
{
let (sender, receiver) = futures::channel::oneshot::channel();
let url = url.to_owned();

std::thread::spawn(move || {
let connect = rt::lazy(move || {
do_connect(&url)
.map(|client| {
if sender.send(client).is_err() {
panic!("The caller did not wait for the server.");
}
Ok(())
})
.compat()
});
rt::run(connect);
});
let url: Uri = url.parse().map_err(|e| RpcError::Other(Box::new(e)))?;

receiver.map(|res| res.expect("Server closed prematurely.").map(TClient::from))
}
let (client_api, client_worker) = do_connect(url).await;
tokio::spawn(client_worker);

fn do_connect(url: &str) -> impl Future<Output = RpcResult<RpcChannel>> {
use futures::future::ready;
Ok(TClient::from(client_api))
}

async fn do_connect(url: Uri) -> (RpcChannel, impl Future<Output = ()>) {
let max_parallel = 8;
let url: Uri = match url.parse() {
Ok(url) => url,
Err(e) => return ready(Err(RpcError::Other(Box::new(e)))),
};

#[cfg(feature = "tls")]
let connector = match hyper_tls::HttpsConnector::new(4) {
Ok(connector) => connector,
Err(e) => return ready(Err(RpcError::Other(Box::new(e)))),
};
let connector = hyper_tls::HttpsConnector::new();
#[cfg(feature = "tls")]
let client = Client::builder().build::<_, hyper::Body>(connector);

#[cfg(not(feature = "tls"))]
let client = Client::new();

// Keep track of internal request IDs when building subsequent requests
let mut request_builder = RequestBuilder::new();

let (sender, receiver) = futures::channel::mpsc::unbounded();

use futures01::{Future, Stream};
let fut = receiver
.map(Ok)
.compat()
.filter_map(move |msg: RpcMessage| {
let (request, sender) = match msg {
future::ready(match msg {
RpcMessage::Call(call) => {
let (_, request) = request_builder.call_request(&call);
(request, Some(call.sender))
Some((request, Some(call.sender)))
}
RpcMessage::Notify(notify) => (request_builder.notification(&notify), None),
RpcMessage::Notify(notify) => Some((request_builder.notification(&notify), None)),
RpcMessage::Subscribe(_) => {
log::warn!("Unsupported `RpcMessage` type `Subscribe`.");
return None;
None
}
};

})
})
.map(move |(request, sender)| {
let request = Request::post(&url)
.header(
http::header::CONTENT_TYPE,
Expand All @@ -85,46 +62,42 @@ fn do_connect(url: &str) -> impl Future<Output = RpcResult<RpcChannel>> {
.body(request.into())
.expect("Uri and request headers are valid; qed");

Some(client.request(request).then(move |response| Ok((response, sender))))
client
.request(request)
.then(|response| async move { (response, sender) })
})
.buffer_unordered(max_parallel)
.for_each(|(result, sender)| {
use futures01::future::{
self,
Either::{A, B},
};
let future = match result {
.for_each(|(response, sender)| async {
let result = match response {
Ok(ref res) if !res.status().is_success() => {
log::trace!("http result status {}", res.status());
A(future::err(RpcError::Client(format!(
Err(RpcError::Client(format!(
"Unexpected response status code: {}",
res.status()
))))
)))
}
Err(err) => Err(RpcError::Other(Box::new(err))),
Ok(res) => {
hyper::body::to_bytes(res.into_body())
.map_err(|e| RpcError::ParseError(e.to_string(), Box::new(e)))
.await
}
Ok(res) => B(res
.into_body()
.map_err(|e| RpcError::ParseError(e.to_string(), Box::new(e)))
.concat2()),
Err(err) => A(future::err(RpcError::Other(Box::new(err)))),
};
future.then(|result| {
if let Some(sender) = sender {
let response = result
.and_then(|response| {
let response_str = String::from_utf8_lossy(response.as_ref()).into_owned();
super::parse_response(&response_str)
})
.and_then(|r| r.1);
if let Err(err) = sender.send(response) {
log::warn!("Error resuming asynchronous request: {:?}", err);
}

if let Some(sender) = sender {
let response = result
.and_then(|response| {
let response_str = String::from_utf8_lossy(response.as_ref()).into_owned();
super::parse_response(&response_str)
})
.and_then(|r| r.1);
if let Err(err) = sender.send(response) {
log::warn!("Error resuming asynchronous request: {:?}", err);
}
Ok(())
})
}
});

rt::spawn(fut.map_err(|e: RpcError| log::error!("RPC Client error: {:?}", e)));
ready(Ok(sender.into()))
(sender.into(), fut)
}

#[cfg(test)]
Expand Down Expand Up @@ -218,7 +191,7 @@ mod tests {
Ok(()) as RpcResult<_>
};

futures::executor::block_on(run).unwrap();
tokio::runtime::Runtime::new().unwrap().block_on(run).unwrap();
}

#[test]
Expand All @@ -227,18 +200,16 @@ mod tests {

// given
let server = TestServer::serve(id);
let (tx, rx) = std::sync::mpsc::channel();

// when
let run = async move {
let run = async {
let client: TestClient = connect(&server.uri).await.unwrap();
client.notify(12).unwrap();
tx.send(()).unwrap();
};

let pool = futures::executor::ThreadPool::builder().pool_size(1).create().unwrap();
pool.spawn_ok(run);
rx.recv().unwrap();
tokio::runtime::Runtime::new().unwrap().block_on(run);
// Ensure that server has not been moved into runtime
drop(server);
}

#[test]
Expand All @@ -249,7 +220,8 @@ mod tests {
let invalid_uri = "invalid uri";

// when
let res: RpcResult<TestClient> = futures::executor::block_on(connect(invalid_uri));
let fut = connect(invalid_uri);
let res: RpcResult<TestClient> = tokio::runtime::Runtime::new().unwrap().block_on(fut);

// then
assert_matches!(
Expand All @@ -271,7 +243,7 @@ mod tests {
let client: TestClient = connect(&server.uri).await?;
client.fail().await
};
let res = futures::executor::block_on(run);
let res = tokio::runtime::Runtime::new().unwrap().block_on(run);

// then
if let Err(RpcError::JsonRpcError(err)) = res {
Expand Down Expand Up @@ -312,6 +284,6 @@ mod tests {
Ok(()) as RpcResult<_>
};

futures::executor::block_on(run).unwrap();
tokio::runtime::Runtime::new().unwrap().block_on(run).unwrap();
}
}
Loading