diff --git a/core/src/cluster_slots_service.rs b/core/src/cluster_slots_service.rs index 8fbc0534d10c27..650b53200ec8f9 100644 --- a/core/src/cluster_slots_service.rs +++ b/core/src/cluster_slots_service.rs @@ -2,7 +2,7 @@ pub mod cluster_slots; use { cluster_slots::ClusterSlots, crossbeam_channel::{Receiver, RecvTimeoutError, Sender}, - solana_gossip::cluster_info::ClusterInfo, + solana_gossip::{cluster_info::ClusterInfo, epoch_specs::EpochSpecs}, solana_ledger::blockstore::Blockstore, solana_measure::measure::Measure, solana_runtime::bank_forks::BankForks, @@ -81,10 +81,8 @@ impl ClusterSlotsService { ) { let mut cluster_slots_service_timing = ClusterSlotsServiceTiming::default(); let mut last_stats = Instant::now(); - loop { - if exit.load(Ordering::Relaxed) { - break; - } + let mut epoch_specs = EpochSpecs::from(bank_forks.clone()); + while !exit.load(Ordering::Relaxed) { let slots = match cluster_slots_update_receiver.recv_timeout(Duration::from_millis(200)) { Ok(slots) => Some(slots), @@ -100,12 +98,25 @@ impl ClusterSlotsService { lowest_slot_elapsed.stop(); let mut process_cluster_slots_updates_elapsed = Measure::start("process_cluster_slots_updates_elapsed"); + if let Some(slots) = slots { - Self::process_cluster_slots_updates( - slots, - &cluster_slots_update_receiver, - &cluster_info, - ); + let node_id = cluster_info.id(); + let my_stake = epoch_specs + .current_epoch_staked_nodes() + .get(&node_id) + .cloned() + .unwrap_or_default(); + // staked node should push EpochSlots into CRDS to save gossip bandwidth + if my_stake > 0 { + Self::process_cluster_slots_updates( + slots, + &cluster_slots_update_receiver, + &cluster_info, + ); + } else { + // drain the channel dropping the updates + while cluster_slots_update_receiver.try_recv().is_ok() {} + } } let root_bank = bank_forks.read().unwrap().root_bank(); cluster_slots.update(&root_bank, &cluster_info); diff --git a/gossip/src/epoch_specs.rs b/gossip/src/epoch_specs.rs index 8c2a8fdf168b65..fff38c2c8d76c6 100644 --- a/gossip/src/epoch_specs.rs +++ b/gossip/src/epoch_specs.rs @@ -15,7 +15,7 @@ use { // Caches epoch specific information which stay fixed throughout the epoch. // Refreshes only if the root bank has moved to a new epoch. -pub(crate) struct EpochSpecs { +pub struct EpochSpecs { epoch: Epoch, // when fields were last updated. epoch_schedule: EpochSchedule, root: ReadOnlyAtomicSlot, // updated by bank-forks. @@ -26,7 +26,7 @@ pub(crate) struct EpochSpecs { impl EpochSpecs { #[inline] - pub(crate) fn current_epoch_staked_nodes(&mut self) -> &Arc> { + pub fn current_epoch_staked_nodes(&mut self) -> &Arc> { self.maybe_refresh(); &self.current_epoch_staked_nodes } diff --git a/gossip/src/lib.rs b/gossip/src/lib.rs index 06df390c59213f..2abc48d9edd2bd 100644 --- a/gossip/src/lib.rs +++ b/gossip/src/lib.rs @@ -18,7 +18,7 @@ pub mod duplicate_shred; pub mod duplicate_shred_handler; pub mod duplicate_shred_listener; pub mod epoch_slots; -mod epoch_specs; +pub mod epoch_specs; pub mod gossip_error; pub mod gossip_service; #[macro_use]