Skip to content

Commit

Permalink
Remove lagged messages in gossip message store actor
Browse files Browse the repository at this point in the history
  • Loading branch information
contrun committed Dec 11, 2024
1 parent 0154c6c commit 37624f7
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 153 deletions.
207 changes: 54 additions & 153 deletions src/fiber/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -964,7 +964,6 @@ pub struct ExtendedGossipMessageStoreState<S> {
next_id: u64,
output_ports: HashMap<u64, BroadcastMessageOutput>,
last_cursor: Cursor,
lagged_messages: HashMap<BroadcastMessageID, BroadcastMessageWithTimestamp>,
messages_to_be_saved: HashMap<BroadcastMessageID, BroadcastMessageWithTimestamp>,
// TODO: we need to remove the notifiers that are there for a long time to avoid memory leak
// when the node is running for a long time.
Expand All @@ -979,7 +978,6 @@ impl<S: GossipMessageStore> ExtendedGossipMessageStoreState<S> {
next_id: Default::default(),
output_ports: Default::default(),
last_cursor: Default::default(),
lagged_messages: Default::default(),
messages_to_be_saved: Default::default(),
message_saving_notifier: Default::default(),
}
Expand Down Expand Up @@ -1013,6 +1011,12 @@ impl<S: GossipMessageStore> ExtendedGossipMessageStoreState<S> {
}
}

fn update_last_cursor(&mut self, cursor: Cursor) {
if cursor > self.last_cursor {
self.last_cursor = cursor;
}
}

fn save_broadcast_message(&mut self, message: BroadcastMessageWithTimestamp) {
match message.clone() {
BroadcastMessageWithTimestamp::ChannelAnnouncement(timestamp, channel_announcement) => {
Expand All @@ -1026,19 +1030,36 @@ impl<S: GossipMessageStore> ExtendedGossipMessageStoreState<S> {
self.store.save_node_announcement(node_announcement)
}
}
self.update_last_cursor(message.cursor());
self.notify_message_saving_result(&message.cursor(), Ok(message));
}

// Saving all the messages whose transitive dependencies are already available.
// We will also change the relevant state (e.g. update the latest cursor).
// The returned list may be sent to the subscribers.
fn prune_complete_messages_to_be_saved(&mut self) -> Vec<BroadcastMessageWithTimestamp> {
let complete_messages = self
.messages_to_be_saved
.values()
.filter(|m| self.has_transitive_dependencies(m))
.cloned()
.collect::<Vec<_>>();
self.messages_to_be_saved
.retain(|_, v| !complete_messages.contains(v));
for message in &complete_messages {
self.save_broadcast_message(message.clone());
}

complete_messages
}

fn has_node_announcement(&self, node_id: &Pubkey) -> bool {
self.store
.get_latest_node_announcement_timestamp(node_id)
.is_some()
|| self
.messages_to_be_saved
.contains_key(&BroadcastMessageID::NodeAnnouncement(node_id.clone()))
|| self
.lagged_messages
.contains_key(&BroadcastMessageID::NodeAnnouncement(node_id.clone()))
}

fn get_channel_annnouncement(&self, outpoint: &OutPoint) -> Option<ChannelAnnouncement> {
Expand All @@ -1054,15 +1075,6 @@ impl<S: GossipMessageStore> ExtendedGossipMessageStoreState<S> {
}
_ => None,
}))
.or(self
.lagged_messages
.get(&BroadcastMessageID::ChannelAnnouncement(outpoint.clone()))
.and_then(|message| match message {
BroadcastMessageWithTimestamp::ChannelAnnouncement(_, channel_announcement) => {
Some(channel_announcement.clone())
}
_ => None,
}))
}

fn has_transitive_dependencies(&self, message: &BroadcastMessageWithTimestamp) -> bool {
Expand Down Expand Up @@ -1228,7 +1240,7 @@ impl<S: GossipMessageStore + Send + Sync + 'static> Actor for ExtendedGossipMess
}

