Skip to content

Commit

Permalink
Fix get broadcast messages implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
contrun committed Dec 10, 2024
1 parent d396ca4 commit f8bb4c0
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 243 deletions.
3 changes: 0 additions & 3 deletions src/store/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@
/// | 64 | PeerId | Hash256 | ChannelState |
/// | 96 | Cursor | BroadcastMessage |
/// | 97 | BroadcastMessageID | u64 |
/// | 128 | NodeId | NodeInfo |
/// | 129 | Timestamp | NodeId |
/// | 160 | PeerId | MultiAddr |
/// | 192 | Hash256 | PaymentSession |
/// | 193 | OutPoint | Direction | TimedResult |
/// | 224 | Hash256 | ChannelData |
Expand Down
244 changes: 4 additions & 240 deletions src/store/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,256 +471,20 @@ impl NetworkGraphStateStore for Store {
}
}

// impl GossipMessageStore for Store {
// fn get_broadcast_messages_iter(
// &self,
// after_cursor: &Cursor,
// ) -> impl IntoIterator<Item = BroadcastMessageWithTimestamp> {
// let prefix = [
// &[BROADCAST_MESSAGE_PREFIX],
// after_cursor.to_bytes().as_slice(),
// ]
// .concat();

// self.db
// .prefix_iterator(prefix.as_ref())
// .take_while(move |(key, _)| key.starts_with(&[BROADCAST_MESSAGE_PREFIX]))
// // after_cursor means we should not include key/value with key == cursor
// .skip_while(move |(key, _)| key.as_ref() == &prefix)
// .map(|(key, value)| {
// debug_assert_eq!(key.len(), 1 + CURSOR_SIZE);
// let mut timestamp_bytes = [0u8; 8];
// timestamp_bytes.copy_from_slice(&key[1..9]);
// let timestamp = u64::from_le_bytes(timestamp_bytes);
// let message: BroadcastMessage = serde_json::from_slice(value.as_ref())
// .expect("deserialize BroadcastMessage should be OK");
// (message, timestamp).into()
// })
// }

// fn save_channel_announcement(&self, timestamp: u64, channel_announcement: ChannelAnnouncement) {
// if let Some(_old_timestamp) =
// self.get_latest_channel_announcement_timestamp(&channel_announcement.channel_outpoint)
// {
// // Channel announcement is immutable. If we have already saved one channel announcement,
// // we can early return now.
// return;
// }

// let mut batch = self.batch();
// let cursor = Cursor::new(
// timestamp,
// BroadcastMessageID::ChannelAnnouncement(channel_announcement.channel_outpoint.clone()),
// );

// // Update the timestamps of the channel
// let timestamp_key = [
// &[BROADCAST_MESSAGE_TIMESTAMP_PREFIX],
// cursor.message_id.to_bytes().as_slice(),
// ]
// .concat();
// let mut timestamps = self
// .get(&timestamp_key)
// .map(|v| v.try_into().expect("Invalid timestamp value length"))
// .unwrap_or([0u8; 24]);
// timestamps[..8].copy_from_slice(&timestamp.to_le_bytes());
// batch.put(timestamp_key, timestamps);

// // Save the channel announcement
// let message = BroadcastMessage::ChannelAnnouncement(channel_announcement);
// batch.put(
// [&[BROADCAST_MESSAGE_PREFIX], cursor.to_bytes().as_slice()].concat(),
// serde_json::to_vec(&message).expect("serialize BroadcastMessage should be OK"),
// );
// batch.commit();
// }

// fn save_channel_update(&self, channel_update: ChannelUpdate) {
// let mut batch = self.batch();
// let message_id = BroadcastMessageID::ChannelUpdate(channel_update.channel_outpoint.clone());

// // Remove old channel update if exists
// if let Some(old_timestamp) = self.get_latest_channel_update_timestamp(
// &channel_update.channel_outpoint,
// channel_update.is_update_of_node_1(),
// ) {
// if channel_update.timestamp <= old_timestamp {
// // This is an outdated channel update, early return
// return;
// }
// // Delete old channel update
// batch.delete(
// [
// &[BROADCAST_MESSAGE_PREFIX],
// Cursor::new(old_timestamp, message_id.clone())
// .to_bytes()
// .as_slice(),
// ]
// .concat(),
// );
// }

// // Update the timestamps of the channel
// let timestamp_key = [
// &[BROADCAST_MESSAGE_TIMESTAMP_PREFIX],
// message_id.to_bytes().as_slice(),
// ]
// .concat();
// let mut timestamps = self
// .get(&timestamp_key)
// .map(|v| v.try_into().expect("Invalid timestamp value length"))
// .unwrap_or([0u8; 24]);
// let start_index = if channel_update.is_update_of_node_1() {
// 8
// } else {
// 16
// };
// timestamps[start_index..start_index + 8]
// .copy_from_slice(&channel_update.timestamp.to_le_bytes());
// batch.put(timestamp_key, timestamps);

// // Save the channel update
// let cursor = Cursor::new(channel_update.timestamp, message_id);
// let message = BroadcastMessage::ChannelUpdate(channel_update.clone());
// batch.put(
// [&[BROADCAST_MESSAGE_PREFIX], cursor.to_bytes().as_slice()].concat(),
// serde_json::to_vec(&message).expect("serialize BroadcastMessage should be OK"),
// );
// batch.commit();
// }

