diff --git a/src/fiber/gossip.rs b/src/fiber/gossip.rs
index 462085623..d9aacbc24 100644
--- a/src/fiber/gossip.rs
+++ b/src/fiber/gossip.rs
@@ -964,7 +964,6 @@ pub struct ExtendedGossipMessageStoreState {
     next_id: u64,
     output_ports: HashMap,
     last_cursor: Cursor,
-    lagged_messages: HashMap<BroadcastMessageID, BroadcastMessageWithTimestamp>,
     messages_to_be_saved: HashMap<BroadcastMessageID, BroadcastMessageWithTimestamp>,
     // TODO: we need to remove the notifiers that are there for a long time to avoid memory leak
     // when the node is running for a long time.
@@ -979,7 +978,6 @@ impl ExtendedGossipMessageStoreState {
             next_id: Default::default(),
             output_ports: Default::default(),
             last_cursor: Default::default(),
-            lagged_messages: Default::default(),
             messages_to_be_saved: Default::default(),
             message_saving_notifier: Default::default(),
         }
@@ -1013,6 +1011,12 @@ impl ExtendedGossipMessageStoreState {
         }
     }
 
+    fn update_last_cursor(&mut self, cursor: Cursor) {
+        if cursor > self.last_cursor {
+            self.last_cursor = cursor;
+        }
+    }
+
     fn save_broadcast_message(&mut self, message: BroadcastMessageWithTimestamp) {
         match message.clone() {
             BroadcastMessageWithTimestamp::ChannelAnnouncement(timestamp, channel_announcement) => {
@@ -1026,9 +1030,29 @@ impl ExtendedGossipMessageStoreState {
                 self.store.save_node_announcement(node_announcement)
             }
         }
+        self.update_last_cursor(message.cursor());
         self.notify_message_saving_result(&message.cursor(), Ok(message));
     }
 
+    // Save all the messages whose transitive dependencies are already available.
+    // This also updates the relevant state (e.g. the latest cursor).
+    // The returned list may be sent to the subscribers.
+    fn prune_complete_messages_to_be_saved(&mut self) -> Vec<BroadcastMessageWithTimestamp> {
+        let complete_messages = self
+            .messages_to_be_saved
+            .values()
+            .filter(|m| self.has_transitive_dependencies(m))
+            .cloned()
+            .collect::<Vec<_>>();
+        self.messages_to_be_saved
+            .retain(|_, v| !complete_messages.contains(v));
+        for message in &complete_messages {
+            self.save_broadcast_message(message.clone());
+        }
+
+        complete_messages
+    }
+
     fn has_node_announcement(&self, node_id: &Pubkey) -> bool {
         self.store
             .get_latest_node_announcement_timestamp(node_id)
@@ -1036,9 +1060,6 @@ impl ExtendedGossipMessageStoreState {
             || self
                 .messages_to_be_saved
                 .contains_key(&BroadcastMessageID::NodeAnnouncement(node_id.clone()))
-            || self
-                .lagged_messages
-                .contains_key(&BroadcastMessageID::NodeAnnouncement(node_id.clone()))
     }
 
     fn get_channel_annnouncement(&self, outpoint: &OutPoint) -> Option<ChannelAnnouncement> {
@@ -1054,15 +1075,6 @@ impl ExtendedGossipMessageStoreState {
                 }
                 _ => None,
             }))
-            .or(self
-                .lagged_messages
-                .get(&BroadcastMessageID::ChannelAnnouncement(outpoint.clone()))
-                .and_then(|message| match message {
-                    BroadcastMessageWithTimestamp::ChannelAnnouncement(_, channel_announcement) => {
-                        Some(channel_announcement.clone())
-                    }
-                    _ => None,
-                }))
     }
 
     fn has_transitive_dependencies(&self, message: &BroadcastMessageWithTimestamp) -> bool {
@@ -1228,7 +1240,7 @@ impl Actor for ExtendedGossipMess
             }
 
             ExtendedGossipMessageStoreMessage::SaveMessage(message, wait_for_saving, reply) => {
-                let (message, should_save) = match partially_verify_broadcast_message(
+                let (message, _) = match partially_verify_broadcast_message(
                     message.clone(),
                     &state.store,
                     &state.chain_actor,
@@ -1260,20 +1272,6 @@ impl Actor for ExtendedGossipMess
                     return Ok(());
                 }
 
-                trace!("ExtendedGossipMessageActor saving message: {:?}", message);
-                let message_cursor = message.cursor();
-                let message_id = message.message_id();
-                // Check if the message is lagged. If it is, then save it also to lagged_messages.
-                if message_cursor < state.last_cursor {
-                    trace!(
-                        "ExtendedGossipMessageActor saving lagged message: {:?}",
-                        message
-                    );
-                    state
-                        .lagged_messages
-                        .insert(message_id.clone(), message.clone());
-                }
-
                 if wait_for_saving {
                     let notifier = state.create_message_saving_notifier(&message);
                     let _ = reply.send(Ok(Some(notifier)));
@@ -1281,73 +1279,50 @@ impl Actor for ExtendedGossipMess
                     let _ = reply.send(Ok(None));
                 }
 
-                if should_save {
-                    debug!(
-                        "ExtendedGossipMessageActor saving message immediately: {:?}",
-                        message
-                    );
-                    state.save_broadcast_message(message.clone());
-                    let cursor = message.cursor();
-                    if let Some(notifier) = state.message_saving_notifier.remove(&cursor) {
-                        let _ = notifier.send(Ok(message));
+                trace!("ExtendedGossipMessageActor saving message: {:?}", message);
+                let message_cursor = message.cursor();
+                let message_id = message.message_id();
+                match state.messages_to_be_saved.get(&message_id) {
+                    Some(saved_message) if saved_message.timestamp() > message.timestamp() => {
+                        let error = GossipMessageProcessingError::NewerMessageSaved(format!(
+                            "A newer message is already saved: {:?}",
+                            saved_message
+                        ));
+                        state.notify_message_saving_result(&message_cursor, Err(error));
+                        return Ok(());
+                    }
+                    _ => {
+                        state
+                            .messages_to_be_saved
+                            .insert(message_id, message.clone());
                     }
-                } else {
-                    trace!(
-                        "ExtendedGossipMessageActor saving message to be saved later: {:?}",
-                        message
-                    );
-                    state
-                        .messages_to_be_saved
-                        .insert(message_id.clone(), message.clone());
-                }
                 }
             }
 
             ExtendedGossipMessageStoreMessage::Tick => {
-                debug!(
-                    "ExtendedGossipMessageActor processing tick: last_cursor = {:?} #subscriptions = {}, #lagged_messages = {}, #messages_to_be_saved = {}",
+                trace!(
+                    "Gossip store maintenance ticked: last_cursor = {:?} #subscriptions = {}, #messages_to_be_saved = {}",
                     state.last_cursor,
                     state.output_ports.len(),
-                    state.lagged_messages.len(),
-                    state.messages_to_be_saved.len()
+                    state.messages_to_be_saved.len(),
                 );
                 trace!(
-                    "ExtendedGossipMessageActor processing tick: state.messages_to_be_saved {:?}",
+                    "Gossip store maintenance ticked: state.messages_to_be_saved {:?}",
                     state.messages_to_be_saved
                 );
 
-                // Messages that have their dependencies saved are complete messages.
-                // We need to save them to the store.
-                let complete_messages_to_be_saved = state
-                    .messages_to_be_saved
-                    .values()
-                    .filter(|m| state.has_transitive_dependencies(m))
-                    .cloned()
-                    .collect::<Vec<_>>();
-                state
-                    .messages_to_be_saved
-                    .retain(|_, v| !complete_messages_to_be_saved.contains(v));
-
-                // We also need to check if there are any lagged messages that are now complete.
-                let lagged_complete_messages = state
-                    .lagged_messages
-                    .values()
-                    .filter(|m| state.has_transitive_dependencies(m))
-                    .cloned()
-                    .collect::<Vec<_>>();
-                state
-                    .lagged_messages
-                    .retain(|_, v| !lagged_complete_messages.contains(v));
+                let complete_messages = state.prune_complete_messages_to_be_saved();
 
                 // We need to send the lagged complete messages to the subscribers. After doing this,
                 // we may remove the messages from the lagged_messages.
                 for subscription in state.output_ports.values() {
                     let messages_to_send = match subscription.filter {
-                        Some(ref filter) => lagged_complete_messages
+                        Some(ref filter) => complete_messages
                             .iter()
                             .filter(|m| &m.cursor() > filter)
                             .cloned()
                             .collect::<Vec<_>>(),
-                        None => lagged_complete_messages.clone(),
+                        None => complete_messages.clone(),
                     };
                     debug!(
                         "ExtendedGossipMessageActor sending lagged complete messages to subscriber: number of messages = {}",
                         messages_to_send.len()
                     );
                     for chunk in messages_to_send.chunks(MAX_NUM_OF_BROADCAST_MESSAGES as usize) {
                         subscription
                             .output_port
                             .send(GossipMessageUpdates::new(chunk.to_vec()));
                     }
                 }
@@ -1362,76 +1337,6 @@ impl Actor for ExtendedGossipMess
-
-                debug!(
-                    "ExtendedGossipMessageActor saving messages: number of lagged complete messages = {}, number of complete messages to be saved = {}",
-                    lagged_complete_messages.len(),
-                    complete_messages_to_be_saved.len()
-                );
-
-                // Saving all the messages that are complete and have also their dependencies saved.
-                for message in lagged_complete_messages
-                    .into_iter()
-                    .chain(complete_messages_to_be_saved)
-                {
-                    trace!(
-                        "ExtendedGossipMessageActor saving new complete message: {:?}",
-                        message
-                    );
-                    // TODO: we may need to order all the messages by their dependencies, because
-                    // the saving of broadcast messages is not an atomic operation. The node may fail any time
-                    // while saving the messages. If the node failed, some messages in the store may not have their
-                    // dependencies saved yet.
-                    state.save_broadcast_message(message.clone());
-                }
-
-                // We now have some messages later than last_cursor saved to the store, we can take them
-                // out and send them to the subscribers. Here we need to take messages directly from the
-                // store because some messages with complete dependencies are previously saved directly
-                // to the store.
-
-                // This is the cursor that all the subscribers will be updated to.
-                // We read the latest cursor from the store and filter all the messages to have cursor <= last_cursor.
-                // This is because while processing the messages, some new messages may be saved to the store.
-                // Either updating our last_cursor to the initial last_cursor or the final last_cursor is problematic.
-                let last_cursor_now = state
-                    .store
-                    .get_latest_broadcast_message_cursor()
-                    .unwrap_or(state.last_cursor.clone());
-                for subscription in state.output_ports.values() {
-                    let filter = subscription.filter.clone().unwrap_or_default();
-                    // We still need to check if the messages returned are newer than the filter,
-                    // because a subscriber may set filter so large that our last_cursor is still smaller
-                    // than this filter cursor.
-                    let mut starting_cursor_in_the_loop = if state.last_cursor > filter {
-                        state.last_cursor.clone()
-                    } else {
-                        filter
-                    };
-                    loop {
-                        let messages = state
-                            .store
-                            .get_broadcast_messages(
-                                &starting_cursor_in_the_loop,
-                                Some(MAX_NUM_OF_BROADCAST_MESSAGES),
-                            )
-                            .into_iter()
-                            .filter(|m| m.cursor() <= last_cursor_now)
-                            .collect::<Vec<_>>();
-                        match messages.last() {
-                            Some(m) => {
-                                starting_cursor_in_the_loop = m.cursor();
-                                subscription
-                                    .output_port
-                                    .send(GossipMessageUpdates::new(messages));
-                            }
-                            None => {
-                                break;
-                            }
-                        }
-                    }
-                }
-
-                state.last_cursor = last_cursor_now;
             }
         }
         Ok(())
@@ -2308,19 +2213,15 @@ where
             }
 
             GossipActorMessage::TickNetworkMaintenance => {
-                debug!(
-                    "Network maintenance ticked, current state: num of peers: {}, num of finished syncing peers: {}, num of active syncing peers: {}, num of passive syncing peers: {}, num of pending queries: {}",
+                trace!(
+                    "Gossip network maintenance ticked, current state: num of peers: {}, num of finished syncing peers: {}, num of active syncing peers: {}, num of passive syncing peers: {}, num of pending queries: {}, peer states: {:?}",
                     state.peer_states.len(),
                     state.num_of_finished_syncing_peers(),
                     state.num_of_active_syncing_peers(),
                     state.num_of_passive_syncing_peers(),
-                    state.pending_queries.len()
-                );
-                debug!(
-                    "Network maintenance ticked, current state: peer states: {:?}",
-                    state.peer_states
+                    state.pending_queries.len(),
+                    &state.peer_states
                 );
-
                 for peer in state.new_peers_to_start_active_syncing() {
                     debug!("Starting new active syncer for peer {:?}", &peer);
                     state.start_new_active_syncer(&peer).await;
diff --git a/src/fiber/graph.rs b/src/fiber/graph.rs
index 9be711c38..50e59b464 100644
--- a/src/fiber/graph.rs
+++ b/src/fiber/graph.rs
@@ -316,6 +316,12 @@ where
     // Load all the broadcast messages starting from latest_cursor from the store.
     // Process them and set nodes and channels accordingly.
     pub(crate) fn load_from_store(&mut self) {
+        dbg!(
+            "Loading from store",
+            &self.nodes,
+            &self.channels,
+            &self.latest_cursor
+        );
         loop {
             let messages = self.store.get_broadcast_messages(&self.latest_cursor, None);
             if messages.is_empty() {
@@ -323,6 +329,12 @@ where
             }
             self.update_for_messages(messages);
         }
+        dbg!(
+            "Loading from store",
+            &self.nodes,
+            &self.channels,
+            &self.latest_cursor
+        );
     }
 
     // Completely reload from store. Because messages with larger timestamp
diff --git a/src/fiber/network.rs b/src/fiber/network.rs
index 8ccce6f3d..483cd8ea4 100644
--- a/src/fiber/network.rs
+++ b/src/fiber/network.rs
@@ -908,6 +908,7 @@ where
                 ));
             }
             NetworkActorEvent::GossipMessageUpdates(gossip_message_updates) => {
+                debug!("Updating network graph for gossip message updates");
                 let mut graph = self.network_graph.write().await;
                 graph.update_for_messages(gossip_message_updates.messages);
             }
diff --git a/src/fiber/tests/channel.rs b/src/fiber/tests/channel.rs
index 1bb6ef64f..d871d7fbb 100644
--- a/src/fiber/tests/channel.rs
+++ b/src/fiber/tests/channel.rs
@@ -1821,6 +1821,7 @@ async fn test_network_send_payment_dry_run_will_not_create_payment_session() {
         ))
     };
     let res = call!(node_a.network_actor, message).expect("node_a alive");
+    dbg!(&res);
     assert!(res.is_ok());
 
     // make sure we can send the same payment after dry run query
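
For review purposes, the change above collapses the two pending-message maps (lagged_messages and messages_to_be_saved) into one, advances last_cursor monotonically through update_last_cursor, and defers every verified message to the periodic Tick, where prune_complete_messages_to_be_saved flushes whatever has its dependencies available and returns it for the subscribers. The code below is a minimal, self-contained sketch of that flow, not the actual implementation: Message, Cursor, MessageId, and the dependency rule are simplified stand-ins for BroadcastMessageWithTimestamp and friends, and unlike the real has_transitive_dependencies this version only counts dependencies that are already saved, so a dependent message flushes one tick after its dependency.

    use std::collections::HashMap;

    // Simplified stand-ins for the real gossip types; only what the flow needs.
    type Cursor = u64;
    type MessageId = String;

    #[derive(Clone, Debug, PartialEq)]
    struct Message {
        id: MessageId,
        cursor: Cursor,
        // IDs that must already be saved before this message may be saved.
        depends_on: Vec<MessageId>,
    }

    #[derive(Default)]
    struct StoreState {
        saved: HashMap<MessageId, Message>,
        // Pending messages keyed by ID, newest wins (mirrors messages_to_be_saved).
        messages_to_be_saved: HashMap<MessageId, Message>,
        last_cursor: Cursor,
    }

    impl StoreState {
        // Mirrors update_last_cursor: the cursor only ever moves forward.
        fn update_last_cursor(&mut self, cursor: Cursor) {
            if cursor > self.last_cursor {
                self.last_cursor = cursor;
            }
        }

        fn save_message(&mut self, message: Message) {
            self.update_last_cursor(message.cursor);
            self.saved.insert(message.id.clone(), message);
        }

        // Simplified dependency check: only messages already saved count.
        fn has_dependencies(&self, message: &Message) -> bool {
            message.depends_on.iter().all(|dep| self.saved.contains_key(dep))
        }

        // Mirrors the new SaveMessage arm: keep only the newest pending message per ID.
        fn queue_message(&mut self, message: Message) {
            match self.messages_to_be_saved.get(&message.id) {
                Some(existing) if existing.cursor > message.cursor => {
                    // The real code replies with GossipMessageProcessingError::NewerMessageSaved here.
                }
                _ => {
                    self.messages_to_be_saved.insert(message.id.clone(), message);
                }
            }
        }

        // Mirrors prune_complete_messages_to_be_saved, driven by the Tick arm: flush
        // every pending message whose dependencies are available and return the
        // flushed messages so they can be forwarded to subscribers.
        fn tick(&mut self) -> Vec<Message> {
            let complete: Vec<Message> = self
                .messages_to_be_saved
                .values()
                .filter(|m| self.has_dependencies(m))
                .cloned()
                .collect();
            self.messages_to_be_saved.retain(|_, m| !complete.contains(m));
            for message in &complete {
                self.save_message(message.clone());
            }
            complete
        }
    }

    fn main() {
        let mut state = StoreState::default();
        // A channel update arrives before the announcement it depends on.
        state.queue_message(Message {
            id: "update:chan-1".into(),
            cursor: 2,
            depends_on: vec!["announce:chan-1".into()],
        });
        assert!(state.tick().is_empty()); // dependency missing, nothing flushes yet
        state.queue_message(Message {
            id: "announce:chan-1".into(),
            cursor: 1,
            depends_on: vec![],
        });
        assert_eq!(state.tick().len(), 1); // the announcement is saved first
        assert_eq!(state.tick().len(), 1); // next tick the update's dependency is satisfied
        assert_eq!(state.last_cursor, 2);
        assert!(state.messages_to_be_saved.is_empty());
    }

The duplicate handling in queue_message mirrors the new SaveMessage arm in spirit only; it silently drops the older message instead of replying with an error and a notifier, which the actor still does through notify_message_saving_result.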