Skip to content

Commit 1586c82

Browse files
fix: use more specific poll_writable
1 parent aefc314 commit 1586c82

File tree

9 files changed

+295
-34
lines changed

9 files changed

+295
-34
lines changed

Cargo.lock

Lines changed: 3 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,9 @@ unexpected_cfgs = { level = "warn", check-cfg = ["cfg(iroh_docsrs)", "cfg(iroh_l
4040

4141
[workspace.lints.clippy]
4242
unused-async = "warn"
43+
44+
45+
[patch.crates-io]
46+
iroh-quinn = { git = "https://github.com/n0-computer/quinn", branch = "iroh-fix-poll-writable" }
47+
iroh-quinn-udp = { git = "https://github.com/n0-computer/quinn", branch = "iroh-fix-poll-writable" }
48+
iroh-quinn-proto = { git = "https://github.com/n0-computer/quinn", branch = "iroh-fix-poll-writable" }

iroh/src/magicsock.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,17 @@ impl MagicSock {
468468
Err(io::Error::other("no valid socket available"))
469469
}
470470

471+
pub(super) fn addr_for_send(
472+
&self,
473+
dest: NodeIdMappedAddr,
474+
) -> Option<(PublicKey, Option<SocketAddr>, Option<RelayUrl>)> {
475+
self.node_map.addr_for_send(
476+
dest,
477+
self.ipv6_reported.load(Ordering::Relaxed),
478+
&self.metrics.magicsock,
479+
)
480+
}
481+
471482
/// Implementation for AsyncUdpSocket::try_send
472483
#[instrument(skip_all)]
473484
fn try_send(&self, transmit: &quinn_udp::Transmit) -> io::Result<()> {
@@ -1621,7 +1632,8 @@ enum DiscoBoxError {
16211632

16221633
impl AsyncUdpSocket for MagicSock {
16231634
fn create_io_poller(self: Arc<Self>) -> Pin<Box<dyn quinn::UdpPoller>> {
1624-
self.transports.create_io_poller()
1635+
self.transports
1636+
.create_io_poller(self.clone(), self.ip_mapped_addrs.clone())
16251637
}
16261638

16271639
fn try_send(&self, transmit: &quinn_udp::Transmit) -> io::Result<()> {

iroh/src/magicsock/node_map.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,22 @@ impl NodeMap {
265265
Some((public_key, udp_addr, relay_url, msgs))
266266
}
267267

268+
#[allow(clippy::type_complexity)]
269+
pub(super) fn addr_for_send(
270+
&self,
271+
addr: NodeIdMappedAddr,
272+
have_ipv6: bool,
273+
metrics: &Metrics,
274+
) -> Option<(PublicKey, Option<SocketAddr>, Option<RelayUrl>)> {
275+
let mut inner = self.inner.lock().expect("poisoned");
276+
let ep = inner.get_mut(NodeStateKey::NodeIdMappedAddr(addr))?;
277+
let public_key = *ep.public_key();
278+
trace!(dest = %addr, node_id = %public_key.fmt_short(), "dst mapped to NodeId");
279+
let now = Instant::now();
280+
let (udp_addr, relay_url) = ep.addr_for_send(&now, have_ipv6, metrics);
281+
Some((public_key, udp_addr, relay_url))
282+
}
283+
268284
pub(super) fn notify_shutdown(&self) {
269285
let mut inner = self.inner.lock().expect("poisoned");
270286
for (_, ep) in inner.node_states_mut() {

iroh/src/magicsock/node_map/node_state.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ impl NodeState {
277277
/// Returns the address(es) that should be used for sending the next packet.
278278
///
279279
/// This may return to send on one, both or no paths.
280-
fn addr_for_send(
280+
pub(super) fn addr_for_send(
281281
&mut self,
282282
now: &Instant,
283283
have_ipv6: bool,

iroh/src/magicsock/transports.rs

Lines changed: 71 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,20 @@ use std::{
88

99
use iroh_base::{NodeId, RelayUrl};
1010
use n0_watcher::Watcher;
11-
#[cfg(not(wasm_browser))]
12-
use netwatch::UdpSocket;
1311
use relay::RelayDatagramSendChannelSender;
1412
use tracing::{trace, warn};
1513

1614
#[cfg(not(wasm_browser))]
1715
mod ip;
1816
mod relay;
1917

18+
#[cfg(not(wasm_browser))]
19+
use self::ip::IpIoPoller;
2020
#[cfg(not(wasm_browser))]
2121
pub(crate) use self::ip::IpTransport;
2222
pub(crate) use self::relay::{RelayActorConfig, RelayTransport};
23-
use super::NetInfo;
23+
use super::{MagicSock, MappedAddr, NetInfo};
24+
use crate::net_report::IpMappedAddresses;
2425

2526
/// Manages the different underlying data transports that the magicsock
2627
/// can support.
@@ -250,7 +251,11 @@ impl Transports {
250251
}
251252
}
252253

253-
pub(crate) fn create_io_poller(&self) -> Pin<Box<dyn quinn::UdpPoller>> {
254+
pub(crate) fn create_io_poller(
255+
&self,
256+
msock: Arc<MagicSock>,
257+
ip_mapped_addrs: IpMappedAddresses,
258+
) -> Pin<Box<dyn quinn::UdpPoller>> {
254259
#[cfg(not(wasm_browser))]
255260
let ip_pollers = self.ip.iter().map(|t| t.create_io_poller()).collect();
256261

@@ -260,6 +265,8 @@ impl Transports {
260265
#[cfg(not(wasm_browser))]
261266
ip_pollers,
262267
relay_pollers,
268+
ip_mapped_addrs,
269+
msock,
263270
})
264271
}
265272

@@ -380,29 +387,74 @@ impl Addr {
380387
#[derive(Debug)]
381388
pub struct IoPoller {
382389
#[cfg(not(wasm_browser))]
383-
ip_pollers: Vec<Arc<UdpSocket>>,
390+
ip_pollers: Vec<IpIoPoller>,
384391
relay_pollers: Vec<RelayDatagramSendChannelSender>,
392+
ip_mapped_addrs: IpMappedAddresses,
393+
msock: Arc<MagicSock>, // :(
385394
}
386395

387396
impl quinn::UdpPoller for IoPoller {
388-
fn poll_writable(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
389-
// This version returns Ready as soon as any of them are ready.
390-
397+
fn poll_writable(
398+
mut self: Pin<&mut Self>,
399+
cx: &mut Context,
400+
transmit: &quinn_proto::Transmit,
401+
) -> Poll<io::Result<()>> {
391402
let this = &mut *self;
392-
#[cfg(not(wasm_browser))]
393-
for poller in &mut this.ip_pollers {
394-
match poller.poll_writable(cx) {
395-
Poll::Ready(_) => return Poll::Ready(Ok(())),
396-
Poll::Pending => (),
397-
}
398-
}
399403

400-
for poller in &mut this.relay_pollers {
401-
match poller.poll_writable(cx) {
402-
Poll::Ready(_) => return Poll::Ready(Ok(())),
403-
Poll::Pending => (),
404+
match MappedAddr::from(transmit.destination) {
405+
MappedAddr::None(_dest) => {
406+
// return Poll::Ready(Err(io::Error::other("Cannot convert to a mapped address.")));
407+
}
408+
MappedAddr::NodeId(dest) => {
409+
// Get the node's relay address and best direct address, as well
410+
// as any pings that need to be sent for hole-punching purposes.
411+
match this.msock.addr_for_send(dest) {
412+
Some((_node_id, udp_addr, relay_url)) => {
413+
#[cfg(not(wasm_browser))]
414+
if let Some(addr) = udp_addr {
415+
for poller in &mut this.ip_pollers {
416+
if poller.is_valid_send_addr(&addr) {
417+
match poller.poll_writable(cx) {
418+
Poll::Ready(_) => return Poll::Ready(Ok(())),
419+
Poll::Pending => (),
420+
}
421+
}
422+
}
423+
}
424+
if let Some(_url) = relay_url {
425+
for poller in &mut this.relay_pollers {
426+
match poller.poll_writable(cx) {
427+
Poll::Ready(_) => return Poll::Ready(Ok(())),
428+
Poll::Pending => (),
429+
}
430+
}
431+
}
432+
}
433+
None => {
434+
// return Poll::Ready(Err(io::Error::other(
435+
// "no NodeState for mapped address",
436+
// )));
437+
}
438+
}
404439
}
440+
#[cfg(not(wasm_browser))]
441+
MappedAddr::Ip(addr) => match this.ip_mapped_addrs.get_ip_addr(&addr) {
442+
Some(addr) => {
443+
for poller in &mut this.ip_pollers {
444+
if poller.is_valid_send_addr(&addr) {
445+
match poller.poll_writable(cx) {
446+
Poll::Ready(_) => return Poll::Ready(Ok(())),
447+
Poll::Pending => (),
448+
}
449+
}
450+
}
451+
}
452+
None => {
453+
// return Poll::Ready(Err(io::Error::other("unknown mapped address")));
454+
}
455+
},
405456
}
457+
406458
Poll::Pending
407459
}
408460
}

iroh/src/magicsock/transports/ip.rs

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,11 @@ impl IpTransport {
3838
}
3939
}
4040

41-
pub(super) fn create_io_poller(&self) -> Arc<UdpSocket> {
42-
self.socket.clone()
41+
pub(super) fn create_io_poller(&self) -> IpIoPoller {
42+
IpIoPoller {
43+
bind_addr: self.bind_addr,
44+
socket: self.socket.clone(),
45+
}
4346
}
4447

4548
pub(super) fn poll_send(
@@ -57,6 +60,7 @@ impl IpTransport {
5760
segment_size: transmit.segment_size,
5861
src_ip: src,
5962
});
63+
trace!("send res: {:?}", res);
6064

6165
match res {
6266
Ok(res) => {
@@ -153,3 +157,24 @@ impl IpTransport {
153157
self.socket.clone()
154158
}
155159
}
160+
161+
#[derive(Debug)]
162+
pub(super) struct IpIoPoller {
163+
socket: Arc<UdpSocket>,
164+
bind_addr: SocketAddr,
165+
}
166+
167+
impl IpIoPoller {
168+
pub(super) fn is_valid_send_addr(&self, addr: &SocketAddr) -> bool {
169+
#[allow(clippy::match_like_matches_macro)]
170+
match (self.bind_addr, addr) {
171+
(SocketAddr::V4(_), SocketAddr::V4(..)) => true,
172+
(SocketAddr::V6(_), SocketAddr::V6(..)) => true,
173+
_ => false,
174+
}
175+
}
176+
177+
pub(super) fn poll_writable(&self, cx: &mut std::task::Context<'_>) -> Poll<io::Result<()>> {
178+
self.socket.poll_writable(cx)
179+
}
180+
}

iroh/src/protocol.rs

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,6 @@ mod tests {
446446
use n0_watcher::Watcher;
447447
use quinn::ApplicationClose;
448448
use testresult::TestResult;
449-
use tracing_test::traced_test;
450449

451450
use super::*;
452451
use crate::{endpoint::ConnectionError, RelayMode};
@@ -490,17 +489,24 @@ mod tests {
490489
}
491490
}
492491

492+
// #[tokio::test(flavor = "multi_thread")]
493493
#[tokio::test]
494-
#[traced_test]
495494
async fn test_limiter() -> Result<()> {
496-
let e1 = Endpoint::builder().bind().await?;
495+
tracing_subscriber::fmt::try_init().ok();
496+
let e1 = Endpoint::builder()
497+
.relay_mode(RelayMode::Disabled)
498+
.bind()
499+
.await?;
497500
// deny all access
498501
let proto = AccessLimit::new(Echo, |_node_id| false);
499502
let r1 = Router::builder(e1.clone()).accept(ECHO_ALPN, proto).spawn();
500503

501504
let addr1 = r1.endpoint().node_addr().initialized().await?;
502-
503-
let e2 = Endpoint::builder().bind().await?;
505+
dbg!(&addr1);
506+
let e2 = Endpoint::builder()
507+
.relay_mode(RelayMode::Disabled)
508+
.bind()
509+
.await?;
504510

505511
println!("connecting");
506512
let conn = e2.connect(addr1, ECHO_ALPN).await?;
@@ -515,8 +521,10 @@ mod tests {
515521
Ok(())
516522
}
517523

524+
// #[tokio::test(flavor = "multi_thread")]
518525
#[tokio::test]
519526
async fn test_graceful_shutdown() -> TestResult {
527+
tracing_subscriber::fmt::try_init().ok();
520528
#[derive(Debug, Clone, Default)]
521529
struct TestProtocol {
522530
connections: Arc<Mutex<Vec<Connection>>>,
@@ -534,34 +542,44 @@ mod tests {
534542
}
535543

536544
fn shutdown(&self) -> BoxFuture<()> {
545+
eprintln!("shutting down");
537546
let this = self.clone();
538547
Box::pin(async move {
539548
tokio::time::sleep(Duration::from_millis(100)).await;
549+
eprintln!("waiting for lock");
540550
let mut connections = this.connections.lock().expect("poisoned");
551+
eprintln!("got lock");
541552
for conn in connections.drain(..) {
553+
eprintln!("closing");
542554
conn.close(42u32.into(), b"shutdown");
543555
}
544556
})
545557
}
546558
}
547559

560+
eprintln!("creating ep1");
548561
let endpoint = Endpoint::builder()
549562
.relay_mode(RelayMode::Disabled)
550563
.bind()
551564
.await?;
552565
let router = Router::builder(endpoint)
553566
.accept(TEST_ALPN, TestProtocol::default())
554567
.spawn();
568+
eprintln!("waiting for node addr");
555569
let addr = router.endpoint().node_addr().initialized().await?;
556570

571+
eprintln!("creating ep2");
557572
let endpoint2 = Endpoint::builder()
558573
.relay_mode(RelayMode::Disabled)
559574
.bind()
560575
.await?;
576+
eprintln!("connecting to {:?}", addr);
561577
let conn = endpoint2.connect(addr, TEST_ALPN).await?;
562578

579+
eprintln!("starting shutdown");
563580
router.shutdown().await?;
564581

582+
eprintln!("waiting for closed conn");
565583
let reason = conn.closed().await;
566584
assert_eq!(
567585
reason,

0 commit comments

Comments
 (0)