// fn save_node_announcement(&self, node_announcement: NodeAnnouncement) {
// let mut batch = self.batch();
// let message_id = BroadcastMessageID::NodeAnnouncement(node_announcement.node_id.clone());

// if let Some(old_timestamp) =
// self.get_latest_node_announcement_timestamp(&node_announcement.node_id)
// {
// if node_announcement.timestamp <= old_timestamp {
// // This is an outdated node announcement. Early return.
// return;
// }

// // Delete old node announcement
// batch.delete(
// [
// &[BROADCAST_MESSAGE_PREFIX],
// Cursor::new(old_timestamp, message_id.clone())
// .to_bytes()
// .as_slice(),
// ]
// .concat(),
// );
// }
// batch.put(
// [
// &[BROADCAST_MESSAGE_TIMESTAMP_PREFIX],
// message_id.to_bytes().as_slice(),
// ]
// .concat(),
// node_announcement.timestamp.to_le_bytes(),
// );

// // Save the channel update
// let cursor = Cursor::new(node_announcement.timestamp, message_id);
// let message = BroadcastMessage::NodeAnnouncement(node_announcement);
// batch.put(
// [&[BROADCAST_MESSAGE_PREFIX], cursor.to_bytes().as_slice()].concat(),
// serde_json::to_vec(&message).expect("serialize BroadcastMessage should be OK"),
// );
// batch.commit();
// }

// fn get_broadcast_message_with_cursor(
// &self,
// cursor: &Cursor,
// ) -> Option<BroadcastMessageWithTimestamp> {
// let timestamp = cursor.timestamp;
// let key = [&[BROADCAST_MESSAGE_PREFIX], cursor.to_bytes().as_slice()].concat();
// self.get(key).map(|v| {
// BroadcastMessageWithTimestamp::from((
// serde_json::from_slice(v.as_ref()).expect("deserialize Hash256 should be OK"),
// timestamp,
// ))
// })
// }

// fn get_latest_broadcast_message_cursor(&self) -> Option<Cursor> {
// let prefix = vec![BROADCAST_MESSAGE_PREFIX];
// let mode = IteratorMode::End;
// self.db
// .iterator(mode)
// .take_while(|(key, _)| key.starts_with(&prefix))
// .last()
// .map(|(key, _)| {
// let last_key = key.to_vec();
// Cursor::from_bytes(&last_key[1..]).expect("deserialize Cursor should be OK")
// })
// }

// fn get_latest_channel_announcement_timestamp(&self, outpoint: &OutPoint) -> Option<u64> {
// let message_id = BroadcastMessageID::ChannelAnnouncement(outpoint.clone());
// let timestamp_key = [
// &[BROADCAST_MESSAGE_TIMESTAMP_PREFIX],
// message_id.to_bytes().as_slice(),
// ]
// .concat();
// self.get(&timestamp_key).map(|v| {
// let v: [u8; 24] = v.try_into().expect("Invalid timestamp value length");
// u64::from_le_bytes(
// v[..8]
// .try_into()
// .expect("timestamp length valid, shown above"),
// )
// })
// }

// fn get_latest_channel_update_timestamp(
// &self,
// outpoint: &OutPoint,
// is_node1: bool,
// ) -> Option<u64> {
// let message_id = BroadcastMessageID::ChannelUpdate(outpoint.clone());
// let timestamp_key = [
// &[BROADCAST_MESSAGE_TIMESTAMP_PREFIX],
// message_id.to_bytes().as_slice(),
// ]
// .concat();
// self.get(&timestamp_key).map(|v| {
// let v: [u8; 24] = v.try_into().expect("Invalid timestamp value length");
// let start_index = if is_node1 { 8 } else { 16 };
// u64::from_le_bytes(
// v[start_index..start_index + 8]
// .try_into()
// .expect("timestamp length valid, shown above"),
// )
// })
// }

// fn get_latest_node_announcement_timestamp(&self, pk: &Pubkey) -> Option<u64> {
// let message_id = BroadcastMessageID::NodeAnnouncement(pk.clone());
// let timestamp_key = [
// &[BROADCAST_MESSAGE_TIMESTAMP_PREFIX],
// message_id.to_bytes().as_slice(),
// ]
// .concat();
// self.get(&timestamp_key)
// .map(|v| u64::from_le_bytes(v.try_into().expect("Invalid timestamp value length")))
// }
// }

impl GossipMessageStore for Store {
fn get_broadcast_messages_iter(
&self,
after_cursor: &Cursor,
) -> impl IntoIterator<Item = crate::fiber::types::BroadcastMessageWithTimestamp> {
let cursor = after_cursor.to_bytes();
let prefix = [BROADCAST_MESSAGE_PREFIX];
let mode = IteratorMode::From(prefix.as_slice(), DbDirection::Forward);
let start = [&prefix, cursor.as_slice()].concat();
let mode = IteratorMode::From(&start, DbDirection::Forward);
self.db
.iterator(mode)
// We should skip the value with the same cursor (after_cursor is exclusive).
.skip_while(move |(key, _)| key.as_ref() == &start)
.take_while(move |(key, _)| key.starts_with(&prefix))
.skip_while(move |(key, _)| key.as_ref() <= cursor.as_slice())
.map(|(key, value)| {
debug_assert_eq!(key.len(), 1 + CURSOR_SIZE);
let mut timestamp_bytes = [0u8; 8];
Expand Down

0 comments on commit f8bb4c0

Please sign in to comment.