diff --git a/src/store/schema.rs b/src/store/schema.rs index c886f1e2..9caab7eb 100644 --- a/src/store/schema.rs +++ b/src/store/schema.rs @@ -10,9 +10,6 @@ /// | 64 | PeerId | Hash256 | ChannelState | /// | 96 | Cursor | BroadcastMessage | /// | 97 | BroadcastMessageID | u64 | -/// | 128 | NodeId | NodeInfo | -/// | 129 | Timestamp | NodeId | -/// | 160 | PeerId | MultiAddr | /// | 192 | Hash256 | PaymentSession | /// | 193 | OutPoint | Direction | TimedResult | /// | 224 | Hash256 | ChannelData | diff --git a/src/store/store.rs b/src/store/store.rs index 73a1be64..e7090b7e 100644 --- a/src/store/store.rs +++ b/src/store/store.rs @@ -471,244 +471,6 @@ impl NetworkGraphStateStore for Store { } } -// impl GossipMessageStore for Store { -// fn get_broadcast_messages_iter( -// &self, -// after_cursor: &Cursor, -// ) -> impl IntoIterator { -// let prefix = [ -// &[BROADCAST_MESSAGE_PREFIX], -// after_cursor.to_bytes().as_slice(), -// ] -// .concat(); - -// self.db -// .prefix_iterator(prefix.as_ref()) -// .take_while(move |(key, _)| key.starts_with(&[BROADCAST_MESSAGE_PREFIX])) -// // after_cursor means we should not include key/value with key == cursor -// .skip_while(move |(key, _)| key.as_ref() == &prefix) -// .map(|(key, value)| { -// debug_assert_eq!(key.len(), 1 + CURSOR_SIZE); -// let mut timestamp_bytes = [0u8; 8]; -// timestamp_bytes.copy_from_slice(&key[1..9]); -// let timestamp = u64::from_le_bytes(timestamp_bytes); -// let message: BroadcastMessage = serde_json::from_slice(value.as_ref()) -// .expect("deserialize BroadcastMessage should be OK"); -// (message, timestamp).into() -// }) -// } - -// fn save_channel_announcement(&self, timestamp: u64, channel_announcement: ChannelAnnouncement) { -// if let Some(_old_timestamp) = -// self.get_latest_channel_announcement_timestamp(&channel_announcement.channel_outpoint) -// { -// // Channel announcement is immutable. If we have already saved one channel announcement, -// // we can early return now. -// return; -// } - -// let mut batch = self.batch(); -// let cursor = Cursor::new( -// timestamp, -// BroadcastMessageID::ChannelAnnouncement(channel_announcement.channel_outpoint.clone()), -// ); - -// // Update the timestamps of the channel -// let timestamp_key = [ -// &[BROADCAST_MESSAGE_TIMESTAMP_PREFIX], -// cursor.message_id.to_bytes().as_slice(), -// ] -// .concat(); -// let mut timestamps = self -// .get(×tamp_key) -// .map(|v| v.try_into().expect("Invalid timestamp value length")) -// .unwrap_or([0u8; 24]); -// timestamps[..8].copy_from_slice(×tamp.to_le_bytes()); -// batch.put(timestamp_key, timestamps); - -// // Save the channel announcement -// let message = BroadcastMessage::ChannelAnnouncement(channel_announcement); -// batch.put( -// [&[BROADCAST_MESSAGE_PREFIX], cursor.to_bytes().as_slice()].concat(), -// serde_json::to_vec(&message).expect("serialize BroadcastMessage should be OK"), -// ); -// batch.commit(); -// } - -// fn save_channel_update(&self, channel_update: ChannelUpdate) { -// let mut batch = self.batch(); -// let message_id = BroadcastMessageID::ChannelUpdate(channel_update.channel_outpoint.clone()); - -// // Remove old channel update if exists -// if let Some(old_timestamp) = self.get_latest_channel_update_timestamp( -// &channel_update.channel_outpoint, -// channel_update.is_update_of_node_1(), -// ) { -// if channel_update.timestamp <= old_timestamp { -// // This is an outdated channel update, early return -// return; -// } -// // Delete old channel update -// batch.delete( -// [ -// &[BROADCAST_MESSAGE_PREFIX], -// Cursor::new(old_timestamp, message_id.clone()) -// .to_bytes() -// .as_slice(), -// ] -// .concat(), -// ); -// } - -// // Update the timestamps of the channel -// let timestamp_key = [ -// &[BROADCAST_MESSAGE_TIMESTAMP_PREFIX], -// message_id.to_bytes().as_slice(), -// ] -// .concat(); -// let mut timestamps = self -// .get(×tamp_key) -// .map(|v| v.try_into().expect("Invalid timestamp value length")) -// .unwrap_or([0u8; 24]); -// let start_index = if channel_update.is_update_of_node_1() { -// 8 -// } else { -// 16 -// }; -// timestamps[start_index..start_index + 8] -// .copy_from_slice(&channel_update.timestamp.to_le_bytes()); -// batch.put(timestamp_key, timestamps); - -// // Save the channel update -// let cursor = Cursor::new(channel_update.timestamp, message_id); -// let message = BroadcastMessage::ChannelUpdate(channel_update.clone()); -// batch.put( -// [&[BROADCAST_MESSAGE_PREFIX], cursor.to_bytes().as_slice()].concat(), -// serde_json::to_vec(&message).expect("serialize BroadcastMessage should be OK"), -// ); -// batch.commit(); -// } - -// fn save_node_announcement(&self, node_announcement: NodeAnnouncement) { -// let mut batch = self.batch(); -// let message_id = BroadcastMessageID::NodeAnnouncement(node_announcement.node_id.clone()); - -// if let Some(old_timestamp) = -// self.get_latest_node_announcement_timestamp(&node_announcement.node_id) -// { -// if node_announcement.timestamp <= old_timestamp { -// // This is an outdated node announcement. Early return. -// return; -// } - -// // Delete old node announcement -// batch.delete( -// [ -// &[BROADCAST_MESSAGE_PREFIX], -// Cursor::new(old_timestamp, message_id.clone()) -// .to_bytes() -// .as_slice(), -// ] -// .concat(), -// ); -// } -// batch.put( -// [ -// &[BROADCAST_MESSAGE_TIMESTAMP_PREFIX], -// message_id.to_bytes().as_slice(), -// ] -// .concat(), -// node_announcement.timestamp.to_le_bytes(), -// ); - -// // Save the channel update -// let cursor = Cursor::new(node_announcement.timestamp, message_id); -// let message = BroadcastMessage::NodeAnnouncement(node_announcement); -// batch.put( -// [&[BROADCAST_MESSAGE_PREFIX], cursor.to_bytes().as_slice()].concat(), -// serde_json::to_vec(&message).expect("serialize BroadcastMessage should be OK"), -// ); -// batch.commit(); -// } - -// fn get_broadcast_message_with_cursor( -// &self, -// cursor: &Cursor, -// ) -> Option { -// let timestamp = cursor.timestamp; -// let key = [&[BROADCAST_MESSAGE_PREFIX], cursor.to_bytes().as_slice()].concat(); -// self.get(key).map(|v| { -// BroadcastMessageWithTimestamp::from(( -// serde_json::from_slice(v.as_ref()).expect("deserialize Hash256 should be OK"), -// timestamp, -// )) -// }) -// } - -// fn get_latest_broadcast_message_cursor(&self) -> Option { -// let prefix = vec![BROADCAST_MESSAGE_PREFIX]; -// let mode = IteratorMode::End; -// self.db -// .iterator(mode) -// .take_while(|(key, _)| key.starts_with(&prefix)) -// .last() -// .map(|(key, _)| { -// let last_key = key.to_vec(); -// Cursor::from_bytes(&last_key[1..]).expect("deserialize Cursor should be OK") -// }) -// } - -// fn get_latest_channel_announcement_timestamp(&self, outpoint: &OutPoint) -> Option { -// let message_id = BroadcastMessageID::ChannelAnnouncement(outpoint.clone()); -// let timestamp_key = [ -// &[BROADCAST_MESSAGE_TIMESTAMP_PREFIX], -// message_id.to_bytes().as_slice(), -// ] -// .concat(); -// self.get(×tamp_key).map(|v| { -// let v: [u8; 24] = v.try_into().expect("Invalid timestamp value length"); -// u64::from_le_bytes( -// v[..8] -// .try_into() -// .expect("timestamp length valid, shown above"), -// ) -// }) -// } - -// fn get_latest_channel_update_timestamp( -// &self, -// outpoint: &OutPoint, -// is_node1: bool, -// ) -> Option { -// let message_id = BroadcastMessageID::ChannelUpdate(outpoint.clone()); -// let timestamp_key = [ -// &[BROADCAST_MESSAGE_TIMESTAMP_PREFIX], -// message_id.to_bytes().as_slice(), -// ] -// .concat(); -// self.get(×tamp_key).map(|v| { -// let v: [u8; 24] = v.try_into().expect("Invalid timestamp value length"); -// let start_index = if is_node1 { 8 } else { 16 }; -// u64::from_le_bytes( -// v[start_index..start_index + 8] -// .try_into() -// .expect("timestamp length valid, shown above"), -// ) -// }) -// } - -// fn get_latest_node_announcement_timestamp(&self, pk: &Pubkey) -> Option { -// let message_id = BroadcastMessageID::NodeAnnouncement(pk.clone()); -// let timestamp_key = [ -// &[BROADCAST_MESSAGE_TIMESTAMP_PREFIX], -// message_id.to_bytes().as_slice(), -// ] -// .concat(); -// self.get(×tamp_key) -// .map(|v| u64::from_le_bytes(v.try_into().expect("Invalid timestamp value length"))) -// } -// } - impl GossipMessageStore for Store { fn get_broadcast_messages_iter( &self, @@ -716,11 +478,13 @@ impl GossipMessageStore for Store { ) -> impl IntoIterator { let cursor = after_cursor.to_bytes(); let prefix = [BROADCAST_MESSAGE_PREFIX]; - let mode = IteratorMode::From(prefix.as_slice(), DbDirection::Forward); + let start = [&prefix, cursor.as_slice()].concat(); + let mode = IteratorMode::From(&start, DbDirection::Forward); self.db .iterator(mode) + // We should skip the value with the same cursor (after_cursor is exclusive). + .skip_while(move |(key, _)| key.as_ref() == &start) .take_while(move |(key, _)| key.starts_with(&prefix)) - .skip_while(move |(key, _)| key.as_ref() <= cursor.as_slice()) .map(|(key, value)| { debug_assert_eq!(key.len(), 1 + CURSOR_SIZE); let mut timestamp_bytes = [0u8; 8];