Skip to content

Commit

Permalink
feat(core): do not crash the tracer for some socket errors (#1238)
Browse files Browse the repository at this point in the history
  • Loading branch information
fujiapple852 committed Jul 27, 2024
1 parent 0d959aa commit 5ff0737
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 5 deletions.
39 changes: 39 additions & 0 deletions crates/trippy-core/src/probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ pub enum ProbeStatus {
/// port. When a probe is skipped, it will be marked as `Skipped` and a
/// new probe will be sent with the same TTL next available sequence number.
Skipped,
/// The probe has failed.
///
/// A probe is considered failed when an error occurs while sending or
/// receiving.
Failed(ProbeFailed),
/// The probe has been sent and is awaiting a response.
///
/// If no response is received within the timeout, the probe will remain
Expand Down Expand Up @@ -110,6 +115,18 @@ impl Probe {
extensions,
}
}

pub(crate) const fn failed(self) -> ProbeFailed {
ProbeFailed {
sequence: self.sequence,
identifier: self.identifier,
src_port: self.src_port,
dest_port: self.dest_port,
ttl: self.ttl,
round: self.round,
sent: self.sent,
}
}
}

/// A complete network tracing probe.
Expand Down Expand Up @@ -153,6 +170,28 @@ pub struct ProbeComplete {
pub extensions: Option<Extensions>,
}

/// A failed network tracing probe.
///
/// A probe is considered failed when an error occurs while sending or
/// receiving.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProbeFailed {
/// The sequence of the probe.
pub sequence: Sequence,
/// The trace identifier.
pub identifier: TraceId,
/// The source port (UDP/TCP only)
pub src_port: Port,
/// The destination port (UDP/TCP only)
pub dest_port: Port,
/// The TTL of the probe.
pub ttl: TimeToLive,
/// Which round the probe belongs to.
pub round: RoundId,
/// Timestamp when the probe was sent.
pub sent: SystemTime,
}

/// The type of ICMP packet received.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IcmpPacketType {
Expand Down
25 changes: 25 additions & 0 deletions crates/trippy-core/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ pub struct Hop {
total_sent: usize,
/// The total probes received for this hop.
total_recv: usize,
/// The total probes that failed for this hop.
total_failed: usize,
/// The total round trip time for this hop across all rounds.
total_time: Duration,
/// The round trip time for this hop in the current round.
Expand Down Expand Up @@ -237,6 +239,12 @@ impl Hop {
self.total_recv
}

/// The total number of probes that failed.
#[must_use]
pub const fn total_failed(&self) -> usize {
self.total_failed
}

/// The % of packets that are lost.
#[must_use]
pub fn loss_pct(&self) -> f64 {
Expand Down Expand Up @@ -359,6 +367,7 @@ impl Default for Hop {
addrs: IndexMap::default(),
total_sent: 0,
total_recv: 0,
total_failed: 0,
total_time: Duration::default(),
last: None,
best: None,
Expand Down Expand Up @@ -533,6 +542,21 @@ impl FlowState {
self.hops[index].last_dest_port = awaited.dest_port.0;
self.hops[index].last_sequence = awaited.sequence.0;
}
ProbeStatus::Failed(failed) => {
self.update_lowest_ttl(failed.ttl);
self.update_round(failed.round);
let index = usize::from(failed.ttl.0) - 1;
self.hops[index].total_sent += 1;
self.hops[index].total_failed += 1;
self.hops[index].ttl = failed.ttl.0;
self.hops[index].samples.insert(0, Duration::default());
if self.hops[index].samples.len() > self.max_samples {
self.hops[index].samples.pop();
}
self.hops[index].last_src_port = failed.src_port.0;
self.hops[index].last_dest_port = failed.dest_port.0;
self.hops[index].last_sequence = failed.sequence.0;
}
ProbeStatus::NotSent | ProbeStatus::Skipped => {}
}
}
Expand Down Expand Up @@ -712,6 +736,7 @@ mod tests {
Self::Skipped => Self::Skipped,
Self::Awaited(awaited) => Self::Awaited(Probe { round, ..awaited }),
Self::Complete(completed) => Self::Complete(ProbeComplete { round, ..completed }),
Self::Failed(_) => todo!(),
}
}
}
Expand Down
44 changes: 39 additions & 5 deletions crates/trippy-core/src/strategy.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use self::state::TracerState;
use crate::config::StrategyConfig;
use crate::error::{Error, Result};
use crate::error::{Error, IoError, Result};
use crate::net::Network;
use crate::probe::{
ProbeStatus, Response, ResponseData, ResponseSeq, ResponseSeqIcmp, ResponseSeqTcp,
ResponseSeqUdp,
};
use crate::types::{Checksum, Sequence, TimeToLive, TraceId};
use crate::{Extensions, IcmpPacketType, MultipathStrategy, PortDirection, Protocol};
use crate::{Extensions, IcmpPacketType, MultipathStrategy, PortDirection, Probe, Protocol};
use std::net::IpAddr;
use std::time::{Duration, SystemTime};
use tracing::instrument;
Expand Down Expand Up @@ -99,16 +99,20 @@ impl<F: Fn(&Round<'_>)> Strategy<F> {
let sent = SystemTime::now();
match self.config.protocol {
Protocol::Icmp => {
network.send_probe(st.next_probe(sent))?;
let probe = st.next_probe(sent);
Self::do_send(network, st, probe)?;
}
Protocol::Udp => {
let probe = st.next_probe(sent);
Self::do_send(network, st, probe)?;
}
Protocol::Udp => network.send_probe(st.next_probe(sent))?,
Protocol::Tcp => {
let mut probe = if st.round_has_capacity() {
st.next_probe(sent)
} else {
return Err(Error::InsufficientCapacity);
};
while let Err(err) = network.send_probe(probe) {
while let Err(err) = Self::do_send(network, st, probe) {
match err {
Error::AddressInUse(_) => {
if st.round_has_capacity() {
Expand All @@ -126,6 +130,23 @@ impl<F: Fn(&Round<'_>)> Strategy<F> {
Ok(())
}

/// Send the probe and handle errors.
///
/// Some errors are transient and should not be considered fatal. In these cases we mark the
/// probe as failed and continue.
fn do_send<N: Network>(network: &mut N, st: &mut TracerState, probe: Probe) -> Result<()> {
match network.send_probe(probe) {
Ok(()) => Ok(()),
Err(Error::IoError(
IoError::Bind(_, _) | IoError::Connect(_, _) | IoError::SendTo(_, _),
)) => {
st.fail_probe();
Ok(())
}
Err(err) => Err(err),
}
}

/// Read and process the next incoming `ICMP` packet.
///
/// We allow multiple probes to be in-flight at any time, and we cannot guarantee that responses
Expand Down Expand Up @@ -1010,6 +1031,19 @@ mod state {
probe
}

/// Mark the `ProbeStatus` at the current `sequence` as failed.
#[instrument(skip(self))]
pub fn fail_probe(&mut self) {
let probe_index = usize::from(self.sequence - self.round_sequence);
let probe = self.buffer[probe_index - 1].clone();
match probe {
ProbeStatus::Awaited(awaited) => {
self.buffer[probe_index - 1] = ProbeStatus::Failed(awaited.failed());
}
_ => todo!(),
}
}

/// Determine the `src_port`, `dest_port` and `identifier` for the current probe.
///
/// This will differ depending on the `TracerProtocol`, `MultipathStrategy` &
Expand Down

0 comments on commit 5ff0737

Please sign in to comment.