-
Notifications
You must be signed in to change notification settings - Fork 9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Axum integration #39
base: main
Are you sure you want to change the base?
Axum integration #39
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @clarkohw! Thank you for taking the time to work on this, I really appreciate it.
I've reviewed the PR and overall the structure is good but I think that it needs a slight rework to make the API a bit more ergonomic and ensure that the contracts made during the upgrade request are maintained; it would also be nice to be able to provide a WebSocket configuration to the handler.
If I can help at all then let me know.
ratchet_axum/src/lib.rs
Outdated
.status(hyper::StatusCode::SWITCHING_PROTOCOLS) | ||
.header(hyper::header::CONNECTION, "upgrade") | ||
.header(hyper::header::UPGRADE, "websocket") | ||
.header("Sec-WebSocket-Accept", self.key); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A user needs to be able to specify sec-websocket-protocol
headers and the server needs to take the intersection of the two sets.
ratchet_axum/src/lib.rs
Outdated
.header("Sec-WebSocket-Accept", self.key); | ||
|
||
if self.permessage_deflate { | ||
builder = builder.header("Sec-WebSocket-Extensions", "permessage-deflate"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't quite correct. If the client has sent a permessage-deflate
header then it must be negotiated by the extension otherwise the connection will be unreliable. If the server accepts the PMCE request, then the client may send a compressed frame that the server won't be able to read. While it is correct that the server can elect to ignore the configuration parameters, Ratchet does support Deflate PMCE configuration and it's worth integrating.
What it would be worth doing, is providing an https://docs.rs/ratchet_ext/latest/ratchet_ext/trait.ExtensionProvider.html which a user may specify to the Axum Router or the handler may capture it. Then you can use this provider to perform the extension negotiation using https://docs.rs/ratchet_ext/latest/ratchet_ext/trait.ExtensionProvider.html#tymethod.negotiate_server.
Also, so it's tidier, you can use http::http::header::SEC_WEBSOCKET_EXTENSIONS
here.
ratchet_axum/src/lib.rs
Outdated
@@ -0,0 +1,162 @@ | |||
// Port of hyper_tunstenite for fastwebsockets. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can understand the rationale behind this but I don't believe that this is required as it's a fairly simple integration and there isn't really another way of integrating it into Axum.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some module-level documentation would be good for this. Coupled with an example of it
} | ||
|
||
impl IncomingUpgrade { | ||
pub fn upgrade(self) -> Result<(Response<Body>, UpgradeFut), Error> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be more ergonomic if this was inverted a bit. It would also be safer as it ensures that any contracts made during the upgrade are upheld as we create the WebSocket
instance. Otherwise, we could negotiate an extension and then a user could create a WebSocket
without it.
Something like this:
pub fn upgrade<E,F, Fut>(self, f:F) -> Response<Body>
where
F: FnOnce(UpgradedServer<TokioIo<hyper::upgrade::Upgraded>, E>) -> Fut,
Fut: Future<Output=()>,
E: Extension,
{
// await the upgrade future and spawn the user's handler after creating the WebSocket.
}
Then you could use it as follows:
async fn ws_handler<E>(incoming_upgrade: IncomingUpgrade, state: State<E>) -> impl IntoResponse {
incoming_upgrade.upgrade(|mut upgraded| async {
let UpgradedServer {
request,
websocket,
subprotocol,
} = upgraded;
let mut buf = BytesMut::new();
loop {
match websocket.read(&mut buf).await.unwrap() {
Message::Text => {
websocket.write(&mut buf, PayloadType::Text).await.unwrap();
buf.clear();
}
_ => break,
}
}
})
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@SirCipher I am trying to implement the upgrade like this but having trouble because ExtensionProvider
is not Send
. I tried it with the below setup, and was able to get it to compile/run, but running into this error:
called `Result::unwrap()` on an `Err` value: Error { inner: Inner { kind: IO, source: Some(Kind(UnexpectedEof)) } }
wondering if you have any advice here
pub fn upgrade_2<E, F, Fut>(self, f: F, provider: E) -> Response<Body>
where
F: FnOnce(UpgradedServer<TokioIo<hyper::upgrade::Upgraded>, E::Extension>) -> Fut
+ Send
+ 'static,
Fut: Future<Output = ()> + Send,
E: ExtensionProvider + Send + 'static,
<E as ExtensionProvider>::Extension: Send,
{
tokio::spawn(async move {
let upgraded = match self.on_upgrade.await {
Ok(upgraded) => upgraded,
Err(err) => {
return;
}
};
let upgraded = TokioIo::new(upgraded);
let upgrade_server = accept_with(
upgraded,
WebSocketConfig::default(),
provider,
SubprotocolRegistry::default(),
)
.await
.unwrap()
.upgrade_with(self.headers)
.await
.unwrap();
f(upgrade_server).await;
});
let builder = Response::builder()
.status(hyper::StatusCode::SWITCHING_PROTOCOLS)
.header(hyper::header::CONNECTION, HEADER_CONNECTION)
.header(hyper::header::UPGRADE, HEADER_UPGRADE)
.header(hyper::header::SEC_WEBSOCKET_ACCEPT, self.key);
let response = builder
.body(Body::default())
.expect("bug: failed to build response");
response
}
@clarkohw, I've exposed the header handling functionality and you should now be able to integrate it into your Axum work. You'll need to depend on |
@clarkohw, do you have an update on this? |
have been juggling a few things. plan to work on this during next two weeks and 100% this month. if that's too slow feel free to take it over. if not, i will take care of it. |
This is an initial pass at integrating
ratchet
withaxum
with a working example. Any feedback is welcome! Note that this adapter pulls from thefastwebsockets
axum integration. Thanks!