diff --git a/chromiumoxide_cdp/src/lib.rs b/chromiumoxide_cdp/src/lib.rs index 933da07..18057ca 100644 --- a/chromiumoxide_cdp/src/lib.rs +++ b/chromiumoxide_cdp/src/lib.rs @@ -147,3 +147,21 @@ impl fmt::Display for StackTrace { Ok(()) } } + +#[cfg(test)] +mod tests { + use chromiumoxide_types::Message; + + use super::cdp::CdpEventMessage; + + // This makes sure we can parse arbitrary numbers (timestamp) as f64 + // and that the Message untagged union works core + #[test] + fn test_event_deserialize_f64() { + let raw = r#"{"method":"Page.lifecycleEvent","params":{"frameId":"B0FCF18A982213C9947D313EAA8F934A","loaderId":"547DA6CC3D4A41314EA08A88BFA62B21","name":"commit","timestamp":1531.878478},"sessionId":"E0BCD37484373226136272710B8CB432"}"#; + + let event = serde_json::from_str::>(raw).unwrap(); + + assert!(matches!(event, Message::Event(_))); + } +} diff --git a/src/browser.rs b/src/browser.rs index 43106a5..ab1fc5d 100644 --- a/src/browser.rs +++ b/src/browser.rs @@ -210,6 +210,7 @@ impl Browser { let handler_config = HandlerConfig { ignore_https_errors: config.ignore_https_errors, + ignore_invalid_messages: config.ignore_invalid_messages, viewport: config.viewport.clone(), context_ids: Vec::new(), request_timeout: config.request_timeout, @@ -642,6 +643,8 @@ pub struct BrowserConfig { /// Ignore https errors, default is true ignore_https_errors: bool, + /// Ignore invalid messages, default is true + ignore_invalid_messages: bool, viewport: Option, /// The duration after a request with no response should time out request_timeout: Duration, @@ -673,6 +676,7 @@ pub struct BrowserConfigBuilder { incognito: bool, launch_timeout: Duration, ignore_https_errors: bool, + ignore_invalid_events: bool, viewport: Option, request_timeout: Duration, args: Vec, @@ -706,6 +710,7 @@ impl Default for BrowserConfigBuilder { incognito: false, launch_timeout: Duration::from_millis(LAUNCH_TIMEOUT), ignore_https_errors: true, + ignore_invalid_events: true, viewport: Some(Default::default()), request_timeout: Duration::from_millis(REQUEST_TIMEOUT), args: Vec::new(), @@ -752,6 +757,13 @@ impl BrowserConfigBuilder { self } + /// The browser handler will return [CdpError::InvalidMessage] if a received + /// message cannot be parsed. + pub fn surface_invalid_messages(mut self) -> Self { + self.ignore_invalid_events = false; + self + } + pub fn port(mut self, port: u16) -> Self { self.port = port; self @@ -887,6 +899,7 @@ impl BrowserConfigBuilder { incognito: self.incognito, launch_timeout: self.launch_timeout, ignore_https_errors: self.ignore_https_errors, + ignore_invalid_messages: self.ignore_invalid_events, viewport: self.viewport, request_timeout: self.request_timeout, args: self.args, diff --git a/src/conn.rs b/src/conn.rs index 78d6c4d..00ee7ad 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -118,56 +118,51 @@ impl Stream for Connection { let pin = self.get_mut(); loop { - loop { - // queue in the next message if not currently flushing - if let Err(err) = pin.start_send_next(cx) { - return Poll::Ready(Some(Err(err))); - } + // queue in the next message if not currently flushing + if let Err(err) = pin.start_send_next(cx) { + return Poll::Ready(Some(Err(err))); + } - // send the message - if let Some(call) = pin.pending_flush.take() { - if pin.ws.poll_ready_unpin(cx).is_ready() { - pin.needs_flush = true; - // try another flush - continue; - } else { - pin.pending_flush = Some(call); - } + // send the message + if let Some(call) = pin.pending_flush.take() { + if pin.ws.poll_ready_unpin(cx).is_ready() { + pin.needs_flush = true; + // try another flush + continue; + } else { + pin.pending_flush = Some(call); } - - break; } - // read from the ws - match ready!(pin.ws.poll_next_unpin(cx)) { - Some(Ok(WsMessage::Text(text))) => { - let ready = match serde_json::from_str::>(&text) { - Ok(msg) => { - tracing::trace!("Received {:?}", msg); - Ok(msg) - } - Err(err) => { - tracing::debug!(target: "chromiumoxide::conn::raw_ws::parse_errors", msg = text, "Failed to parse raw WS message"); - tracing::error!("Failed to deserialize WS response {}", err); - // Go to the next iteration and try reading the next message - // in the hopes we can reconver and continue working. - continue; - } - }; - return Poll::Ready(Some(ready)); - } - Some(Ok(WsMessage::Close(_))) => return Poll::Ready(None), - // ignore ping and pong - Some(Ok(WsMessage::Ping(_))) | Some(Ok(WsMessage::Pong(_))) => { - cx.waker().wake_by_ref(); - return Poll::Pending; - } - Some(Ok(msg)) => return Poll::Ready(Some(Err(CdpError::UnexpectedWsMessage(msg)))), - Some(Err(err)) => return Poll::Ready(Some(Err(CdpError::Ws(err)))), - None => { - // ws connection closed - return Poll::Ready(None); - } + break; + } + + // read from the ws + match ready!(pin.ws.poll_next_unpin(cx)) { + Some(Ok(WsMessage::Text(text))) => { + let ready = match serde_json::from_str::>(&text) { + Ok(msg) => { + tracing::trace!("Received {:?}", msg); + Ok(msg) + } + Err(err) => { + tracing::debug!(target: "chromiumoxide::conn::raw_ws::parse_errors", msg = text, "Failed to parse raw WS message {}", err); + Err(CdpError::InvalidMessage(text, err)) + } + }; + Poll::Ready(Some(ready)) + } + Some(Ok(WsMessage::Close(_))) => Poll::Ready(None), + // ignore ping and pong + Some(Ok(WsMessage::Ping(_))) | Some(Ok(WsMessage::Pong(_))) => { + cx.waker().wake_by_ref(); + Poll::Pending + } + Some(Ok(msg)) => Poll::Ready(Some(Err(CdpError::UnexpectedWsMessage(msg)))), + Some(Err(err)) => Poll::Ready(Some(Err(CdpError::Ws(err)))), + None => { + // ws connection closed + Poll::Ready(None) } } } diff --git a/src/error.rs b/src/error.rs index ce75bab..4aae36e 100644 --- a/src/error.rs +++ b/src/error.rs @@ -61,6 +61,8 @@ pub enum CdpError { JavascriptException(Box), #[error("{0}")] Url(#[from] url::ParseError), + #[error("{1}")] + InvalidMessage(String, serde_json::Error), } impl CdpError { pub fn msg(msg: impl Into) -> Self { diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 836b31e..1c4e3ef 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -624,6 +624,13 @@ impl Stream for Handler { Ok(Message::Event(ev)) => { pin.on_event(ev); } + Err(err @ CdpError::InvalidMessage(_, _)) => { + if pin.config.ignore_invalid_messages { + tracing::warn!("WS Invalid message: {}", err); + } else { + return Poll::Ready(Some(Err(err))); + } + } Err(err) => { tracing::error!("WS Connection error: {:?}", err); return Poll::Ready(Some(Err(err))); @@ -650,6 +657,8 @@ impl Stream for Handler { pub struct HandlerConfig { /// Whether the `NetworkManager`s should ignore https errors pub ignore_https_errors: bool, + /// Whether to ignore invalid messages + pub ignore_invalid_messages: bool, /// Window and device settings pub viewport: Option, /// Context ids to set from the get go @@ -666,6 +675,7 @@ impl Default for HandlerConfig { fn default() -> Self { Self { ignore_https_errors: true, + ignore_invalid_messages: true, viewport: Default::default(), context_ids: Vec::new(), request_timeout: Duration::from_millis(REQUEST_TIMEOUT),