v2.2: stop unstaked nodes from pushing EpochSlots into the cluster (backport of #5141) #5286

Merged 1 commit on Mar 21, 2025
31 changes: 21 additions & 10 deletions core/src/cluster_slots_service.rs
@@ -2,7 +2,7 @@ pub mod cluster_slots;
use {
cluster_slots::ClusterSlots,
crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
- solana_gossip::cluster_info::ClusterInfo,
+ solana_gossip::{cluster_info::ClusterInfo, epoch_specs::EpochSpecs},
solana_ledger::blockstore::Blockstore,
solana_measure::measure::Measure,
solana_runtime::bank_forks::BankForks,
@@ -81,10 +81,8 @@ impl ClusterSlotsService {
) {
let mut cluster_slots_service_timing = ClusterSlotsServiceTiming::default();
let mut last_stats = Instant::now();
- loop {
- if exit.load(Ordering::Relaxed) {
- break;
- }
+ let mut epoch_specs = EpochSpecs::from(bank_forks.clone());
+ while !exit.load(Ordering::Relaxed) {

In the future, please avoid superfluous refactors like this in changes that are intended for backport. They can go into master as a follow-up.

Sure thing!
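For readers following the thread, a minimal sketch of why this refactor counts as purely cosmetic: both loop shapes check the exit flag once per pass and stop at the same point. The function names and the `max_iters` stand-in for the service body are hypothetical and are not part of the PR.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Original shape: explicit `loop` with an early `break` on the exit flag.
fn run_with_loop(exit: &AtomicBool, max_iters: usize) -> usize {
    let mut iterations = 0;
    loop {
        if exit.load(Ordering::Relaxed) {
            break;
        }
        iterations += 1;
        // Stand-in for the service body: request shutdown after `max_iters` passes.
        if iterations >= max_iters {
            exit.store(true, Ordering::Relaxed);
        }
    }
    iterations
}

/// Refactored shape: the same check folded into the `while` condition.
fn run_with_while(exit: &AtomicBool, max_iters: usize) -> usize {
    let mut iterations = 0;
    while !exit.load(Ordering::Relaxed) {
        iterations += 1;
        // Same stand-in body as above.
        if iterations >= max_iters {
            exit.store(true, Ordering::Relaxed);
        }
    }
    iterations
}

fn main() {
    let with_loop = run_with_loop(&AtomicBool::new(false), 3);
    let with_while = run_with_while(&AtomicBool::new(false), 3);
    assert_eq!(with_loop, with_while);
    println!("both forms ran {with_loop} iterations");
}
```

Since the behavior is identical, the change adds diff noise to a backport without changing what the service does.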

let slots = match cluster_slots_update_receiver.recv_timeout(Duration::from_millis(200))
{
Ok(slots) => Some(slots),
@@ -100,12 +98,25 @@ impl ClusterSlotsService {
lowest_slot_elapsed.stop();
let mut process_cluster_slots_updates_elapsed =
Measure::start("process_cluster_slots_updates_elapsed");

Avoid superfluous cosmetic changes.

if let Some(slots) = slots {
- Self::process_cluster_slots_updates(
- slots,
- &cluster_slots_update_receiver,
- &cluster_info,
- );
+ let node_id = cluster_info.id();
+ let my_stake = epoch_specs
+ .current_epoch_staked_nodes()
+ .get(&node_id)
+ .cloned()
+ .unwrap_or_default();

Avoid papering over potentially unexpected error scenarios with dubious generics (Default is a bug). This would have been more obvious as:

let maybe_my_stake = epoch_specs
    .current_epoch_staked_nodes()
    .get(&node_id)
    .cloned();
match maybe_my_stake {
    Some(my_stake) if my_stake > 0 => {
        ...process
    }
    _ => {
        ...drain
    }
}

Arguably, unwrap_or(0) would work even better here. I agree that default can do surprising things. Should we add a recommendation to avoid default() where reasonable?
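The two alternatives raised in this thread, an explicit match on the Option and unwrap_or(0), can be compared in a minimal, self-contained sketch. This is an illustration only: Pubkey is replaced by String, and the Action enum and both function names are hypothetical, not taken from the PR.

```rust
use std::collections::HashMap;

/// What to do with a batch of slot updates (hypothetical, for this sketch only).
#[derive(Debug, PartialEq)]
enum Action {
    Process,
    Drain,
}

/// Explicit match: the "not in the stake map" case is visible as its own arm.
fn decide_explicit(staked_nodes: &HashMap<String, u64>, node_id: &str) -> Action {
    match staked_nodes.get(node_id).copied() {
        Some(stake) if stake > 0 => Action::Process,
        // Covers both a missing entry and an entry with zero stake.
        _ => Action::Drain,
    }
}

/// `unwrap_or(0)`: the fallback value is spelled out at the call site
/// instead of being implied by `Default`.
fn decide_unwrap_or(staked_nodes: &HashMap<String, u64>, node_id: &str) -> Action {
    let stake = staked_nodes.get(node_id).copied().unwrap_or(0);
    if stake > 0 {
        Action::Process
    } else {
        Action::Drain
    }
}

fn main() {
    let staked_nodes = HashMap::from([
        ("staked".to_string(), 42u64),
        ("zero".to_string(), 0u64),
    ]);
    for node_id in ["staked", "zero", "missing"] {
        // Both formulations agree for every case in this sketch.
        assert_eq!(
            decide_explicit(&staked_nodes, node_id),
            decide_unwrap_or(&staked_nodes, node_id)
        );
    }
    println!("explicit match and unwrap_or(0) agree");
}
```

For u64 the three spellings produce the same result; the difference is how clearly the missing-entry case shows up to a reader of the code.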

+ // staked node should push EpochSlots into CRDS to save gossip bandwidth
+ if my_stake > 0 {
+ Self::process_cluster_slots_updates(
+ slots,
+ &cluster_slots_update_receiver,
+ &cluster_info,
+ );
+ } else {
+ // drain the channel dropping the updates
+ while cluster_slots_update_receiver.try_recv().is_ok() {}
+ }
}
let root_bank = bank_forks.read().unwrap().root_bank();
cluster_slots.update(&root_bank, &cluster_info);
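The drain in the unstaked branch above can be tried in isolation. The sketch below uses std::sync::mpsc rather than crossbeam_channel so that it runs without external crates; that substitution and the helper name drain are assumptions made for illustration. It relies only on try_recv returning an error once the queue is empty or disconnected.

```rust
use std::sync::mpsc::{channel, Receiver};

/// Discards everything currently queued on `receiver` without blocking,
/// and returns how many messages were dropped.
fn drain<T>(receiver: &Receiver<T>) -> usize {
    let mut dropped = 0;
    // `try_recv` never blocks: it returns `Err` as soon as the queue is
    // empty (or all senders are gone), which ends the loop.
    while receiver.try_recv().is_ok() {
        dropped += 1;
    }
    dropped
}

fn main() {
    let (sender, receiver) = channel();
    for slot in 0..5u64 {
        sender.send(vec![slot]).unwrap();
    }
    assert_eq!(drain(&receiver), 5);
    // A second drain finds nothing left.
    assert_eq!(drain(&receiver), 0);
    println!("channel drained");
}
```

Draining instead of simply skipping the update presumably keeps queued messages from accumulating while the node is unstaked; the PR text itself does not state the reason.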
4 changes: 2 additions & 2 deletions gossip/src/epoch_specs.rs
@@ -17,7 +17,7 @@ use {

// Caches epoch specific information which stay fixed throughout the epoch.
// Refreshes only if the root bank has moved to a new epoch.
- pub(crate) struct EpochSpecs {
+ pub struct EpochSpecs {
epoch: Epoch, // when fields were last updated.
epoch_schedule: EpochSchedule,
root: ReadOnlyAtomicSlot, // updated by bank-forks.
@@ -28,7 +28,7 @@ pub(crate) struct EpochSpecs {

impl EpochSpecs {
#[inline]
- pub(crate) fn current_epoch_staked_nodes(&mut self) -> &Arc<HashMap<Pubkey, /*stake:*/ u64>> {
+ pub fn current_epoch_staked_nodes(&mut self) -> &Arc<HashMap<Pubkey, /*stake:*/ u64>> {
self.maybe_refresh();
&self.current_epoch_staked_nodes
}
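The refresh-on-epoch-change caching described in the comment on EpochSpecs can be sketched roughly as follows. This is not the real implementation: EpochCache, load_stakes, the refreshes counter, and the fixed-length epoch arithmetic are all simplifying assumptions, and the root slot is passed in as an argument rather than read from bank forks.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical stand-in for an epoch-scoped cache of staked nodes.
struct EpochCache {
    epoch: Option<u64>,   // epoch for which the fields were last loaded
    slots_per_epoch: u64, // assumption: fixed-length epochs, must be non-zero
    staked_nodes: Arc<HashMap<String, u64>>,
    load_stakes: fn(u64) -> HashMap<String, u64>, // hypothetical data source
    refreshes: usize,     // counts reloads, for demonstration only
}

impl EpochCache {
    fn new(slots_per_epoch: u64, load_stakes: fn(u64) -> HashMap<String, u64>) -> Self {
        Self {
            epoch: None,
            slots_per_epoch,
            staked_nodes: Arc::new(HashMap::new()),
            load_stakes,
            refreshes: 0,
        }
    }

    /// Returns the cached map, reloading it first if the root moved to a new epoch.
    fn current_epoch_staked_nodes(&mut self, root_slot: u64) -> &Arc<HashMap<String, u64>> {
        self.maybe_refresh(root_slot);
        &self.staked_nodes
    }

    fn maybe_refresh(&mut self, root_slot: u64) {
        let epoch = root_slot / self.slots_per_epoch;
        if self.epoch != Some(epoch) {
            self.staked_nodes = Arc::new((self.load_stakes)(epoch));
            self.epoch = Some(epoch);
            self.refreshes += 1;
        }
    }
}

/// Demo data source: the node's stake is ten times the epoch number.
fn demo_stakes(epoch: u64) -> HashMap<String, u64> {
    HashMap::from([("node".to_string(), epoch * 10)])
}

fn main() {
    let mut cache = EpochCache::new(100, demo_stakes);
    cache.current_epoch_staked_nodes(5);
    cache.current_epoch_staked_nodes(99); // same epoch: served from cache
    assert_eq!(cache.refreshes, 1);
    let stake = cache.current_epoch_staked_nodes(100).get("node").copied();
    assert_eq!(stake, Some(10)); // new epoch: reloaded
    assert_eq!(cache.refreshes, 2);
}
```

The getter takes &mut self because a lookup may trigger a refresh, which matches the signature shown in the diff above and is why the caller in cluster_slots_service.rs declares epoch_specs as mut.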
2 changes: 1 addition & 1 deletion gossip/src/lib.rs
@@ -18,7 +18,7 @@ pub mod duplicate_shred;
pub mod duplicate_shred_handler;
pub mod duplicate_shred_listener;
pub mod epoch_slots;
- mod epoch_specs;
+ pub mod epoch_specs;
pub mod gossip_error;
pub mod gossip_service;
#[macro_use]