From a58a1929094e1972ec3c9b16fbcfcd2f9e292122 Mon Sep 17 00:00:00 2001 From: FujiApple Date: Wed, 24 Jul 2024 21:54:20 +0800 Subject: [PATCH] feat(core): do not crash the tracer for some socket errors (#1238) --- crates/trippy-core/src/error.rs | 4 ++ crates/trippy-core/src/net/common.rs | 2 +- crates/trippy-core/src/net/ipv4.rs | 16 +++++++- crates/trippy-core/src/net/platform/unix.rs | 6 +++ .../trippy-core/src/net/platform/windows.rs | 6 ++- crates/trippy-core/src/probe.rs | 39 ++++++++++++++++++ crates/trippy-core/src/state.rs | 25 ++++++++++++ crates/trippy-core/src/strategy.rs | 40 +++++++++++++++++-- 8 files changed, 130 insertions(+), 8 deletions(-) diff --git a/crates/trippy-core/src/error.rs b/crates/trippy-core/src/error.rs index 43d555744..7d80353f1 100644 --- a/crates/trippy-core/src/error.rs +++ b/crates/trippy-core/src/error.rs @@ -19,6 +19,8 @@ pub enum Error { BadConfig(String), #[error("IO error: {0}")] IoError(#[from] IoError), + #[error("Probe failed to send: {0}")] + ProbeFailed(IoError), #[error("insufficient buffer capacity")] InsufficientCapacity, #[error("address {0} in use")] @@ -66,6 +68,8 @@ impl IoError { #[derive(Debug, Eq, PartialEq)] pub enum ErrorKind { InProgress, + HostUnreachable, + NetUnreachable, Std(io::ErrorKind), } diff --git a/crates/trippy-core/src/net/common.rs b/crates/trippy-core/src/net/common.rs index c330a645b..60ff026e8 100644 --- a/crates/trippy-core/src/net/common.rs +++ b/crates/trippy-core/src/net/common.rs @@ -12,7 +12,7 @@ pub fn process_result(addr: SocketAddr, res: IoResult<()>) -> Result<()> { Err(err) => match err.kind() { ErrorKind::InProgress => Ok(()), ErrorKind::Std(io::ErrorKind::AddrInUse) => Err(Error::AddressInUse(addr)), - ErrorKind::Std(_) => Err(Error::IoError(err)), + _ => Err(Error::IoError(err)), }, } } diff --git a/crates/trippy-core/src/net/ipv4.rs b/crates/trippy-core/src/net/ipv4.rs index 98fdd84db..343b2ff4f 100644 --- a/crates/trippy-core/src/net/ipv4.rs +++ b/crates/trippy-core/src/net/ipv4.rs @@ -108,7 +108,21 @@ impl Ipv4 { echo_request.packet(), )?; let remote_addr = SocketAddr::new(IpAddr::V4(self.dest_addr), 0); - icmp_send_socket.send_to(ipv4.packet(), remote_addr)?; + icmp_send_socket + .send_to(ipv4.packet(), remote_addr) + .map_err(|err| { + // TODO + #[allow(clippy::if_same_then_else)] + if err.kind() == ErrorKind::HostUnreachable { + Error::ProbeFailed(err) + } else if err.kind() == ErrorKind::Std(io::ErrorKind::InvalidInput) { + Error::ProbeFailed(err) + } else if err.kind() == ErrorKind::NetUnreachable { + Error::ProbeFailed(err) + } else { + Error::IoError(err) + } + })?; Ok(()) } diff --git a/crates/trippy-core/src/net/platform/unix.rs b/crates/trippy-core/src/net/platform/unix.rs index 19f6e1b1f..b720487b1 100644 --- a/crates/trippy-core/src/net/platform/unix.rs +++ b/crates/trippy-core/src/net/platform/unix.rs @@ -491,6 +491,10 @@ mod socket { fn from(value: &io::Error) -> Self { if value.raw_os_error() == io::Error::from(Error::EINPROGRESS).raw_os_error() { Self::InProgress + } else if value.raw_os_error() == io::Error::from(Error::EHOSTUNREACH).raw_os_error() { + Self::HostUnreachable + } else if value.raw_os_error() == io::Error::from(Error::ENETUNREACH).raw_os_error() { + Self::NetUnreachable } else { Self::Std(value.kind()) } @@ -502,6 +506,8 @@ mod socket { fn from(value: ErrorKind) -> Self { match value { ErrorKind::InProgress => Self::from(Error::EINPROGRESS), + ErrorKind::HostUnreachable => Self::from(Error::EHOSTUNREACH), + ErrorKind::NetUnreachable => Self::from(Error::ENETUNREACH), ErrorKind::Std(kind) => Self::from(kind), } } diff --git a/crates/trippy-core/src/net/platform/windows.rs b/crates/trippy-core/src/net/platform/windows.rs index c1c135f9c..cb98ea29c 100644 --- a/crates/trippy-core/src/net/platform/windows.rs +++ b/crates/trippy-core/src/net/platform/windows.rs @@ -20,8 +20,8 @@ use windows_sys::Win32::Networking::WinSock::{ IN_ADDR_0, IPPROTO_RAW, IPPROTO_TCP, SIO_ROUTING_INTERFACE_QUERY, SOCKADDR_IN, SOCKADDR_IN6, SOCKADDR_IN6_0, SOCKADDR_STORAGE, SOCKET_ERROR, SOL_SOCKET, SO_ERROR, SO_PORT_SCALABILITY, SO_REUSE_UNICASTPORT, TCP_FAIL_CONNECT_ON_ICMP_ERROR, TCP_ICMP_ERROR_INFO, WSABUF, WSADATA, - WSAEADDRNOTAVAIL, WSAECONNREFUSED, WSAEHOSTUNREACH, WSAEINPROGRESS, WSA_IO_INCOMPLETE, - WSA_IO_PENDING, + WSAEADDRNOTAVAIL, WSAECONNREFUSED, WSAEHOSTUNREACH, WSAEINPROGRESS, WSAENETUNREACH, + WSA_IO_INCOMPLETE, WSA_IO_PENDING, }; use windows_sys::Win32::System::IO::OVERLAPPED; @@ -605,6 +605,8 @@ impl From for StdIoError { fn from(value: ErrorKind) -> Self { match value { ErrorKind::InProgress => Self::from_raw_os_error(WSAEINPROGRESS), + ErrorKind::HostUnreachable => Self::from_raw_os_error(WSAEHOSTUNREACH), + ErrorKind::NetUnreachable => Self::from_raw_os_error(WSAENETUNREACH), ErrorKind::Std(kind) => Self::from(kind), } } diff --git a/crates/trippy-core/src/probe.rs b/crates/trippy-core/src/probe.rs index 21d34cacf..964583f82 100644 --- a/crates/trippy-core/src/probe.rs +++ b/crates/trippy-core/src/probe.rs @@ -24,6 +24,11 @@ pub enum ProbeStatus { /// port. When a probe is skipped, it will be marked as `Skipped` and a /// new probe will be sent with the same TTL next available sequence number. Skipped, + /// The probe has failed. + /// + /// A probe is considered failed when an error occurs while sending or + /// receiving. + Failed(ProbeFailed), /// The probe has been sent and is awaiting a response. /// /// If no response is received within the timeout, the probe will remain @@ -110,6 +115,18 @@ impl Probe { extensions, } } + + pub(crate) const fn failed(self) -> ProbeFailed { + ProbeFailed { + sequence: self.sequence, + identifier: self.identifier, + src_port: self.src_port, + dest_port: self.dest_port, + ttl: self.ttl, + round: self.round, + sent: self.sent, + } + } } /// A complete network tracing probe. @@ -153,6 +170,28 @@ pub struct ProbeComplete { pub extensions: Option, } +/// A failed network tracing probe. +/// +/// A probe is considered failed when an error occurs while sending or +/// receiving. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ProbeFailed { + /// The sequence of the probe. + pub sequence: Sequence, + /// The trace identifier. + pub identifier: TraceId, + /// The source port (UDP/TCP only) + pub src_port: Port, + /// The destination port (UDP/TCP only) + pub dest_port: Port, + /// The TTL of the probe. + pub ttl: TimeToLive, + /// Which round the probe belongs to. + pub round: RoundId, + /// Timestamp when the probe was sent. + pub sent: SystemTime, +} + /// The type of ICMP packet received. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum IcmpPacketType { diff --git a/crates/trippy-core/src/state.rs b/crates/trippy-core/src/state.rs index 05ed6ff64..fd6b0d154 100644 --- a/crates/trippy-core/src/state.rs +++ b/crates/trippy-core/src/state.rs @@ -169,6 +169,8 @@ pub struct Hop { total_sent: usize, /// The total probes received for this hop. total_recv: usize, + /// The total probes that failed for this hop. + total_failed: usize, /// The total round trip time for this hop across all rounds. total_time: Duration, /// The round trip time for this hop in the current round. @@ -237,6 +239,12 @@ impl Hop { self.total_recv } + /// The total number of probes that failed. + #[must_use] + pub const fn total_failed(&self) -> usize { + self.total_failed + } + /// The % of packets that are lost. #[must_use] pub fn loss_pct(&self) -> f64 { @@ -359,6 +367,7 @@ impl Default for Hop { addrs: IndexMap::default(), total_sent: 0, total_recv: 0, + total_failed: 0, total_time: Duration::default(), last: None, best: None, @@ -533,6 +542,21 @@ impl FlowState { self.hops[index].last_dest_port = awaited.dest_port.0; self.hops[index].last_sequence = awaited.sequence.0; } + ProbeStatus::Failed(failed) => { + self.update_lowest_ttl(failed.ttl); + self.update_round(failed.round); + let index = usize::from(failed.ttl.0) - 1; + self.hops[index].total_sent += 1; + self.hops[index].total_failed += 1; + self.hops[index].ttl = failed.ttl.0; + self.hops[index].samples.insert(0, Duration::default()); + if self.hops[index].samples.len() > self.max_samples { + self.hops[index].samples.pop(); + } + self.hops[index].last_src_port = failed.src_port.0; + self.hops[index].last_dest_port = failed.dest_port.0; + self.hops[index].last_sequence = failed.sequence.0; + } ProbeStatus::NotSent | ProbeStatus::Skipped => {} } } @@ -712,6 +736,7 @@ mod tests { Self::Skipped => Self::Skipped, Self::Awaited(awaited) => Self::Awaited(Probe { round, ..awaited }), Self::Complete(completed) => Self::Complete(ProbeComplete { round, ..completed }), + Self::Failed(_) => todo!(), } } } diff --git a/crates/trippy-core/src/strategy.rs b/crates/trippy-core/src/strategy.rs index 4692412e4..0a3ddcc2f 100644 --- a/crates/trippy-core/src/strategy.rs +++ b/crates/trippy-core/src/strategy.rs @@ -7,7 +7,7 @@ use crate::probe::{ ResponseSeqUdp, }; use crate::types::{Checksum, Sequence, TimeToLive, TraceId}; -use crate::{Extensions, IcmpPacketType, MultipathStrategy, PortDirection, Protocol}; +use crate::{Extensions, IcmpPacketType, MultipathStrategy, PortDirection, Probe, Protocol}; use std::net::IpAddr; use std::time::{Duration, SystemTime}; use tracing::instrument; @@ -99,16 +99,20 @@ impl)> Strategy { let sent = SystemTime::now(); match self.config.protocol { Protocol::Icmp => { - network.send_probe(st.next_probe(sent))?; + let probe = st.next_probe(sent); + Self::do_send(network, st, probe)?; + } + Protocol::Udp => { + let probe = st.next_probe(sent); + Self::do_send(network, st, probe)?; } - Protocol::Udp => network.send_probe(st.next_probe(sent))?, Protocol::Tcp => { let mut probe = if st.round_has_capacity() { st.next_probe(sent) } else { return Err(Error::InsufficientCapacity); }; - while let Err(err) = network.send_probe(probe) { + while let Err(err) = Self::do_send(network, st, probe) { match err { Error::AddressInUse(_) => { if st.round_has_capacity() { @@ -126,6 +130,21 @@ impl)> Strategy { Ok(()) } + /// Send the probe and handle errors. + /// + /// Some errors are transient and should not be considered fatal. In these cases we mark the + /// probe as failed and continue. + fn do_send(network: &mut N, st: &mut TracerState, probe: Probe) -> Result<()> { + match network.send_probe(probe) { + Ok(()) => Ok(()), + Err(Error::ProbeFailed(_)) => { + st.fail_probe(); + Ok(()) + } + Err(err) => Err(err), + } + } + /// Read and process the next incoming `ICMP` packet. /// /// We allow multiple probes to be in-flight at any time, and we cannot guarantee that responses @@ -1010,6 +1029,19 @@ mod state { probe } + /// Mark the `ProbeStatus` at the current `sequence` as failed. + #[instrument(skip(self))] + pub fn fail_probe(&mut self) { + let probe_index = usize::from(self.sequence - self.round_sequence); + let probe = self.buffer[probe_index - 1].clone(); + match probe { + ProbeStatus::Awaited(awaited) => { + self.buffer[probe_index - 1] = ProbeStatus::Failed(awaited.failed()); + } + _ => todo!(), + } + } + /// Determine the `src_port`, `dest_port` and `identifier` for the current probe. /// /// This will differ depending on the `TracerProtocol`, `MultipathStrategy` &