make sure own identity does not get wiped during TVU deduplication #5380

Open: wants to merge 2 commits into base: master
21 changes: 18 additions & 3 deletions turbine/src/cluster_nodes.rs
@@ -310,6 +310,11 @@ pub fn new_cluster_nodes<T: 'static>(
if broadcast {
weighted_shuffle.remove_index(index[&self_pubkey]);
}
+// Paranoid check to ensure the correct operation of the weighted shuffle.
+assert!(

Maybe debug_assert?

@alexpyattaev (Author), Mar 20, 2025

This does not get called very often (once every few seconds), and it ensures we crash early (rather than in any of the unwraps in the code). Basically, this is the check that would be hit if the "have my own key in the list" invariant is ever violated by future changes to the dedup logic and/or by bugs in gossip.

+index.contains_key(&self_pubkey),
+"Own public key should be present in ClusterNodes.index"
+);
ClusterNodes {
pubkey: self_pubkey,
nodes,
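
For reference, the trade-off discussed in the review thread above can be sketched as follows. This is a minimal illustration, not the code from this PR: `build_index` and the `[u8; 32]` alias for `Pubkey` are hypothetical stand-ins. `assert!` is checked in every build profile, while `debug_assert!` is compiled out unless debug assertions are enabled, so only the former guarantees an early crash in a release build.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for the real Pubkey type.
type Pubkey = [u8; 32];

/// Builds a pubkey -> position index and checks that our own key is present.
/// (Illustrative helper; not part of the PR.)
fn build_index(nodes: &[Pubkey], self_pubkey: Pubkey) -> HashMap<Pubkey, usize> {
    let index: HashMap<Pubkey, usize> = nodes
        .iter()
        .enumerate()
        .map(|(i, pubkey)| (*pubkey, i))
        .collect();
    // `assert!` also runs in release builds; `debug_assert!` would be
    // skipped there, deferring the failure to a later lookup or unwrap.
    assert!(
        index.contains_key(&self_pubkey),
        "Own public key should be present in ClusterNodes.index"
    );
    index
}

fn main() {
    let me = [1u8; 32];
    let other = [2u8; 32];
    let index = build_index(&[other, me], me);
    println!("own position: {}", index[&me]);
}
```

Since the check is a single hash lookup on a path that reportedly runs once every few seconds, its cost in release builds should be negligible.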
@@ -362,7 +367,7 @@ fn get_nodes(
.collect();
sort_and_dedup_nodes(&mut nodes);
if should_dedup_tvu_addrs {
-dedup_tvu_addrs(&mut nodes);
+dedup_tvu_addrs(&mut nodes, self_pubkey);
};
nodes
}
@@ -395,7 +400,7 @@ fn cmp_nodes_stake(a: &Node, b: &Node) -> Ordering {
// same TVU socket-addr, we only send shreds to one of them.
// Additionally limits number of nodes at the same IP address to
// MAX_NUM_NODES_PER_IP_ADDRESS.
-fn dedup_tvu_addrs(nodes: &mut Vec<Node>) {
+fn dedup_tvu_addrs(nodes: &mut Vec<Node>, keep_identity: Pubkey) {
const TVU_PROTOCOLS: [Protocol; 2] = [Protocol::UDP, Protocol::QUIC];
let capacity = nodes.len().saturating_mul(2);
// Tracks (Protocol, SocketAddr) tuples already observed.
@@ -409,6 +414,10 @@ fn dedup_tvu_addrs(nodes: &mut Vec<Node>) {
// deterministic shuffle.
return node_stake > 0u64;
};
+// Do not delete the provided keep_identity
+if node.pubkey == keep_identity {
+return true;
+}
// Dedup socket addresses and limit nodes at same IP address.
for protocol in TVU_PROTOCOLS {
let Some(addr) = node.tvu(protocol) else {
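
The early return added in this hunk can be illustrated in isolation. The sketch below is a simplified stand-in, not the function from this PR: the `Node` struct, the `[u8; 32]` alias for `Pubkey`, and the single `tvu` field are assumptions, and the per-IP limit and per-protocol handling of the real code are left out. It only shows how a `retain` closure can exempt one identity from address deduplication.

```rust
use std::collections::HashSet;
use std::net::SocketAddr;

// Hypothetical, simplified stand-ins for the real types.
type Pubkey = [u8; 32];

struct Node {
    pubkey: Pubkey,
    tvu: Option<SocketAddr>,
}

/// Keeps the first node seen for each TVU address, but never drops
/// `keep_identity`, even if its address was already observed.
fn dedup_tvu_addrs(nodes: &mut Vec<Node>, keep_identity: Pubkey) {
    let mut seen: HashSet<SocketAddr> = HashSet::with_capacity(nodes.len());
    nodes.retain(|node| {
        // Do not delete the provided keep_identity.
        if node.pubkey == keep_identity {
            return true;
        }
        match node.tvu {
            // `insert` returns false if the address was already in the set.
            Some(addr) => seen.insert(addr),
            // Simplification for this sketch: drop nodes without an address.
            None => false,
        }
    });
}

fn main() {
    let addr: SocketAddr = "127.0.0.1:8001".parse().unwrap();
    let me = [9u8; 32];
    let mut nodes = vec![
        Node { pubkey: [1u8; 32], tvu: Some(addr) },
        Node { pubkey: me, tvu: Some(addr) },
        Node { pubkey: [2u8; 32], tvu: Some(addr) },
    ];
    dedup_tvu_addrs(&mut nodes, me);
    // The first node and our own identity survive; the third is a duplicate.
    println!("kept {} nodes", nodes.len());
}
```

Note that in this sketch the exempted node returns before its address is recorded, so it does not count against other nodes sharing that address.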
@@ -451,6 +460,9 @@ fn get_seeded_rng(leader: &Pubkey, shred: &ShredId) -> ChaChaRng {
// Each other node retransmits shreds to fanout many nodes in the next layer.
// For example the node k in the 1st layer will retransmit to nodes:
// fanout + k, 2*fanout + k, ..., fanout*fanout + k
+//
+// Panics if `pred` does not return true for at least
+// one of the items in `nodes`
fn get_retransmit_peers<T>(
fanout: usize,
// Predicate fn which identifies this node in the shuffle.
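
The index formula in the comment above (node k in the 1st layer retransmits to fanout + k, 2*fanout + k, ..., fanout*fanout + k) can be checked with a small sketch. `first_layer_children` is a hypothetical helper written for illustration; it is not a function in cluster_nodes.rs, and it does not clamp the result to the actual number of nodes.

```rust
/// Indices that node `k` of the 1st layer retransmits to, following the
/// formula in the comment: fanout + k, 2*fanout + k, ..., fanout*fanout + k.
/// (Hypothetical helper; no bounds check against the number of nodes.)
fn first_layer_children(fanout: usize, k: usize) -> Vec<usize> {
    (1..=fanout).map(|j| j * fanout + k).collect()
}

fn main() {
    // With fanout = 3 and k = 1 this prints [4, 7, 10].
    println!("{:?}", first_layer_children(3, 1));
}
```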
@@ -459,7 +471,10 @@ fn get_retransmit_peers<T>(
) -> (/*this node's index:*/ usize, impl Iterator<Item = T>) {
let mut nodes = nodes.into_iter();
// This node's index within shuffled nodes.
-let index = nodes.by_ref().position(pred).unwrap();
+let index = nodes
+.by_ref()
+.position(pred)
+.expect("Provided predicate should return true at least once");
// Node's index within its neighborhood.
let offset = index.saturating_sub(1) % fanout;
// First node in the neighborhood.
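
A standalone sketch of the `position` + `expect` pattern from this hunk follows. The function `locate` and its return shape are assumptions made for illustration and differ from `get_retransmit_peers`. It shows that `by_ref()` leaves the iterator positioned just past the matching item, and that the `expect` message is what surfaces if the predicate never matches.

```rust
/// Returns (index of the first item matching `pred`, its offset within the
/// neighborhood, the items remaining after the match).
/// Panics if `pred` never returns true, or if `fanout` is zero.
/// (Hypothetical helper; not the signature used in the PR.)
fn locate<T>(
    fanout: usize,
    pred: impl FnMut(T) -> bool,
    nodes: impl IntoIterator<Item = T>,
) -> (usize, usize, Vec<T>) {
    let mut nodes = nodes.into_iter();
    // `by_ref` borrows the iterator, so it can still be used afterwards.
    let index = nodes
        .by_ref()
        .position(pred)
        .expect("Provided predicate should return true at least once");
    // Same offset computation as in the hunk above.
    let offset = index.saturating_sub(1) % fanout;
    (index, offset, nodes.collect())
}

fn main() {
    let (index, offset, rest) = locate(2, |n: i32| n == 7, vec![3, 5, 7, 9]);
    println!("index={index} offset={offset} rest={rest:?}");
}
```

Compared with a bare `unwrap()`, the `expect` message names the violated precondition, which makes a panic here easier to trace back to a missing identity in the node list.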