diff --git a/quinn-proto/src/config/transport.rs b/quinn-proto/src/config/transport.rs index 4b08f4a77..997bd573a 100644 --- a/quinn-proto/src/config/transport.rs +++ b/quinn-proto/src/config/transport.rs @@ -50,6 +50,9 @@ pub struct TransportConfig { pub(crate) address_discovery_role: address_discovery::Role, pub(crate) max_concurrent_multipath_paths: Option, + + pub(crate) default_path_max_idle_timeout: Option, + pub(crate) default_path_keep_alive_interval: Option, } impl TransportConfig { @@ -356,6 +359,37 @@ impl TransportConfig { self } + /// Sets a default per-path maximum idle timeout + /// + /// If the path is idle for this long the path will be abandoned. Bear in mind this will + /// interact with the [`TransportConfig::max_idle_timeout`], if the last path is + /// abandoned the entire connection will be closed. + /// + /// You can also change this using [`Connection::set_path_max_idle_timeout`] for + /// existing paths. + /// + /// [`Connection::set_path_max_idle_timeout`]: crate::Connection::set_path_max_idle_timeout + pub fn default_path_max_idle_timeout(&mut self, timeout: Option) -> &mut Self { + self.default_path_max_idle_timeout = timeout; + self + } + + /// Sets a default per-path keep alive interval + /// + /// Note that this does not interact with the connection-wide + /// [`TransportConfig::keep_alive_interval`]. This setting will keep this path active, + /// [`TransportConfig::keep_alive_interval`] will keep the connection active, with no + /// control over which path is used for this. + /// + /// You can also change this using [`Connection::set_path_keep_alive_interval`] for + /// existing path. + /// + /// [`Connection::set_path_keep_alive_interval`]: crate::Connection::set_path_keep_alive_interval + pub fn default_path_keep_alive_interval(&mut self, interval: Option) -> &mut Self { + self.default_path_keep_alive_interval = interval; + self + } + /// Get the initial max [`crate::PathId`] this endpoint allows. /// /// Returns `None` if multipath is disabled. @@ -411,6 +445,8 @@ impl Default for TransportConfig { // disabled multipath by default max_concurrent_multipath_paths: None, + default_path_max_idle_timeout: None, + default_path_keep_alive_interval: None, } } } @@ -444,6 +480,8 @@ impl fmt::Debug for TransportConfig { enable_segmentation_offload, address_discovery_role, max_concurrent_multipath_paths, + default_path_max_idle_timeout, + default_path_keep_alive_interval, } = self; fmt.debug_struct("TransportConfig") .field("max_concurrent_bidi_streams", max_concurrent_bidi_streams) @@ -476,6 +514,14 @@ impl fmt::Debug for TransportConfig { "max_concurrent_multipath_paths", max_concurrent_multipath_paths, ) + .field( + "default_path_max_idle_timeout", + default_path_max_idle_timeout, + ) + .field( + "default_path_keep_alive_interval", + default_path_keep_alive_interval, + ) .finish_non_exhaustive() } } diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index a612ea514..a2c144c97 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -580,6 +580,40 @@ impl Connection { .and_then(|path| path.data.status.remote_status) } + /// Sets the max_idle_timeout for a specific path + /// + /// See [`TransportConfig::default_path_max_idle_timeout`] for details. + /// + /// Returns the previous value of the setting. + pub fn set_path_max_idle_timeout( + &mut self, + path_id: PathId, + timeout: Option, + ) -> Result, ClosedPath> { + let path = self + .paths + .get_mut(&path_id) + .ok_or(ClosedPath { _private: () })?; + Ok(std::mem::replace(&mut path.data.idle_timeout, timeout)) + } + + /// Sets the keep_alive_interval for a specific path + /// + /// See [`TransportConfig::default_path_keep_alive_interval`] for details. + /// + /// Returns the previous value of the setting. + pub fn set_path_keep_alive_interval( + &mut self, + path_id: PathId, + interval: Option, + ) -> Result, ClosedPath> { + let path = self + .paths + .get_mut(&path_id) + .ok_or(ClosedPath { _private: () })?; + Ok(std::mem::replace(&mut path.data.keep_alive, interval)) + } + /// Gets the [`PathData`] for a known [`PathId`]. /// /// Will panic if the path_id does not reference any known path. @@ -1508,9 +1542,18 @@ impl Connection { Timer::Idle => { self.kill(ConnectionError::TimedOut); } - Timer::KeepAlive(path_id) => { + Timer::PathIdle(path_id) => { + // TODO(flub): TransportErrorCode::NO_ERROR but where's the API to get + // that into a VarInt? + self.close_path(path_id, VarInt::from_u32(0)); + } + Timer::KeepAlive => { trace!("sending keep-alive"); - self.ping(path_id); + self.ping(); + } + Timer::PathKeepAlive(path_id) => { + trace!(?path_id, "sending keep-alive on path"); + self.ping_path(path_id).ok(); } Timer::LossDetection(path_id) => { self.on_loss_detection_timeout(now, path_id); @@ -1616,10 +1659,25 @@ impl Connection { /// Ping the remote endpoint /// - /// Causes an ACK-eliciting packet to be transmitted. - pub fn ping(&mut self, path: PathId) { - // TODO(@divma): for_path should not be used, we should check if the path still exists - self.spaces[self.highest_space].for_path(path).ping_pending = true; + /// Causes an ACK-eliciting packet to be transmitted on the connection. + pub fn ping(&mut self) { + // TODO(flub): This is very brute-force: it pings *all* the paths. Instead it would + // be nice if we could only send a single packet for this. + for path_data in self.spaces[self.highest_space].number_spaces.values_mut() { + path_data.ping_pending = true; + } + } + + /// Ping the remote endpoint over a specific path + /// + /// Causes an ACK-eliciting packet to be transmitted on the path. + pub fn ping_path(&mut self, path: PathId) -> Result<(), ClosedPath> { + let path_data = self.spaces[self.highest_space] + .number_spaces + .get_mut(&path) + .ok_or(ClosedPath { _private: () })?; + path_data.ping_pending = true; + Ok(()) } /// Update traffic keys spontaneously @@ -2409,7 +2467,7 @@ impl Connection { ) { self.total_authed_packets += 1; self.reset_keep_alive(path_id, now); - self.reset_idle_timeout(now, space_id); + self.reset_idle_timeout(now, space_id, path_id); self.permit_idle_reset = true; self.receiving_ecn |= ecn.is_some(); if let Some(x) = ecn { @@ -2447,38 +2505,56 @@ impl Connection { } } - // TODO(flub): figure out if this should take a PathId. We could use an idle timeout on - // each path. We will need to figure out. - fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId) { - let timeout = match self.idle_timeout { - None => return, - Some(dur) => dur, - }; - if self.state.is_closed() { - self.timers.stop(Timer::Idle); - return; + /// Resets the idle timeout timers + /// + /// Without multipath there is only the connection-wide idle timeout. When multipath is + /// enabled there is an additional per-path idle timeout. + fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId, path_id: PathId) { + // First reset the global idle timeout. + if let Some(timeout) = self.idle_timeout { + if self.state.is_closed() { + self.timers.stop(Timer::Idle); + } else { + let dt = cmp::max(timeout, 3 * self.pto_max_path(space)); + self.timers.set(Timer::Idle, now + dt); + } + } + + // Now handle the per-path state + if let Some(timeout) = self.path_data(path_id).idle_timeout { + if self.state.is_closed() { + self.timers.stop(Timer::PathIdle(path_id)); + } else { + let dt = cmp::max(timeout, 3 * self.pto(space, path_id)); + self.timers.set(Timer::PathIdle(path_id), now + dt); + } } - // TODO(flub): Wrong PathId, see comment above. - let dt = cmp::max(timeout, 3 * self.pto(space, PathId::ZERO)); - self.timers.set(Timer::Idle, now + dt); } + /// Resets both the [`Timer::KeepAlive`] and [`Timer::PathKeepAlive`] timers fn reset_keep_alive(&mut self, path_id: PathId, now: Instant) { - let interval = match self.config.keep_alive_interval { - Some(x) if self.state.is_established() => x, - _ => return, - }; - self.timers.set(Timer::KeepAlive(path_id), now + interval); + if !self.state.is_established() { + return; + } + + if let Some(interval) = self.config.keep_alive_interval { + self.timers.set(Timer::KeepAlive, now + interval); + } + + if let Some(interval) = self.path_data(path_id).keep_alive { + self.timers + .set(Timer::PathKeepAlive(path_id), now + interval); + } } - /// Sets the timer for when a previously issued CID should be retired next. + /// Sets the timer for when a previously issued CID should be retired next fn reset_cid_retirement(&mut self) { if let Some((_path, t)) = self.next_cid_retirement() { self.timers.set(Timer::PushNewCid, t); } } - /// The next time when a previously issued CID should be retired. + /// The next time when a previously issued CID should be retired fn next_cid_retirement(&self) -> Option<(PathId, Instant)> { self.local_cid_state .iter() @@ -3384,7 +3460,9 @@ impl Connection { // attack. Send a non-probing packet to recover the active path. match self.peer_supports_ack_frequency() { true => self.immediate_ack(path_id), - false => self.ping(path_id), + false => { + self.ping_path(path_id).ok(); + } } } } @@ -3823,8 +3901,7 @@ impl Connection { pub fn local_address_changed(&mut self) { // TODO(flub): if multipath is enabled this needs to create a new path entirely. self.update_rem_cid(PathId(0)); - // TODO(@divma): sending pings to paths that might no longer exist! - self.ping(PathId(0)); + self.ping(); } /// Switch to a previously unused remote connection ID, if possible @@ -4637,7 +4714,10 @@ impl Connection { .filter(|entry| { !matches!( entry.timer, - Timer::KeepAlive(_) | Timer::PushNewCid | Timer::KeyDiscard + Timer::KeepAlive + | Timer::PathKeepAlive(_) + | Timer::PushNewCid + | Timer::KeyDiscard ) }) .min_by_key(|entry| entry.time) diff --git a/quinn-proto/src/connection/packet_builder.rs b/quinn-proto/src/connection/packet_builder.rs index 9fbd4d536..25a41c3a0 100644 --- a/quinn-proto/src/connection/packet_builder.rs +++ b/quinn-proto/src/connection/packet_builder.rs @@ -240,7 +240,7 @@ impl<'a, 'b> PacketBuilder<'a, 'b> { .for_path(path_id) .time_of_last_ack_eliciting_packet = Some(now); if conn.permit_idle_reset { - conn.reset_idle_timeout(now, space_id); + conn.reset_idle_timeout(now, space_id, path_id); } conn.permit_idle_reset = false; } diff --git a/quinn-proto/src/connection/paths.rs b/quinn-proto/src/connection/paths.rs index 2193256f3..97a798829 100644 --- a/quinn-proto/src/connection/paths.rs +++ b/quinn-proto/src/connection/paths.rs @@ -107,6 +107,23 @@ pub(super) struct PathData { first_packet: Option, /// The number of times a PTO has been sent without receiving an ack. pub(super) pto_count: u32, + + // + // Per-path idle & keep alive + // + /// Idle timeout for the path + /// + /// If expired, the path will be abandoned. This is different from the connection-wide + /// idle timeout which closes the connection if expired. + pub(super) idle_timeout: Option, + /// Keep alives to send on this path + /// + /// There is also a connection-level keep alive configured in the + /// [`TransportParameters`]. This triggers activity on any path which can keep the + /// connection alive. + /// + /// [`TransportParameters`]: crate::transport_parameters::TransportParameters + pub(super) keep_alive: Option, } impl PathData { @@ -160,6 +177,8 @@ impl PathData { status: Default::default(), first_packet: None, pto_count: 0, + idle_timeout: None, + keep_alive: None, } } @@ -189,6 +208,8 @@ impl PathData { status: prev.status.clone(), first_packet: None, pto_count: 0, + idle_timeout: prev.idle_timeout, + keep_alive: prev.keep_alive, } } diff --git a/quinn-proto/src/connection/timer.rs b/quinn-proto/src/connection/timer.rs index 4c55b704b..37e7abcb7 100644 --- a/quinn-proto/src/connection/timer.rs +++ b/quinn-proto/src/connection/timer.rs @@ -12,6 +12,8 @@ pub(crate) enum Timer { LossDetection(PathId), /// When to close the connection after no activity Idle, + /// When to abandon a path after no activity + PathIdle(PathId), /// When the close timer expires, the connection has been gracefully terminated. Close, /// When keys are discarded because they should not be needed anymore @@ -19,7 +21,9 @@ pub(crate) enum Timer { /// When to give up on validating a new path to the peer PathValidation(PathId), /// When to send a `PING` frame to keep the connection alive - KeepAlive(PathId), + KeepAlive, + /// When to send a `PING` frame to keep the path alive + PathKeepAlive(PathId), /// When pacing will allow us to send a packet Pacing(PathId), /// When to invalidate old CID and proactively push new one via NEW_CONNECTION_ID frame diff --git a/quinn-proto/src/tests/mod.rs b/quinn-proto/src/tests/mod.rs index bb9b06962..cffab22dc 100644 --- a/quinn-proto/src/tests/mod.rs +++ b/quinn-proto/src/tests/mod.rs @@ -205,11 +205,7 @@ fn server_stateless_reset() { pair.server.endpoint = Endpoint::new(endpoint_config, Some(Arc::new(server_config())), true, None); // Force the server to generate the smallest possible stateless reset - pair.client - .connections - .get_mut(&client_ch) - .unwrap() - .ping(PathId(0)); + pair.client.connections.get_mut(&client_ch).unwrap().ping(); info!("resetting"); pair.drive(); assert_matches!( @@ -1183,7 +1179,7 @@ fn idle_timeout() { }; let mut pair = Pair::new(Default::default(), server); let (client_ch, server_ch) = pair.connect(); - pair.client_conn_mut(client_ch).ping(PathId(0)); + pair.client_conn_mut(client_ch).ping(); let start = pair.time; while !pair.client_conn_mut(client_ch).is_closed() @@ -1220,7 +1216,7 @@ fn connection_close_sends_acks() { let client_acks = pair.client_conn_mut(client_ch).stats().frame_rx.acks; - pair.client_conn_mut(client_ch).ping(PathId(0)); + pair.client_conn_mut(client_ch).ping(); pair.drive_client(); let time = pair.time; @@ -1269,7 +1265,7 @@ fn migration() { Ipv4Addr::new(127, 0, 0, 1).into(), CLIENT_PORTS.lock().unwrap().next().unwrap(), ); - pair.client_conn_mut(client_ch).ping(PathId(0)); + pair.client_conn_mut(client_ch).ping(); // Assert that just receiving the ping message is accounted into the servers // anti-amplification budget @@ -1747,7 +1743,7 @@ fn cid_retirement() { active_cid_num = active_cid_num.min(LOC_CID_COUNT); let next_retire_prior_to = active_cid_num + 1; - pair.client_conn_mut(client_ch).ping(PathId(0)); + pair.client_conn_mut(client_ch).ping(); // Server retires all valid remote CIDs pair.server_conn_mut(server_ch) .rotate_local_cid(next_retire_prior_to, Instant::now()); @@ -1930,7 +1926,7 @@ fn tail_loss_small_segment_size() { const DGRAM_NUM: u64 = 5; // Enough to build a GSO batch. info!("Sending an ack-eliciting datagram"); - pair.client_conn_mut(client_ch).ping(PathId(0)); + pair.client_conn_mut(client_ch).ping(); pair.drive_client(); // Drop these packets on the server side. @@ -1983,7 +1979,7 @@ fn tail_loss_respect_max_datagrams() { const DGRAM_NUM: u64 = 5; // Enough to build a GSO batch. info!("Sending an ack-eliciting datagram"); - pair.client_conn_mut(client_ch).ping(PathId(0)); + pair.client_conn_mut(client_ch).ping(); pair.drive_client(); // Drop these packets on the server side. @@ -2393,7 +2389,7 @@ fn loss_probe_requests_immediate_ack() { // Lose a ping let default_mtu = mem::replace(&mut pair.mtu, 0); - pair.client_conn_mut(client_ch).ping(PathId(0)); + pair.client_conn_mut(client_ch).ping(); pair.drive_client(); pair.mtu = default_mtu; @@ -2501,7 +2497,7 @@ fn migrate_detects_new_mtu_and_respects_original_peer_max_udp_payload_size() { Ipv4Addr::new(127, 0, 0, 1).into(), CLIENT_PORTS.lock().unwrap().next().unwrap(), ); - pair.client_conn_mut(client_ch).ping(PathId(0)); + pair.client_conn_mut(client_ch).ping(); pair.drive(); // Sanity check: the server saw that the client address was updated @@ -2680,7 +2676,7 @@ fn single_ack_eliciting_packet_triggers_ack_after_delay() { let stats_after_connect = pair.client_conn_mut(client_ch).stats(); let start = pair.time; - pair.client_conn_mut(client_ch).ping(PathId(0)); + pair.client_conn_mut(client_ch).ping(); pair.drive_client(); // Send ping pair.drive_server(); // Process ping pair.drive_client(); // Give the client a chance to process an ack, so our assertion can fail @@ -2768,7 +2764,7 @@ fn out_of_order_ack_eliciting_packet_triggers_ack() { // Send a packet that won't arrive right away (it will be dropped and be re-sent later) pair.mtu = 0; - pair.client_conn_mut(client_ch).ping(PathId(0)); + pair.client_conn_mut(client_ch).ping(); pair.drive_client(); // Sanity check (ping sent, no ACK received) @@ -2784,7 +2780,7 @@ fn out_of_order_ack_eliciting_packet_triggers_ack() { // Restore the default MTU and send another ping, which will arrive earlier than the dropped one pair.mtu = default_mtu; - pair.client_conn_mut(client_ch).ping(PathId(0)); + pair.client_conn_mut(client_ch).ping(); pair.drive_client(); pair.drive_server(); pair.drive_client(); @@ -2823,7 +2819,7 @@ fn single_ack_eliciting_packet_with_ce_bit_triggers_immediate_ack() { let start = pair.time; - pair.client_conn_mut(client_ch).ping(PathId(0)); + pair.client_conn_mut(client_ch).ping(); pair.congestion_experienced = true; pair.drive_client(); // Send ping @@ -2889,12 +2885,12 @@ fn ack_frequency_ack_delayed_from_first_of_flight() { // // * 0 ms: ping // * 5 ms: ping x2 - pair.client_conn_mut(client_ch).ping(PathId(0)); + pair.client_conn_mut(client_ch).ping(); pair.drive_client(); pair.time += Duration::from_millis(5); for _ in 0..2 { - pair.client_conn_mut(client_ch).ping(PathId(0)); + pair.client_conn_mut(client_ch).ping(); pair.drive_client(); } @@ -2944,7 +2940,7 @@ fn ack_frequency_ack_sent_after_max_ack_delay() { let (mut pair, client_ch, server_ch) = setup_ack_frequency_test(max_ack_delay); // Client sends a ping - pair.client_conn_mut(client_ch).ping(PathId(0)); + pair.client_conn_mut(client_ch).ping(); pair.drive_client(); // Server: receive the ping, send no ACK @@ -2986,12 +2982,12 @@ fn ack_frequency_ack_sent_after_packets_above_threshold() { // // * 0 ms: ping // * 5 ms: ping (11x) - pair.client_conn_mut(client_ch).ping(PathId(0)); + pair.client_conn_mut(client_ch).ping(); pair.drive_client(); pair.time += Duration::from_millis(5); for _ in 0..11 { - pair.client_conn_mut(client_ch).ping(PathId(0)); + pair.client_conn_mut(client_ch).ping(); pair.drive_client(); } @@ -3035,19 +3031,19 @@ fn ack_frequency_ack_sent_after_reordered_packets_below_threshold() { // * 0 ms: ping // * 5 ms: ping (lost) // * 5 ms: ping - pair.client_conn_mut(client_ch).ping(PathId(0)); + pair.client_conn_mut(client_ch).ping(); pair.drive_client(); pair.time += Duration::from_millis(5); // Send and lose an ack-eliciting packet pair.mtu = 0; - pair.client_conn_mut(client_ch).ping(PathId(0)); + pair.client_conn_mut(client_ch).ping(); pair.drive_client(); // Restore the default MTU and send another ping, which will arrive earlier than the dropped one pair.mtu = DEFAULT_MTU; - pair.client_conn_mut(client_ch).ping(PathId(0)); + pair.client_conn_mut(client_ch).ping(); pair.drive_client(); // Server: receive first ping, send no ACK @@ -3086,20 +3082,20 @@ fn ack_frequency_ack_sent_after_reordered_packets_above_threshold() { let (mut pair, client_ch, server_ch) = setup_ack_frequency_test(max_ack_delay); // Send a ping - pair.client_conn_mut(client_ch).ping(PathId(0)); + pair.client_conn_mut(client_ch).ping(); pair.drive_client(); // Send and lose two ack-eliciting packets pair.time += Duration::from_millis(5); pair.mtu = 0; for _ in 0..2 { - pair.client_conn_mut(client_ch).ping(PathId(0)); + pair.client_conn_mut(client_ch).ping(); pair.drive_client(); } // Restore the default MTU and send another ping, which will arrive earlier than the dropped ones pair.mtu = DEFAULT_MTU; - pair.client_conn_mut(client_ch).ping(PathId(0)); + pair.client_conn_mut(client_ch).ping(); pair.drive_client(); // Server: receive first ping, send no ACK @@ -3147,7 +3143,7 @@ fn ack_frequency_update_max_delay() { // Client sends a PING info!("first ping"); - pair.client_conn_mut(client_ch).ping(PathId(0)); + pair.client_conn_mut(client_ch).ping(); pair.drive(); // No change in ACK frequency @@ -3162,7 +3158,7 @@ fn ack_frequency_update_max_delay() { // RTT jumps, client sends another ping info!("delayed ping"); pair.latency *= 10; - pair.client_conn_mut(client_ch).ping(PathId(0)); + pair.client_conn_mut(client_ch).ping(); pair.drive(); // ACK frequency updated @@ -3372,10 +3368,10 @@ fn large_datagram_with_acks() { // Force the client to generate a large ACK frame by dropping several packets for _ in 0..10 { - pair.server_conn_mut(server_ch).ping(PathId(0)); + pair.server_conn_mut(server_ch).ping(); pair.drive_server(); pair.client.inbound.pop_back(); - pair.server_conn_mut(server_ch).ping(PathId(0)); + pair.server_conn_mut(server_ch).ping(); pair.drive_server(); } diff --git a/quinn/src/path.rs b/quinn/src/path.rs index 576987fef..5d6803102 100644 --- a/quinn/src/path.rs +++ b/quinn/src/path.rs @@ -1,6 +1,7 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll, ready}; +use std::time::Duration; use proto::{ClosedPath, ConnectionError, OpenPathError, PathId, PathStatus, VarInt}; use tokio::sync::oneshot; @@ -100,6 +101,36 @@ impl Path { closed: on_path_close_recv, } } + + /// Sets the keep_alive_interval for a specific path + /// + /// See [`TransportConfig::default_path_keep_alive_interval`] for details. + /// + /// Returns the previous value of the setting. + /// + /// [`TransportConfig::default_path_keep_alive_interval`]: crate::TransportConfig::default_path_keep_alive_interval + pub fn set_max_idle_timeout( + &self, + timeout: Option, + ) -> Result, ClosedPath> { + let mut state = self.conn.state.lock("path_set_max_idle_timeout"); + state.inner.set_path_max_idle_timeout(self.id, timeout) + } + + /// Sets the keep_alive_interval for a specific path + /// + /// See [`TransportConfig::default_path_keep_alive_interval`] for details. + /// + /// Returns the previous value of the setting. + /// + /// [`TransportConfig::default_path_keep_alive_interval`]: crate::TransportConfig::default_path_keep_alive_interval + pub fn set_keep_alive_interval( + &self, + interval: Option, + ) -> Result, ClosedPath> { + let mut state = self.conn.state.lock("path_set_keep_alive_interval"); + state.inner.set_path_keep_alive_interval(self.id, interval) + } } /// Future produced by [`Path::close`]