Skip to content

Commit b2d073f

Browse files
committed
Add support for batches of other types
While the spec only includes commitment_signed messages in batches, there may be other types of batches in the future. Generalize the message batching code to allow for other types in the future.
1 parent af85099 commit b2d073f

File tree

1 file changed

+63
-14
lines changed

1 file changed

+63
-14
lines changed

lightning/src/ln/peer_handler.rs

Lines changed: 63 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -537,6 +537,28 @@ enum InitSyncTracker{
537537
NodesSyncing(NodeId),
538538
}
539539

540+
/// A batch of messages initiated when receiving a `start_batch` message.
541+
struct MessageBatch {
542+
/// The channel associated with all the messages in the batch.
543+
channel_id: ChannelId,
544+
545+
/// The number of messages expected to be in the batch.
546+
batch_size: usize,
547+
548+
/// The batch of messages, which should all be of the same type.
549+
messages: MessageBatchImpl,
550+
}
551+
552+
/// The representation of the message batch, which may different for each message type.
553+
enum MessageBatchImpl {
554+
/// Used before the first message in the batch is received, since the type of messages in the
555+
/// batch is not yet known.
556+
Unknown,
557+
558+
/// A batch of `commitment_signed` messages, where each has a unique `funding_txid`.
559+
CommitmentSigned(BTreeMap<Txid, msgs::CommitmentSigned>),
560+
}
561+
540562
/// The ratio between buffer sizes at which we stop sending initial sync messages vs when we stop
541563
/// forwarding gossip messages to peers altogether.
542564
const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2;
@@ -618,7 +640,7 @@ struct Peer {
618640

619641
inbound_connection: bool,
620642

621-
commitment_signed_batch: Option<(ChannelId, usize, BTreeMap<Txid, msgs::CommitmentSigned>)>,
643+
message_batch: Option<MessageBatch>,
622644
}
623645

624646
impl Peer {
@@ -1157,7 +1179,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
11571179
received_channel_announce_since_backlogged: false,
11581180
inbound_connection: false,
11591181

1160-
commitment_signed_batch: None,
1182+
message_batch: None,
11611183
}));
11621184
Ok(res)
11631185
}
@@ -1215,7 +1237,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
12151237
received_channel_announce_since_backlogged: false,
12161238
inbound_connection: true,
12171239

1218-
commitment_signed_batch: None,
1240+
message_batch: None,
12191241
}));
12201242
Ok(())
12211243
}
@@ -1771,7 +1793,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
17711793
// During splicing, commitment_signed messages need to be collected into a single batch
17721794
// before they are handled.
17731795
if let wire::Message::StartBatch(msg) = message {
1774-
if peer_lock.commitment_signed_batch.is_some() {
1796+
if peer_lock.message_batch.is_some() {
17751797
log_debug!(logger, "Peer {} sent start_batch for channel {} before previous batch completed", log_pubkey!(their_node_id), &msg.channel_id);
17761798
return Err(PeerHandleError { }.into());
17771799
}
@@ -1788,45 +1810,72 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
17881810
return Err(PeerHandleError { }.into());
17891811
}
17901812

1791-
peer_lock.commitment_signed_batch = Some((msg.channel_id, batch_size, BTreeMap::new()));
1813+
let message_batch = MessageBatch {
1814+
channel_id: msg.channel_id,
1815+
batch_size: msg.batch_size as usize,
1816+
messages: MessageBatchImpl::Unknown,
1817+
};
1818+
peer_lock.message_batch = Some(message_batch);
17921819

17931820
return Ok(None);
17941821
}
17951822

