From 6059d5d61552dfc5d0afad22c9dc43601e1da6c5 Mon Sep 17 00:00:00 2001 From: Alex Pyattaev Date: Wed, 19 Mar 2025 19:54:22 +0000 Subject: [PATCH 1/2] make sure own identity does not get wiped during TVU deduplication --- turbine/src/cluster_nodes.rs | 39 ++++++++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/turbine/src/cluster_nodes.rs b/turbine/src/cluster_nodes.rs index b946b59c35790a..8f6644dc142f1f 100644 --- a/turbine/src/cluster_nodes.rs +++ b/turbine/src/cluster_nodes.rs @@ -244,7 +244,8 @@ impl ClusterNodes { fanout, |k| self.nodes[k].pubkey() == &self.pubkey, weighted_shuffle.shuffle(&mut rng), - ); + ) + .expect("Could not find own pubkey in cluster nodes"); let protocol = get_broadcast_protocol(shred); let peers = peers .filter_map(|k| self.nodes[k].contact_info()?.tvu(protocol)) @@ -310,6 +311,11 @@ pub fn new_cluster_nodes( if broadcast { weighted_shuffle.remove_index(index[&self_pubkey]); } + // Paranoid check to ensure the correct operation of the weighted shuffle. + assert!( + index.contains_key(&self_pubkey), + "Own public key should be present in ClusterNodes.index" + ); ClusterNodes { pubkey: self_pubkey, nodes, @@ -362,7 +368,7 @@ fn get_nodes( .collect(); sort_and_dedup_nodes(&mut nodes); if should_dedup_tvu_addrs { - dedup_tvu_addrs(&mut nodes); + dedup_tvu_addrs(&mut nodes, self_pubkey); }; nodes } @@ -395,7 +401,7 @@ fn cmp_nodes_stake(a: &Node, b: &Node) -> Ordering { // same TVU socket-addr, we only send shreds to one of them. // Additionally limits number of nodes at the same IP address to // MAX_NUM_NODES_PER_IP_ADDRESS. -fn dedup_tvu_addrs(nodes: &mut Vec) { +fn dedup_tvu_addrs(nodes: &mut Vec, keep_identity: Pubkey) { const TVU_PROTOCOLS: [Protocol; 2] = [Protocol::UDP, Protocol::QUIC]; let capacity = nodes.len().saturating_mul(2); // Tracks (Protocol, SocketAddr) tuples already observed. @@ -409,6 +415,11 @@ fn dedup_tvu_addrs(nodes: &mut Vec) { // deterministic shuffle. return node_stake > 0u64; }; + // Do not delete our own identity under any circumstances + // https://github.com/anza-xyz/agave/issues/5356 + if node.pubkey == keep_identity { + return true; + } // Dedup socket addresses and limit nodes at same IP address. for protocol in TVU_PROTOCOLS { let Some(addr) = node.tvu(protocol) else { @@ -451,15 +462,18 @@ fn get_seeded_rng(leader: &Pubkey, shred: &ShredId) -> ChaChaRng { // Each other node retransmits shreds to fanout many nodes in the next layer. // For example the node k in the 1st layer will retransmit to nodes: // fanout + k, 2*fanout + k, ..., fanout*fanout + k +// +// This function will only return Some(result) if `pred` returns true for at least +// one of the items in `nodes` fn get_retransmit_peers( fanout: usize, // Predicate fn which identifies this node in the shuffle. pred: impl Fn(T) -> bool, nodes: impl IntoIterator, -) -> (/*this node's index:*/ usize, impl Iterator) { +) -> Option<(/*this node's index:*/ usize, impl Iterator)> { let mut nodes = nodes.into_iter(); - // This node's index within shuffled nodes. - let index = nodes.by_ref().position(pred).unwrap(); + // This node's index should be somewhere within shuffled indices. + let index = nodes.by_ref().position(pred)?; // Node's index within its neighborhood. let offset = index.saturating_sub(1) % fanout; // First node in the neighborhood. @@ -473,7 +487,7 @@ fn get_retransmit_peers( *state = k; Some(peer) }); - (index, peers) + Some((index, peers)) } // Returns the parent node in the turbine broadcast tree. @@ -818,7 +832,7 @@ mod tests { for (k, peers) in peers.into_iter().enumerate() { { let (index, retransmit_peers) = - get_retransmit_peers(fanout, |node| node == &nodes[k], nodes); + get_retransmit_peers(fanout, |node| node == &nodes[k], nodes).unwrap(); assert_eq!(peers, retransmit_peers.copied().collect::>()); assert_eq!(index, k); } @@ -829,7 +843,8 @@ mod tests { } // Remaining nodes have no children. for k in offset..nodes.len() { - let (index, mut peers) = get_retransmit_peers(fanout, |node| node == &nodes[k], nodes); + let (index, mut peers) = + get_retransmit_peers(fanout, |node| node == &nodes[k], nodes).unwrap(); assert_eq!(peers.next(), None); assert_eq!(index, k); } @@ -970,13 +985,15 @@ mod tests { assert_eq!(get_retransmit_parent(fanout, /*index:*/ 0, &nodes), None); for k in 1..size { let parent = get_retransmit_parent(fanout, k, &nodes).unwrap(); - let (index, mut peers) = get_retransmit_peers(fanout, |node| node == &parent, &nodes); + let (index, mut peers) = + get_retransmit_peers(fanout, |node| node == &parent, &nodes).unwrap(); assert_eq!(index, cache[&parent]); assert_eq!(peers.find(|&&peer| peer == nodes[k]), Some(&nodes[k])); } for k in 0..size { let parent = Some(nodes[k]); - let (index, peers) = get_retransmit_peers(fanout, |node| node == &nodes[k], &nodes); + let (index, peers) = + get_retransmit_peers(fanout, |node| node == &nodes[k], &nodes).unwrap(); assert_eq!(index, k); for peer in peers { assert_eq!(get_retransmit_parent(fanout, cache[peer], &nodes), parent); From 92202a07babf5e31fe1d1bc5cb585a539d770414 Mon Sep 17 00:00:00 2001 From: Alex Pyattaev Date: Thu, 20 Mar 2025 15:11:58 +0000 Subject: [PATCH 2/2] address review comments --- turbine/src/cluster_nodes.rs | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/turbine/src/cluster_nodes.rs b/turbine/src/cluster_nodes.rs index 8f6644dc142f1f..8e22b274aa9925 100644 --- a/turbine/src/cluster_nodes.rs +++ b/turbine/src/cluster_nodes.rs @@ -244,8 +244,7 @@ impl ClusterNodes { fanout, |k| self.nodes[k].pubkey() == &self.pubkey, weighted_shuffle.shuffle(&mut rng), - ) - .expect("Could not find own pubkey in cluster nodes"); + ); let protocol = get_broadcast_protocol(shred); let peers = peers .filter_map(|k| self.nodes[k].contact_info()?.tvu(protocol)) @@ -415,8 +414,7 @@ fn dedup_tvu_addrs(nodes: &mut Vec, keep_identity: Pubkey) { // deterministic shuffle. return node_stake > 0u64; }; - // Do not delete our own identity under any circumstances - // https://github.com/anza-xyz/agave/issues/5356 + // Do not delete the provided keep_identity if node.pubkey == keep_identity { return true; } @@ -463,17 +461,20 @@ fn get_seeded_rng(leader: &Pubkey, shred: &ShredId) -> ChaChaRng { // For example the node k in the 1st layer will retransmit to nodes: // fanout + k, 2*fanout + k, ..., fanout*fanout + k // -// This function will only return Some(result) if `pred` returns true for at least +// Panics if `pred` does not return true for at least // one of the items in `nodes` fn get_retransmit_peers( fanout: usize, // Predicate fn which identifies this node in the shuffle. pred: impl Fn(T) -> bool, nodes: impl IntoIterator, -) -> Option<(/*this node's index:*/ usize, impl Iterator)> { +) -> (/*this node's index:*/ usize, impl Iterator) { let mut nodes = nodes.into_iter(); - // This node's index should be somewhere within shuffled indices. - let index = nodes.by_ref().position(pred)?; + // This node's index within shuffled nodes. + let index = nodes + .by_ref() + .position(pred) + .expect("Provided predicate should return true at least once"); // Node's index within its neighborhood. let offset = index.saturating_sub(1) % fanout; // First node in the neighborhood. @@ -487,7 +488,7 @@ fn get_retransmit_peers( *state = k; Some(peer) }); - Some((index, peers)) + (index, peers) } // Returns the parent node in the turbine broadcast tree. @@ -832,7 +833,7 @@ mod tests { for (k, peers) in peers.into_iter().enumerate() { { let (index, retransmit_peers) = - get_retransmit_peers(fanout, |node| node == &nodes[k], nodes).unwrap(); + get_retransmit_peers(fanout, |node| node == &nodes[k], nodes); assert_eq!(peers, retransmit_peers.copied().collect::>()); assert_eq!(index, k); } @@ -843,8 +844,7 @@ mod tests { } // Remaining nodes have no children. for k in offset..nodes.len() { - let (index, mut peers) = - get_retransmit_peers(fanout, |node| node == &nodes[k], nodes).unwrap(); + let (index, mut peers) = get_retransmit_peers(fanout, |node| node == &nodes[k], nodes); assert_eq!(peers.next(), None); assert_eq!(index, k); } @@ -985,15 +985,13 @@ mod tests { assert_eq!(get_retransmit_parent(fanout, /*index:*/ 0, &nodes), None); for k in 1..size { let parent = get_retransmit_parent(fanout, k, &nodes).unwrap(); - let (index, mut peers) = - get_retransmit_peers(fanout, |node| node == &parent, &nodes).unwrap(); + let (index, mut peers) = get_retransmit_peers(fanout, |node| node == &parent, &nodes); assert_eq!(index, cache[&parent]); assert_eq!(peers.find(|&&peer| peer == nodes[k]), Some(&nodes[k])); } for k in 0..size { let parent = Some(nodes[k]); - let (index, peers) = - get_retransmit_peers(fanout, |node| node == &nodes[k], &nodes).unwrap(); + let (index, peers) = get_retransmit_peers(fanout, |node| node == &nodes[k], &nodes); assert_eq!(index, k); for peer in peers { assert_eq!(get_retransmit_parent(fanout, cache[peer], &nodes), parent);