ExtendedGossipMessageStoreMessage::SaveMessage(message, wait_for_saving, reply) => {
let (message, should_save) = match partially_verify_broadcast_message(
let (message, _) = match partially_verify_broadcast_message(
message.clone(),
&state.store,
&state.chain_actor,
Expand Down Expand Up @@ -1260,94 +1272,57 @@ impl<S: GossipMessageStore + Send + Sync + 'static> Actor for ExtendedGossipMess
return Ok(());
}

trace!("ExtendedGossipMessageActor saving message: {:?}", message);
let message_cursor = message.cursor();
let message_id = message.message_id();
// Check if the message is lagged. If it is, then save it also to lagged_messages.
if message_cursor < state.last_cursor {
trace!(
"ExtendedGossipMessageActor saving lagged message: {:?}",
message
);
state
.lagged_messages
.insert(message_id.clone(), message.clone());
}

if wait_for_saving {
let notifier = state.create_message_saving_notifier(&message);
let _ = reply.send(Ok(Some(notifier)));
} else {
let _ = reply.send(Ok(None));
}

if should_save {
debug!(
"ExtendedGossipMessageActor saving message immediately: {:?}",
message
);
state.save_broadcast_message(message.clone());
let cursor = message.cursor();
if let Some(notifier) = state.message_saving_notifier.remove(&cursor) {
let _ = notifier.send(Ok(message));
trace!("ExtendedGossipMessageActor saving message: {:?}", message);
let message_cursor = message.cursor();
let message_id = message.message_id();
match state.messages_to_be_saved.get(&message_id) {
Some(saved_message) if saved_message.timestamp() > message.timestamp() => {
let error = GossipMessageProcessingError::NewerMessageSaved(format!(
"A newer message is already saved: {:?}",
saved_message
));
state.notify_message_saving_result(&message_cursor, Err(error));
return Ok(());
}
_ => {
state
.messages_to_be_saved
.insert(message_id, message.clone());
}
} else {
trace!(
"ExtendedGossipMessageActor saving message to be saved later: {:?}",
message
);
state
.messages_to_be_saved
.insert(message_id.clone(), message.clone());
}
}

ExtendedGossipMessageStoreMessage::Tick => {
debug!(
"ExtendedGossipMessageActor processing tick: last_cursor = {:?} #subscriptions = {}, #lagged_messages = {}, #messages_to_be_saved = {}",
trace!(
"Gossip store maintenance ticked: last_cursor = {:?} #subscriptions = {}, #messages_to_be_saved = {}",
state.last_cursor,
state.output_ports.len(),
state.lagged_messages.len(),
state.messages_to_be_saved.len()
state.messages_to_be_saved.len(),
);
trace!(
"ExtendedGossipMessageActor processing tick: state.messages_to_be_saved {:?}",
"Gossip store maintenance ticked: state.messages_to_be_saved {:?}",
state.messages_to_be_saved
);

// Messages that have their dependencies saved are complete messages.
// We need to save them to the store.
let complete_messages_to_be_saved = state
.messages_to_be_saved
.values()
.filter(|m| state.has_transitive_dependencies(m))
.cloned()
.collect::<Vec<_>>();
state
.messages_to_be_saved
.retain(|_, v| !complete_messages_to_be_saved.contains(v));

// We also need to check if there are any lagged messages that are now complete.
let lagged_complete_messages = state
.lagged_messages
.values()
.filter(|m| state.has_transitive_dependencies(m))
.cloned()
.collect::<Vec<_>>();
state
.lagged_messages
.retain(|_, v| !lagged_complete_messages.contains(v));
let complete_messages = state.prune_complete_messages_to_be_saved();

// We need to send the lagged complete messages to the subscribers. After doing this,
// we may remove the messages from the lagged_messages.
for subscription in state.output_ports.values() {
let messages_to_send = match subscription.filter {
Some(ref filter) => lagged_complete_messages
Some(ref filter) => complete_messages
.iter()
.filter(|m| &m.cursor() > filter)
.cloned()
.collect::<Vec<_>>(),
None => lagged_complete_messages.clone(),
None => complete_messages.clone(),
};
debug!(
"ExtendedGossipMessageActor sending lagged complete messages to subscriber: number of messages = {}",
Expand All @@ -1362,76 +1337,6 @@ impl<S: GossipMessageStore + Send + Sync + 'static> Actor for ExtendedGossipMess
.send(GossipMessageUpdates::new(chunk.to_vec()));
}
}

debug!(
"ExtendedGossipMessageActor saving messages: number of lagged complete messages = {}, number of complete messages to be saved = {}",
lagged_complete_messages.len(),
complete_messages_to_be_saved.len()
);

// Saving all the messages that are complete and have also their dependencies saved.
for message in lagged_complete_messages
.into_iter()
.chain(complete_messages_to_be_saved)
{
trace!(
"ExtendedGossipMessageActor saving new complete message: {:?}",
message
);
// TODO: we may need to order all the messages by their dependencies, because
// the saving of broadcast messages is not an atomic operation. The node may fail any time
// while saving the messages. If the node failed, some messages in the store may not have their
// dependencies saved yet.
state.save_broadcast_message(message.clone());
}

// We now have some messages later than last_cursor saved to the store, we can take them
// out and send them to the subscribers. Here we need to take messages directly from the
// store because some messages with complete dependencies are previously saved directly
// to the store.

// This is the cursor that all the subscribers will be updated to.
// We read the latest cursor from the store and filter all the messages to have cursor <= last_cursor.
// This is because while processing the messages, some new messages may be saved to the store.
// Either updating our last_cursor to the initial last_cursor or the final last_cursor is problematic.
let last_cursor_now = state
.store
.get_latest_broadcast_message_cursor()
.unwrap_or(state.last_cursor.clone());
for subscription in state.output_ports.values() {
let filter = subscription.filter.clone().unwrap_or_default();
// We still need to check if the messages returned are newer than the filter,
// because a subscriber may set filter so large that our last_cursor is still smaller
// than this filter cursor.
let mut starting_cursor_in_the_loop = if state.last_cursor > filter {
state.last_cursor.clone()
} else {
filter
};
loop {
let messages = state
.store
.get_broadcast_messages(
&starting_cursor_in_the_loop,
Some(MAX_NUM_OF_BROADCAST_MESSAGES),
)
.into_iter()
.filter(|m| m.cursor() <= last_cursor_now)
.collect::<Vec<_>>();
match messages.last() {
Some(m) => {
starting_cursor_in_the_loop = m.cursor();
subscription
.output_port
.send(GossipMessageUpdates::new(messages));
}
None => {
break;
}
}
}
}
state.last_cursor = last_cursor_now;
}
}
Ok(())
Expand Down Expand Up @@ -2308,19 +2213,15 @@ where
}

GossipActorMessage::TickNetworkMaintenance => {
debug!(
"Network maintenance ticked, current state: num of peers: {}, num of finished syncing peers: {}, num of active syncing peers: {}, num of passive syncing peers: {}, num of pending queries: {}",
trace!(
"Gossip network maintenance ticked, current state: num of peers: {}, num of finished syncing peers: {}, num of active syncing peers: {}, num of passive syncing peers: {}, num of pending queries: {}, peer states: {:?}",
state.peer_states.len(),
state.num_of_finished_syncing_peers(),
state.num_of_active_syncing_peers(),
state.num_of_passive_syncing_peers(),
state.pending_queries.len()
);
debug!(
"Network maintenance ticked, current state: peer states: {:?}",
state.peer_states
state.pending_queries.len(),
&state.peer_states
);

for peer in state.new_peers_to_start_active_syncing() {
debug!("Starting new active syncer for peer {:?}", &peer);
state.start_new_active_syncer(&peer).await;
Expand Down
12 changes: 12 additions & 0 deletions src/fiber/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,13 +316,25 @@ where
// Load all the broadcast messages starting from latest_cursor from the store.
// Process them and set nodes and channels accordingly.
pub(crate) fn load_from_store(&mut self) {
dbg!(
"Loading from store",
&self.nodes,
&self.channels,
&self.latest_cursor
);
loop {
let messages = self.store.get_broadcast_messages(&self.latest_cursor, None);
if messages.is_empty() {
break;
}
self.update_for_messages(messages);
}
dbg!(
"Loading from store",
&self.nodes,
&self.channels,
&self.latest_cursor
);
}

// Completely reload from store. Because messages with larger timestamp
Expand Down
1 change: 1 addition & 0 deletions src/fiber/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,7 @@ where
));
}
NetworkActorEvent::GossipMessageUpdates(gossip_message_updates) => {
debug!("Updating network graph for gossip message updates");
let mut graph = self.network_graph.write().await;
graph.update_for_messages(gossip_message_updates.messages);
}
Expand Down
1 change: 1 addition & 0 deletions src/fiber/tests/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1821,6 +1821,7 @@ async fn test_network_send_payment_dry_run_will_not_create_payment_session() {
))
};
let res = call!(node_a.network_actor, message).expect("node_a alive");
dbg!(&res);
assert!(res.is_ok());

// make sure we can send the same payment after dry run query
Expand Down

0 comments on commit 37624f7

Please sign in to comment.