17961823
if let wire::Message::CommitmentSigned(msg) = message {
1797-
if let Some((channel_id, batch_size, buffer)) = &mut peer_lock.commitment_signed_batch {
1798-
if msg.channel_id != *channel_id {
1799-
log_debug!(logger, "Peer {} sent batched commitment_signed for the wrong channel (expected: {}, actual: {})", log_pubkey!(their_node_id), channel_id, &msg.channel_id);
1824+
if let Some(message_batch) = &mut peer_lock.message_batch {
1825+
if let MessageBatchImpl::Unknown = message_batch.messages {
1826+
message_batch.messages = MessageBatchImpl::CommitmentSigned(BTreeMap::new());
1827+
}
1828+
1829+
let buffer = match &mut message_batch.messages {
1830+
MessageBatchImpl::Unknown => unreachable!(),
1831+
MessageBatchImpl::CommitmentSigned(ref mut messages) => messages,
1832+
};
1833+
1834+
if msg.channel_id != message_batch.channel_id {
1835+
log_debug!(logger, "Peer {} sent batched commitment_signed for the wrong channel (expected: {}, actual: {})", log_pubkey!(their_node_id), message_batch.channel_id, &msg.channel_id);
18001836
return Err(PeerHandleError { }.into());
18011837
}
18021838

18031839
let funding_txid = match msg.funding_txid {
18041840
Some(funding_txid) => funding_txid,
18051841
None => {
1806-
log_debug!(logger, "Peer {} sent batched commitment_signed without a funding_txid for channel {}", log_pubkey!(their_node_id), channel_id);
1842+
log_debug!(logger, "Peer {} sent batched commitment_signed without a funding_txid for channel {}", log_pubkey!(their_node_id), message_batch.channel_id);
18071843
return Err(PeerHandleError { }.into());
18081844
},
18091845
};
18101846

18111847
match buffer.entry(funding_txid) {
18121848
btree_map::Entry::Vacant(entry) => { entry.insert(msg); },
18131849
btree_map::Entry::Occupied(_) => {
1814-
log_debug!(logger, "Peer {} sent batched commitment_signed with duplicate funding_txid {} for channel {}", log_pubkey!(their_node_id), funding_txid, channel_id);
1850+
log_debug!(logger, "Peer {} sent batched commitment_signed with duplicate funding_txid {} for channel {}", log_pubkey!(their_node_id), funding_txid, message_batch.channel_id);
18151851
return Err(PeerHandleError { }.into());
18161852
}
18171853
}
18181854

1819-
if buffer.len() == *batch_size {
1820-
let (channel_id, _, batch) = peer_lock.commitment_signed_batch.take().expect("batch should have been inserted");
1855+
if buffer.len() == message_batch.batch_size {
1856+
let MessageBatch { channel_id, batch_size: _, messages } = peer_lock.message_batch.take().expect("batch should have been inserted");
1857+
let batch = match messages {
1858+
MessageBatchImpl::Unknown => unreachable!(),
1859+
MessageBatchImpl::CommitmentSigned(messages) => messages,
1860+
};
1861+
18211862
return Ok(Some(LogicalMessage::CommitmentSignedBatch(channel_id, batch)));
18221863
} else {
18231864
return Ok(None);
18241865
}
18251866
} else {
18261867
return Ok(Some(LogicalMessage::FromWire(wire::Message::CommitmentSigned(msg))));
18271868
}
1828-
} else if peer_lock.commitment_signed_batch.is_some() {
1829-
log_debug!(logger, "Peer {} sent non-commitment_signed message when expecting batched commitment_signed", log_pubkey!(their_node_id));
1869+
} else if let Some(message_batch) = &peer_lock.message_batch {
1870+
match message_batch.messages {
1871+
MessageBatchImpl::Unknown => {
1872+
log_debug!(logger, "Peer {} sent an unexpected message for a batch", log_pubkey!(their_node_id));
1873+
},
1874+
MessageBatchImpl::CommitmentSigned(_) => {
1875+
log_debug!(logger, "Peer {} sent an unexpected message for a commitment_signed batch", log_pubkey!(their_node_id));
1876+
},
1877+
}
1878+
18301879
return Err(PeerHandleError { }.into());
18311880
}
18321881

0 commit comments

Comments
 (0)