diff --git a/turbine/src/cluster_nodes.rs b/turbine/src/cluster_nodes.rs index b946b59c35790a..ade26e965dbe57 100644 --- a/turbine/src/cluster_nodes.rs +++ b/turbine/src/cluster_nodes.rs @@ -304,6 +304,11 @@ pub fn new_cluster_nodes( .enumerate() .map(|(ix, node)| (*node.pubkey(), ix)) .collect(); + // Paranoid check to ensure the correct operation of the datastructure. + assert!( + index.contains_key(&self_pubkey), + "Own public key should be present in ClusterNodes.index" + ); let broadcast = TypeId::of::() == TypeId::of::(); let stakes = nodes.iter().map(|node| node.stake); let mut weighted_shuffle = WeightedShuffle::new("cluster-nodes", stakes); @@ -362,7 +367,7 @@ fn get_nodes( .collect(); sort_and_dedup_nodes(&mut nodes); if should_dedup_tvu_addrs { - dedup_tvu_addrs(&mut nodes); + dedup_tvu_addrs(&mut nodes, self_pubkey); }; nodes } @@ -395,7 +400,7 @@ fn cmp_nodes_stake(a: &Node, b: &Node) -> Ordering { // same TVU socket-addr, we only send shreds to one of them. // Additionally limits number of nodes at the same IP address to // MAX_NUM_NODES_PER_IP_ADDRESS. -fn dedup_tvu_addrs(nodes: &mut Vec) { +fn dedup_tvu_addrs(nodes: &mut Vec, keep_identity: Pubkey) { const TVU_PROTOCOLS: [Protocol; 2] = [Protocol::UDP, Protocol::QUIC]; let capacity = nodes.len().saturating_mul(2); // Tracks (Protocol, SocketAddr) tuples already observed. @@ -409,6 +414,10 @@ fn dedup_tvu_addrs(nodes: &mut Vec) { // deterministic shuffle. return node_stake > 0u64; }; + // Do not delete the provided keep_identity + if node.pubkey == keep_identity { + return true; + } // Dedup socket addresses and limit nodes at same IP address. for protocol in TVU_PROTOCOLS { let Some(addr) = node.tvu(protocol) else { @@ -451,6 +460,9 @@ fn get_seeded_rng(leader: &Pubkey, shred: &ShredId) -> ChaChaRng { // Each other node retransmits shreds to fanout many nodes in the next layer. // For example the node k in the 1st layer will retransmit to nodes: // fanout + k, 2*fanout + k, ..., fanout*fanout + k +// +// Panics if `pred` does not return true for at least +// one of the items in `nodes` fn get_retransmit_peers( fanout: usize, // Predicate fn which identifies this node in the shuffle. @@ -459,7 +471,10 @@ fn get_retransmit_peers( ) -> (/*this node's index:*/ usize, impl Iterator) { let mut nodes = nodes.into_iter(); // This node's index within shuffled nodes. - let index = nodes.by_ref().position(pred).unwrap(); + let index = nodes + .by_ref() + .position(pred) + .expect("Provided predicate should return true at least once"); // Node's index within its neighborhood. let offset = index.saturating_sub(1) % fanout; // First node in the neighborhood. @@ -719,6 +734,79 @@ mod tests { std::{fmt::Debug, hash::Hash}, test_case::test_case, }; + #[test] + fn test_dedup_tvu_addrs() { + let mut nodes = vec![ + Node { + node: NodeId::ContactInfo(ContactInfo { + pubkey: Pubkey::new_unique(), + wallclock: 0, + tvu_udp: Some("1.1.1.1:1".parse().unwrap()), + tvu_quic: None, + }), + stake: 1, + }, + Node { + node: NodeId::ContactInfo(ContactInfo { + pubkey: Pubkey::new_unique(), + wallclock: 0, + tvu_udp: Some("1.1.1.1:1".parse().unwrap()), + tvu_quic: None, + }), + stake: 2, + }, + Node { + node: NodeId::ContactInfo(ContactInfo { + pubkey: Pubkey::new_unique(), + wallclock: 0, + tvu_udp: Some("2.2.2.2:2".parse().unwrap()), + tvu_quic: None, + }), + stake: 3, + }, + ]; + dedup_tvu_addrs(&mut nodes, Pubkey::new_unique()); + assert_eq!(nodes.len(), 3); + match &nodes[1].node { + NodeId::ContactInfo(contact_info) => { + // dedup should have removed the TVU address from identity reusing the same socket + assert!(contact_info.tvu_udp.is_none()); + } + NodeId::Pubkey(_pubkey) => panic!(), + } + } + #[test] + fn test_dedup_tvu_addrs_preserves_unstaked_id() { + let pk = Pubkey::new_unique(); + let mut nodes = vec![ + Node { + node: NodeId::ContactInfo(ContactInfo { + pubkey: pk, + wallclock: 0, + tvu_udp: Some("1.1.1.1:1".parse().unwrap()), + tvu_quic: None, + }), + stake: 0, + }, + Node { + node: NodeId::ContactInfo(ContactInfo { + pubkey: Pubkey::new_unique(), + wallclock: 0, + tvu_udp: Some("1.1.1.1:1".parse().unwrap()), + tvu_quic: None, + }), + stake: 2, + }, + ]; + dedup_tvu_addrs(&mut nodes, pk); + assert_eq!(nodes.len(), 2); + match &nodes[0].node { + NodeId::ContactInfo(contact_info) => { + assert!(contact_info.tvu_udp.is_some()); + } + NodeId::Pubkey(_pubkey) => panic!(), + } + } #[test] fn test_cluster_nodes_retransmit() {