Skip to content

Commit

Permalink
Allow parsing failures (#197)
Browse files Browse the repository at this point in the history
  • Loading branch information
MOZGIII authored Oct 24, 2024
1 parent b827ad8 commit c955148
Showing 1 changed file with 46 additions and 42 deletions.
88 changes: 46 additions & 42 deletions src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,52 +118,56 @@ impl<T: EventMessage + Unpin> Stream for Connection<T> {
let pin = self.get_mut();

loop {
// queue in the next message if not currently flushing
if let Err(err) = pin.start_send_next(cx) {
return Poll::Ready(Some(Err(err)));
}

// send the message
if let Some(call) = pin.pending_flush.take() {
if pin.ws.poll_ready_unpin(cx).is_ready() {
pin.needs_flush = true;
// try another flush
continue;
} else {
pin.pending_flush = Some(call);
loop {
// queue in the next message if not currently flushing
if let Err(err) = pin.start_send_next(cx) {
return Poll::Ready(Some(Err(err)));
}
}

break;
}

// read from the ws
match ready!(pin.ws.poll_next_unpin(cx)) {
Some(Ok(WsMessage::Text(text))) => {
let ready = match serde_json::from_str::<Message<T>>(&text) {
Ok(msg) => {
tracing::trace!("Received {:?}", msg);
Ok(msg)
}
Err(err) => {
tracing::debug!(target: "chromiumoxide::conn::raw_ws::parse_errors", msg = text, "Failed to parse raw WS message");
tracing::error!("Failed to deserialize WS response {}", err);
Err(err.into())
// send the message
if let Some(call) = pin.pending_flush.take() {
if pin.ws.poll_ready_unpin(cx).is_ready() {
pin.needs_flush = true;
// try another flush
continue;
} else {
pin.pending_flush = Some(call);
}
};
Poll::Ready(Some(ready))
}
Some(Ok(WsMessage::Close(_))) => Poll::Ready(None),
// ignore ping and pong
Some(Ok(WsMessage::Ping(_))) | Some(Ok(WsMessage::Pong(_))) => {
cx.waker().wake_by_ref();
Poll::Pending
}

break;
}
Some(Ok(msg)) => Poll::Ready(Some(Err(CdpError::UnexpectedWsMessage(msg)))),
Some(Err(err)) => Poll::Ready(Some(Err(CdpError::Ws(err)))),
None => {
// ws connection closed
Poll::Ready(None)

// read from the ws
match ready!(pin.ws.poll_next_unpin(cx)) {
Some(Ok(WsMessage::Text(text))) => {
let ready = match serde_json::from_str::<Message<T>>(&text) {
Ok(msg) => {
tracing::trace!("Received {:?}", msg);
Ok(msg)
}
Err(err) => {
tracing::debug!(target: "chromiumoxide::conn::raw_ws::parse_errors", msg = text, "Failed to parse raw WS message");
tracing::error!("Failed to deserialize WS response {}", err);
// Go to the next iteration and try reading the next message
// in the hopes we can reconver and continue working.
continue;
}
};
return Poll::Ready(Some(ready));
}
Some(Ok(WsMessage::Close(_))) => return Poll::Ready(None),
// ignore ping and pong
Some(Ok(WsMessage::Ping(_))) | Some(Ok(WsMessage::Pong(_))) => {
cx.waker().wake_by_ref();
return Poll::Pending;
}
Some(Ok(msg)) => return Poll::Ready(Some(Err(CdpError::UnexpectedWsMessage(msg)))),
Some(Err(err)) => return Poll::Ready(Some(Err(CdpError::Ws(err)))),
None => {
// ws connection closed
return Poll::Ready(None);
}
}
}
}
Expand Down

0 comments on commit c955148

Please sign in to comment.