diff --git a/src/cch/actor.rs b/src/cch/actor.rs index 6644d458..58e4a7c0 100644 --- a/src/cch/actor.rs +++ b/src/cch/actor.rs @@ -355,7 +355,7 @@ impl CchActor { } order.channel_id = Some(tlc_notification.channel_id); - order.tlc_id = Some(tlc_notification.tlc.id.into()); + order.tlc_id = Some(tlc_notification.tlc.tlc_id.into()); state.orders_db.update_send_btc_order(order.clone()).await?; let req = routerrpc::SendPaymentRequest { @@ -593,7 +593,7 @@ impl CchActor { expiry: now_timestamp_as_millis_u64() + self.config.ckb_final_tlc_expiry_delta, hash_algorithm: HashAlgorithm::Sha256, - peeled_onion_packet: None, + onion_packet: None, previous_tlc: None, }, rpc_reply, diff --git a/src/errors.rs b/src/errors.rs index ab7d296c..2227576f 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -39,7 +39,7 @@ pub enum Error { #[error("Send payment error: {0}")] SendPaymentError(String), #[error("Send payment first hop error: {0}")] - SendPaymentFirstHopError(String), + SendPaymentFirstHopError(String, bool), #[error("InvalidParameter: {0}")] InvalidParameter(String), #[error("Network Graph error: {0}")] diff --git a/src/fiber/channel.rs b/src/fiber/channel.rs index 45fa6715..c73636b4 100644 --- a/src/fiber/channel.rs +++ b/src/fiber/channel.rs @@ -33,8 +33,8 @@ use musig2::{ PubNonce, SecNonce, }; use ractor::{ - async_trait as rasync_trait, call, Actor, ActorProcessingErr, ActorRef, OutputPort, - RpcReplyPort, SpawnErr, + async_trait as rasync_trait, call, concurrency::Duration, Actor, ActorProcessingErr, ActorRef, + OutputPort, RpcReplyPort, SpawnErr, }; use serde::{Deserialize, Serialize}; @@ -93,6 +93,8 @@ pub const INITIAL_COMMITMENT_NUMBER: u64 = 0; // The channel is disabled, and no more tlcs can be added to the channel. pub const CHANNEL_DISABLED_FLAG: u32 = 1; +const AUTO_SETDOWN_TLC_INTERVAL: Duration = Duration::from_secs(2); + #[derive(Debug)] pub enum ChannelActorMessage { /// Command are the messages that are sent to the channel actor to perform some action. @@ -112,7 +114,7 @@ pub struct AddTlcResponse { #[derive(Clone)] pub struct TlcNotification { pub channel_id: Hash256, - pub tlc: TLC, + pub tlc: AddTlcInfo, pub script: Script, } @@ -137,14 +139,14 @@ pub enum TxCollaborationCommand { TxComplete(), } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct AddTlcCommand { pub amount: u128, pub payment_hash: Hash256, pub expiry: u64, pub hash_algorithm: HashAlgorithm, - /// Peeled onion packet for the current node - pub peeled_onion_packet: Option, + /// Onion packet for the next node + pub onion_packet: Option, pub previous_tlc: Option<(Hash256, u64)>, } @@ -375,27 +377,8 @@ where Ok(()) } FiberChannelMessage::CommitmentSigned(commitment_signed) => { - state.handle_commitment_signed_message(commitment_signed, &self.network)?; - if let ChannelState::SigningCommitment(flags) = state.state { - if !flags.contains(SigningCommitmentFlags::OUR_COMMITMENT_SIGNED_SENT) { - // TODO: maybe we should send our commitment_signed message here. - debug!("CommitmentSigned message received, but we haven't sent our commitment_signed message yet"); - // Notify outside observers. - self.network - .send_message(NetworkActorMessage::new_notification( - NetworkServiceEvent::CommitmentSignaturePending( - state.get_remote_peer_id(), - state.get_id(), - state.get_current_commitment_number(false), - ), - )) - .expect(ASSUME_NETWORK_ACTOR_ALIVE); - } - } - self.try_to_forward_pending_tlc(state).await; - self.try_to_settle_down_tlc(state); - self.try_to_send_remove_tlcs(state).await; - Ok(()) + self.handle_commitment_signed_peer_message(state, commitment_signed) + .await } FiberChannelMessage::TxSignatures(tx_signatures) => { // We're the one who sent tx_signature first, and we received a tx_signature message. @@ -468,72 +451,10 @@ where Ok(()) } FiberChannelMessage::AddTlc(add_tlc) => { - let tlc_id = add_tlc.tlc_id; - let tlc_count = state.tlcs.len(); - if let Err(e) = self.handle_add_tlc_peer_message(state, add_tlc).await { - // we assume that TLC was not inserted into our state, - // so we can safely send RemoveTlc message to the peer - // note this new add_tlc may be trying to add a duplicate tlc, - // so we use tlc count to make sure no new tlc was added - // and only send RemoveTlc message to peer if the TLC is not in our state - error!("Error handling AddTlc message: {:?}", e); - assert!(tlc_count == state.tlcs.len()); - let error_detail = self.get_tlc_detail_error(state, &e).await; - if state.get_received_tlc(tlc_id).is_none() { - self.network - .send_message(NetworkActorMessage::new_command( - NetworkActorCommand::SendFiberMessage(FiberMessageWithPeerId::new( - state.get_remote_peer_id(), - FiberMessage::remove_tlc(RemoveTlc { - channel_id: state.get_id(), - tlc_id, - reason: RemoveTlcReason::RemoveTlcFail(TlcErrPacket::new( - error_detail, - )), - }), - )), - )) - .expect(ASSUME_NETWORK_ACTOR_ALIVE); - } - return Err(e); - } - Ok(()) + self.handle_add_tlc_peer_message(state, add_tlc) } FiberChannelMessage::RemoveTlc(remove_tlc) => { - state.check_for_tlc_update(None)?; - let channel_id = state.get_id(); - - let remove_reason = remove_tlc.reason.clone(); - let tlc_details = state - .remove_tlc_with_reason(TLCId::Offered(remove_tlc.tlc_id), &remove_reason)?; - if let ( - Some(ref udt_type_script), - RemoveTlcReason::RemoveTlcFulfill(RemoveTlcFulfill { payment_preimage }), - ) = (state.funding_udt_type_script.clone(), &remove_reason) - { - let mut tlc = tlc_details.tlc.clone(); - tlc.payment_preimage = Some(*payment_preimage); - self.subscribers - .settled_tlcs_subscribers - .send(TlcNotification { - tlc, - channel_id, - script: udt_type_script.clone(), - }); - } - if tlc_details.tlc.previous_tlc.is_none() { - // only the original sender of the TLC should send `TlcRemoveReceived` event - // because only the original sender cares about the TLC event to settle the payment - self.network - .send_message(NetworkActorMessage::new_event( - NetworkActorEvent::TlcRemoveReceived( - tlc_details.tlc.payment_hash, - remove_reason, - ), - )) - .expect("myself alive"); - } - Ok(()) + self.handle_remove_tlc_peer_message(state, remove_tlc) } FiberChannelMessage::Shutdown(shutdown) => { let flags = match state.state { @@ -659,9 +580,8 @@ where } ProcessingChannelError::TlcAmountIsTooLow => TlcErrorCode::AmountBelowMinimum, ProcessingChannelError::TlcNumberExceedLimit - | ProcessingChannelError::TlcValueInflightExceedLimit => { - TlcErrorCode::TemporaryChannelFailure - } + | ProcessingChannelError::TlcValueInflightExceedLimit + | ProcessingChannelError::WaitingTlcAck => TlcErrorCode::TemporaryChannelFailure, ProcessingChannelError::InvalidState(_) => match state.state { // we can not revert back up `ChannelReady` after `ShuttingDown` ChannelState::Closed(_) | ChannelState::ShuttingDown(_) => { @@ -677,8 +597,14 @@ where // otherwise, channel maybe not ready _ => TlcErrorCode::TemporaryChannelFailure, }, - // TODO: there maybe more error types here - _ => TlcErrorCode::IncorrectOrUnknownPaymentDetails, + ProcessingChannelError::RepeatedProcessing(_) + | ProcessingChannelError::SpawnErr(_) + | ProcessingChannelError::Musig2SigningError(_) + | ProcessingChannelError::Musig2VerifyError(_) + | ProcessingChannelError::CapacityError(_) => TlcErrorCode::TemporaryNodeFailure, + ProcessingChannelError::InvalidParameter(_) => { + TlcErrorCode::IncorrectOrUnknownPaymentDetails + } }; let channel_update = if error_code.is_update() { @@ -693,132 +619,153 @@ where ) } - async fn try_to_forward_pending_tlc(&self, state: &mut ChannelActorState) { - let tlc_infos = state.get_tlcs_for_forwarding(); - for info in tlc_infos { - assert!(info.tlc.is_received()); - let added_tlc_id = info.tlc.id.into(); - let peeled_onion_packet = info - .tlc - .peeled_onion_packet - .expect("peeled onion packet exists in tlcs for forwarding"); - let _ = self - .handle_forward_onion_packet(state, peeled_onion_packet, added_tlc_id) - .await; - } + async fn handle_commitment_signed_peer_message( + &self, + state: &mut ChannelActorState, + commitment_signed: CommitmentSigned, + ) -> Result<(), ProcessingChannelError> { + // build commitment tx and verify signature from remote, if passed send ACK for partner + state.handle_commitment_signed_message(commitment_signed, &self.network)?; + self.flush_staging_tlc_operations(state).await; + Ok(()) } - async fn try_to_send_remove_tlcs(&self, state: &mut ChannelActorState) { - let tlc_infos = state.get_tlcs_for_sending_remove_tlcs(); - for tlc_info in tlc_infos { - assert!(tlc_info.is_offered()); - let remove_reason = tlc_info.removed_at.expect("expect remove_at").1; - if let Some((previous_channel_id, previous_tlc)) = tlc_info.tlc.previous_tlc { - assert!(previous_tlc.is_received()); - info!( - "begin to remove tlc from previous channel: {:?}", - &previous_tlc - ); - assert!(previous_channel_id != state.get_id()); - let (send, recv) = oneshot::channel::>(); - let port = RpcReplyPort::from(send); - self.network - .send_message(NetworkActorMessage::new_command( - NetworkActorCommand::ControlFiberChannel(ChannelCommandWithId { - channel_id: previous_channel_id, - command: ChannelCommand::RemoveTlc( - RemoveTlcCommand { - id: previous_tlc.into(), - reason: remove_reason.clone(), - }, - port, - ), - }), - )) - .expect(ASSUME_NETWORK_ACTOR_ALIVE); - let res = recv.await.expect("remove tlc replied"); - info!("remove tlc from previous channel: {:?}", &res); - } else { - unreachable!("remove tlc without previous tlc"); + async fn flush_staging_tlc_operations(&self, state: &mut ChannelActorState) { + let pending_apply_tlcs = state.tlc_state.commit_remote_tlcs(); + for tlc_info in pending_apply_tlcs { + match tlc_info { + TlcKind::AddTlc(add_tlc) => { + if add_tlc.is_received() { + if let Err(e) = self.apply_add_tlc_operation(state, &add_tlc).await { + let error_detail = self.get_tlc_detail_error(state, &e).await; + self.register_tlc_remove( + state, + add_tlc.tlc_id, + RemoveTlcReason::RemoveTlcFail(TlcErrPacket::new( + error_detail.clone(), + )), + ); + self.network + .clone() + .send_message(NetworkActorMessage::new_notification( + NetworkServiceEvent::AddTlcFailed( + state.get_local_peer_id(), + add_tlc.payment_hash, + error_detail, + ), + )) + .expect(ASSUME_NETWORK_ACTOR_ALIVE); + } + } + } + TlcKind::RemoveTlc(remove_tlc) => { + let _ = self + .apply_remove_tlc_operation(state, remove_tlc) + .await + .map_err(|e| { + debug!("error happened in apply_remove_tlc_operation: {:?}", e); + error!("Error handling apply_remove_tlc_operation: {:?}", e); + }); + } } - state.set_offered_tlc_removed(tlc_info.tlc.id.into()); } } - fn try_to_settle_down_tlc(&self, state: &mut ChannelActorState) { - let tlcs = state.get_tlcs_for_settle_down(); - let mut update_invoice_payment_hash = false; - for tlc_info in tlcs { - let tlc = tlc_info.tlc.clone(); - if let Some(invoice) = self.store.get_invoice(&tlc.payment_hash) { - let status = self.get_invoice_status(&invoice); - match status { - CkbInvoiceStatus::Expired | CkbInvoiceStatus::Cancelled => { - let error_code = match status { - CkbInvoiceStatus::Expired => TlcErrorCode::InvoiceExpired, - CkbInvoiceStatus::Cancelled => TlcErrorCode::InvoiceCancelled, - _ => unreachable!(), - }; - let command = RemoveTlcCommand { - id: tlc.get_id(), - reason: RemoveTlcReason::RemoveTlcFail(TlcErrPacket::new(TlcErr::new( - error_code, - ))), - }; - let result = self.handle_remove_tlc_command(state, command); - info!("try to settle down tlc: {:?} result: {:?}", &tlc, &result); - } - CkbInvoiceStatus::Paid => { - unreachable!("Paid invoice shold not be paid again"); - } - _ => { - update_invoice_payment_hash = true; - } - } - } + async fn try_to_relay_remove_tlc(&self, state: &mut ChannelActorState, tlc_id: u64) { + let tlc_info = state.get_offered_tlc(tlc_id).expect("expect tlc"); + assert!(tlc_info.is_offered()); + let remove_reason = tlc_info + .removed_at + .as_ref() + .expect("expect remove_at") + .1 + .clone(); + if let Some((previous_channel_id, previous_tlc)) = tlc_info.previous_tlc { + assert!(previous_tlc.is_received()); + info!( + "begin to remove tlc from previous channel: {:?}", + &previous_tlc + ); + assert!(previous_channel_id != state.get_id()); + let (send, recv) = oneshot::channel::>(); + let port = RpcReplyPort::from(send); + self.network + .send_message(NetworkActorMessage::new_command( + NetworkActorCommand::ControlFiberChannel(ChannelCommandWithId { + channel_id: previous_channel_id, + command: ChannelCommand::RemoveTlc( + RemoveTlcCommand { + id: previous_tlc.into(), + reason: remove_reason, + }, + port, + ), + }), + )) + .expect(ASSUME_NETWORK_ACTOR_ALIVE); + let res = recv.await.expect("remove tlc replied"); + debug!("remove tlc from previous channel: {:?}", &res); + } else { + unreachable!("remove tlc without previous tlc"); + } + } - let preimage = if let Some(preimage) = tlc.payment_preimage { - preimage - } else if let Some(preimage) = self.store.get_invoice_preimage(&tlc.payment_hash) { - preimage - } else { - // here maybe the tlc is not the last hop, we can not settle down it now. - // maybe we should exclude it from the settle down list. - continue; - }; - let command = RemoveTlcCommand { - id: tlc.get_id(), - reason: RemoveTlcReason::RemoveTlcFulfill(RemoveTlcFulfill { - payment_preimage: preimage, - }), - }; - let result = self.handle_remove_tlc_command(state, command); - info!("try to settle down tlc: {:?} result: {:?}", &tlc, &result); - if result.is_ok() && update_invoice_payment_hash { - let _ = self - .store - .update_invoice_status(&tlc.payment_hash, CkbInvoiceStatus::Paid); + fn try_to_settle_down_tlc(&self, state: &mut ChannelActorState, tlc_id: u64) { + let tlc_info = state.get_received_tlc(tlc_id).expect("expect tlc"); + let preimage = tlc_info + .payment_preimage + .or_else(|| self.store.get_invoice_preimage(&tlc_info.payment_hash)); + + let preimage = if let Some(preimage) = preimage { + preimage + } else { + return; + }; + + let mut remove_reason = RemoveTlcReason::RemoveTlcFulfill(RemoveTlcFulfill { + payment_preimage: preimage, + }); + let tlc = tlc_info.clone(); + if let Some(invoice) = self.store.get_invoice(&tlc.payment_hash) { + let status = self.get_invoice_status(&invoice); + match status { + CkbInvoiceStatus::Expired | CkbInvoiceStatus::Cancelled => { + let error_code = match status { + CkbInvoiceStatus::Expired => TlcErrorCode::InvoiceExpired, + CkbInvoiceStatus::Cancelled => TlcErrorCode::InvoiceCancelled, + _ => unreachable!("unexpected invoice status"), + }; + remove_reason = + RemoveTlcReason::RemoveTlcFail(TlcErrPacket::new(TlcErr::new(error_code))); + } + CkbInvoiceStatus::Paid => { + unreachable!("Paid invoice shold not be paid again"); + } + _ => { + self.store + .update_invoice_status(&tlc.payment_hash, CkbInvoiceStatus::Paid) + .expect("update invoice status error"); + } } - // we only handle one tlc at a time. - break; } + + debug!( + "register remove reason: {:?} with reason: {:?}", + tlc.tlc_id, remove_reason + ); + self.register_tlc_remove(state, tlc.tlc_id, remove_reason); } - async fn handle_add_tlc_peer_message( + async fn apply_add_tlc_operation( &self, state: &mut ChannelActorState, - add_tlc: AddTlc, + add_tlc: &AddTlcInfo, ) -> Result<(), ProcessingChannelError> { - state.check_for_tlc_update(Some(add_tlc.amount))?; state.check_tlc_expiry(add_tlc.expiry)?; - // check the onion_packet is valid or not, if not, we should return an error. - // If there is a next hop, we should send the AddTlc message to the next hop. - // If this is the last hop, we should check the payment hash and amount and then - // try to fulfill the payment, find the corresponding payment preimage from payment hash. - let mut preimage = None; - let mut update_invoice_payment_hash: Option = None; + assert!(state.get_received_tlc(add_tlc.tlc_id.into()).is_some()); + let payment_hash = add_tlc.payment_hash; let peeled_onion_packet = match add_tlc.onion_packet.clone() { Some(onion_packet) => Some( self.peel_onion_packet(onion_packet, add_tlc.payment_hash.clone()) @@ -842,33 +789,35 @@ where add_tlc.amount, forward_amount ); - // TODO: check the expiry time, if it's expired, we should return an error. if peeled_onion_packet.is_last() { if forward_amount != add_tlc.amount { return Err(ProcessingChannelError::FinalIncorrectHTLCAmount); } - let payment_hash = add_tlc.payment_hash; if let Some(invoice) = self.store.get_invoice(&payment_hash) { let invoice_status = self.get_invoice_status(&invoice); if invoice_status != CkbInvoiceStatus::Open { return Err(ProcessingChannelError::FinalInvoiceInvalid(invoice_status)); } - update_invoice_payment_hash = Some(payment_hash); + self.store + .update_invoice_status(&payment_hash, CkbInvoiceStatus::Received) + .expect("update invoice status failed"); } // if this is the last hop, store the preimage. // though we will RemoveTlcFulfill the TLC in try_to_settle_down_tlc function, // here we can do error check early here for better error handling. - preimage = peeled_onion_packet + let preimage = peeled_onion_packet .current .payment_preimage .or_else(|| self.store.get_invoice_preimage(&add_tlc.payment_hash)); + if let Some(preimage) = preimage { let filled_payment_hash: Hash256 = add_tlc.hash_algorithm.hash(preimage).into(); if add_tlc.payment_hash != filled_payment_hash { return Err(ProcessingChannelError::FinalIncorrectPreimage); } + state.set_received_tlc_preimage(add_tlc.tlc_id.into(), Some(preimage)); } else { return Err(ProcessingChannelError::FinalIncorrectPaymentHash); } @@ -889,31 +838,109 @@ where ); return Err(ProcessingChannelError::TlcForwardFeeIsTooLow); } + // if this is not the last hop, forward TLC to next hop + self.handle_forward_onion_packet( + state, + peeled_onion_packet.clone(), + add_tlc.tlc_id.into(), + ) + .await?; } } - let tlc = state.create_inbounding_tlc(add_tlc.clone(), preimage, peeled_onion_packet)?; - state.insert_tlc(tlc.clone())?; - if let Some(payment_hash) = update_invoice_payment_hash { - self.store - .update_invoice_status(&payment_hash, CkbInvoiceStatus::Received) - .expect("update invoice status failed"); - } if let Some(ref udt_type_script) = state.funding_udt_type_script { self.subscribers .pending_received_tlcs_subscribers .send(TlcNotification { - tlc: tlc.clone(), + tlc: add_tlc.clone(), channel_id: state.get_id(), script: udt_type_script.clone(), }); } - warn!("created tlc: {:?}", &tlc); - // TODO: here we didn't send any ack message to the peer. - // The peer may falsely believe that we have already processed this message, - // while we have crashed. We need a way to make sure that the peer will resend - // this message, and our processing of this message is idempotent. + // we don't need to settle down the tlc if it is not the last hop here, + // some e2e tests are calling AddTlc manually, so we can not use onion packet to + // check whether it's the last hop here, maybe need to revisit in future. + self.try_to_settle_down_tlc(state, add_tlc.tlc_id.into()); + + warn!("finished check tlc for peer message: {:?}", &add_tlc.tlc_id); + Ok(()) + } + + fn handle_add_tlc_peer_message( + &self, + state: &mut ChannelActorState, + add_tlc: AddTlc, + ) -> Result<(), ProcessingChannelError> { + // TODO: here we only check the error which sender didn't follow agreed rules, + // if any error happened here we need go to shutdown procedure + + state.check_for_tlc_update(Some(add_tlc.amount), false)?; + let tlc_info = state.create_inbounding_tlc(add_tlc.clone())?; + state.check_insert_tlc(&tlc_info)?; + state + .tlc_state + .add_remote_tlc(TlcKind::AddTlc(tlc_info.clone())); + state.increment_next_received_tlc_id(); + Ok(()) + } + + fn handle_remove_tlc_peer_message( + &self, + state: &mut ChannelActorState, + remove_tlc: RemoveTlc, + ) -> Result<(), ProcessingChannelError> { + state.check_for_tlc_update(None, false)?; + // TODO: here if we received a invalid remove tlc, it's maybe a malioucious peer, + // maybe we need to go through shutdown process for this error + state + .check_remove_tlc_with_reason(TLCId::Offered(remove_tlc.tlc_id), &remove_tlc.reason)?; + let tlc_kind = TlcKind::RemoveTlc(RemoveTlcInfo { + tlc_id: TLCId::Offered(remove_tlc.tlc_id), + channel_id: remove_tlc.channel_id, + reason: remove_tlc.reason.clone(), + }); + state.tlc_state.add_remote_tlc(tlc_kind.clone()); + Ok(()) + } + + async fn apply_remove_tlc_operation( + &self, + state: &mut ChannelActorState, + remove_tlc: RemoveTlcInfo, + ) -> Result<(), ProcessingChannelError> { + let channel_id = state.get_id(); + let remove_reason = remove_tlc.reason.clone(); + let tlc_info = state.remove_tlc_with_reason(remove_tlc.tlc_id, &remove_reason)?; + if let ( + Some(ref udt_type_script), + RemoveTlcReason::RemoveTlcFulfill(RemoveTlcFulfill { payment_preimage }), + ) = (state.funding_udt_type_script.clone(), &remove_reason) + { + let mut tlc = tlc_info.clone(); + tlc.payment_preimage = Some(*payment_preimage); + self.subscribers + .settled_tlcs_subscribers + .send(TlcNotification { + tlc, + channel_id, + script: udt_type_script.clone(), + }); + } + if tlc_info.previous_tlc.is_none() { + // only the original sender of the TLC should send `TlcRemoveReceived` event + // because only the original sender cares about the TLC event to settle the payment + self.network + .send_message(NetworkActorMessage::new_event( + NetworkActorEvent::TlcRemoveReceived(tlc_info.payment_hash, remove_reason), + )) + .expect("myself alive"); + } else { + // relay RemoveTlc to previous channel if needed + self.try_to_relay_remove_tlc(state, tlc_info.tlc_id.into()) + .await; + } + state.tlc_state.shrink_removed_tlcs(remove_tlc.tlc_id); Ok(()) } @@ -938,27 +965,11 @@ where .expect(ASSUME_NETWORK_ACTOR_ALIVE); // If we failed to forward the onion packet, we should remove the tlc. - if let Err(res) = recv.await.expect("expect command replied") { - error!("Error forwarding onion packet: {:?}", res); - let (send, recv) = oneshot::channel::>(); - let port = RpcReplyPort::from(send); - self.network - .send_message(NetworkActorMessage::new_command( - NetworkActorCommand::ControlFiberChannel(ChannelCommandWithId { - channel_id: state.get_id(), - command: ChannelCommand::RemoveTlc( - RemoveTlcCommand { - id: added_tlc_id, - reason: RemoveTlcReason::RemoveTlcFail(res), - }, - port, - ), - }), - )) - .expect(ASSUME_NETWORK_ACTOR_ALIVE); - let _ = recv.await.expect("RemoveTlc command replied"); + if let Err(_res) = recv.await.expect("expect command replied") { + return Err(ProcessingChannelError::PeelingOnionPacketError( + "failed to forward".to_string(), + )); } - state.set_received_tlc_forwarded(added_tlc_id); Ok(()) } @@ -1065,6 +1076,8 @@ where )) .expect("myself alive"); + state.save_remote_nonce_for_raa(); + match flags { CommitmentSignedFlags::SigningCommitment(flags) => { let flags = flags | SigningCommitmentFlags::OUR_COMMITMENT_SIGNED_SENT; @@ -1084,33 +1097,30 @@ where state: &mut ChannelActorState, command: AddTlcCommand, ) -> Result { - debug!("handle add tlc command : {:?}", &command); - state.check_for_tlc_update(Some(command.amount))?; + state.check_for_tlc_update(Some(command.amount), true)?; state.check_tlc_expiry(command.expiry)?; - let tlc = state.create_outbounding_tlc(command); - state.insert_tlc(tlc.clone())?; + let tlc = state.create_outbounding_tlc(command.clone()); + state.check_insert_tlc(tlc.as_add_tlc())?; + state.tlc_state.add_local_tlc(tlc.clone()); + state.increment_next_offered_tlc_id(); debug!("Inserted tlc into channel state: {:?}", &tlc); - // TODO: Note that since message sending is async, - // we can't guarantee anything about the order of message sending - // and state updating. And any of these may fail while the other succeeds. - // We may need to handle all these possibilities. - // To make things worse, we currently don't have a way to ACK all the messages. + + let add_tlc = AddTlc { + channel_id: state.get_id(), + tlc_id: tlc.tlc_id().into(), + amount: command.amount, + payment_hash: tlc.payment_hash(), + expiry: command.expiry, + hash_algorithm: command.hash_algorithm, + onion_packet: command.onion_packet, + }; // Send tlc update message to peer. - let msg = FiberMessageWithPeerId::new( - state.get_remote_peer_id(), - FiberMessage::add_tlc(AddTlc { - channel_id: state.get_id(), - tlc_id: tlc.id.into(), - amount: tlc.amount, - payment_hash: tlc.payment_hash, - expiry: tlc.expiry, - hash_algorithm: tlc.hash_algorithm, - onion_packet: tlc.peeled_onion_packet.and_then(|packet| packet.next), - }), - ); + let msg = + FiberMessageWithPeerId::new(state.get_remote_peer_id(), FiberMessage::add_tlc(add_tlc)); debug!("Sending AddTlc message: {:?}", &msg); + self.network .send_message(NetworkActorMessage::new_command( NetworkActorCommand::SendFiberMessage(msg), @@ -1118,7 +1128,8 @@ where .expect(ASSUME_NETWORK_ACTOR_ALIVE); self.handle_commitment_signed_command(state)?; - Ok(tlc.id.into()) + state.tlc_state.set_waiting_ack(true); + Ok(tlc.tlc_id().into()) } pub fn handle_remove_tlc_command( @@ -1126,8 +1137,14 @@ where state: &mut ChannelActorState, command: RemoveTlcCommand, ) -> ProcessingChannelResult { - state.check_for_tlc_update(None)?; - let tlc = state.remove_tlc_with_reason(TLCId::Received(command.id), &command.reason)?; + state.check_for_tlc_update(None, true)?; + state.check_remove_tlc_with_reason(TLCId::Received(command.id), &command.reason)?; + let tlc_kind = TlcKind::RemoveTlc(RemoveTlcInfo { + channel_id: state.get_id(), + tlc_id: TLCId::Received(command.id), + reason: command.reason.clone(), + }); + state.tlc_state.add_local_tlc(tlc_kind.clone()); let msg = FiberMessageWithPeerId::new( state.get_remote_peer_id(), FiberMessage::remove_tlc(RemoveTlc { @@ -1145,12 +1162,13 @@ where debug!( "Channel ({:?}) balance after removing tlc {:?}: local balance: {}, remote balance: {}", state.get_id(), - tlc, + tlc_kind, state.to_local_amount, state.to_remote_amount ); state.maybe_transition_to_shutdown(&self.network)?; self.handle_commitment_signed_command(state)?; + state.tlc_state.set_waiting_ack(true); Ok(()) } @@ -1263,6 +1281,40 @@ where Ok(()) } + pub fn register_tlc_remove( + &self, + state: &mut ChannelActorState, + tlc_id: TLCId, + reason: RemoveTlcReason, + ) { + let command = RemoveTlcCommand { + id: tlc_id.into(), + reason: reason.clone(), + }; + if let Ok(_) = self.handle_remove_tlc_command(state, command) { + return; + } else { + error!("Failed to remove tlc: {:?}, retry it later", &tlc_id); + state.tlc_state.set_tlc_pending_remove(tlc_id, reason); + } + } + + pub fn check_tlc_setdown(&self, state: &mut ChannelActorState) { + let pending_removes = state.tlc_state.get_pending_remove(); + for (tlc_id, reason) in pending_removes.iter() { + let id: u64 = (*tlc_id).into(); + let command = RemoveTlcCommand { + id, + reason: reason.clone(), + }; + if let Ok(_) = self.handle_remove_tlc_command(state, command) { + state.tlc_state.remove_pending_remove_tlc(&tlc_id); + } else { + error!("Failed to remove tlc: {:?}, retry it later", &tlc_id); + } + } + } + // This is the dual of `handle_tx_collaboration_msg`. Any logic error here is likely // to present in the other function as well. pub fn handle_tx_collaboration_command( @@ -1479,6 +1531,9 @@ where state.update_state(ChannelState::Closed(CloseFlags::UNCOOPERATIVE)); debug!("Channel closed with uncooperative close"); } + ChannelEvent::CheckTlcSetdown => { + self.check_tlc_setdown(state); + } ChannelEvent::PeerDisconnected => { myself.stop(Some("PeerDisconnected".to_string())); } @@ -1825,6 +1880,17 @@ where } } + async fn post_start( + &self, + myself: ActorRef, + _state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + myself.send_interval(AUTO_SETDOWN_TLC_INTERVAL, || { + ChannelActorMessage::Event(ChannelEvent::CheckTlcSetdown) + }); + Ok(()) + } + async fn handle( &self, myself: ActorRef, @@ -1904,76 +1970,560 @@ impl CommitmentNumbers { } } -#[derive(Copy, Clone, Debug, Serialize, Deserialize)] -pub struct TLCIds { - pub offering: u64, - pub received: u64, +#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize, PartialOrd, Ord)] +pub enum TLCId { + Offered(u64), + Received(u64), } -impl Default for TLCIds { - fn default() -> Self { - Self::new() +impl From for u64 { + fn from(id: TLCId) -> u64 { + match id { + TLCId::Offered(id) => id, + TLCId::Received(id) => id, + } + } +} + +impl TLCId { + pub fn is_offered(&self) -> bool { + matches!(self, TLCId::Offered(_)) + } + + pub fn is_received(&self) -> bool { + !self.is_offered() + } + + pub fn flip(&self) -> Self { + match self { + TLCId::Offered(id) => TLCId::Received(*id), + TLCId::Received(id) => TLCId::Offered(*id), + } + } + + pub fn flip_mut(&mut self) { + *self = self.flip(); + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] +pub enum TlcKind { + AddTlc(AddTlcInfo), + RemoveTlc(RemoveTlcInfo), +} + +impl TlcKind { + pub fn log(&self) -> String { + match self { + TlcKind::AddTlc(add_tlc) => { + format!("{:?}", &add_tlc.tlc_id) + } + TlcKind::RemoveTlc(remove_tlc) => { + format!("RemoveTlc({:?})", &remove_tlc.tlc_id) + } + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] +pub struct AddTlcInfo { + pub channel_id: Hash256, + pub tlc_id: TLCId, + pub amount: u128, + pub payment_hash: Hash256, + pub expiry: u64, + pub hash_algorithm: HashAlgorithm, + // the onion packet for multi-hop payment + pub onion_packet: Option, + pub created_at: CommitmentNumbers, + pub removed_at: Option<(CommitmentNumbers, RemoveTlcReason)>, + pub payment_preimage: Option, + + /// Note: `previous_tlc` is used to track the tlc chain for a multi-tlc payment, + /// we need to know previous when removing tlc backwardly. + /// + /// Node A ---------> Node B ------------> Node C ----------> Node D + /// tlc_1 <---> (tlc_1) (tlc_2) <---> (tlc_2) (tlc_3) <----> tlc_3 + /// ^^^^ ^^^^ + /// + pub previous_tlc: Option<(Hash256, TLCId)>, +} + +impl AddTlcInfo { + pub fn is_offered(&self) -> bool { + self.tlc_id.is_offered() + } + + pub fn is_received(&self) -> bool { + !self.is_offered() + } + + pub fn get_commitment_numbers(&self) -> CommitmentNumbers { + self.created_at + } + + pub fn flip_mut(&mut self) { + self.tlc_id.flip_mut(); + } + + /// Get the value for the field `htlc_type` in commitment lock witness. + /// - Lowest 1 bit: 0 if the tlc is offered by the remote party, 1 otherwise. + /// - High 7 bits: + /// - 0: ckb hash + /// - 1: sha256 + pub fn get_htlc_type(&self) -> u8 { + let offered_flag = if self.is_offered() { 0u8 } else { 1u8 }; + ((self.hash_algorithm as u8) << 1) + offered_flag + } + + fn get_hash(&self) -> ShortHash { + self.payment_hash.as_ref()[..20] + .try_into() + .expect("short hash from payment hash") + } +} + +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub struct RemoveTlcInfo { + pub channel_id: Hash256, + pub tlc_id: TLCId, + pub reason: RemoveTlcReason, +} + +impl TlcKind { + pub fn tlc_id_u64(&self) -> u64 { + match self { + TlcKind::AddTlc(add_tlc) => add_tlc.tlc_id.into(), + TlcKind::RemoveTlc(remove_tlc) => remove_tlc.tlc_id.into(), + } + } + + pub fn tlc_id(&self) -> TLCId { + match self { + TlcKind::AddTlc(info) => info.tlc_id, + TlcKind::RemoveTlc(remove_tlc) => remove_tlc.tlc_id, + } + } + + pub fn is_offered(&self) -> bool { + self.tlc_id().is_offered() + } + + pub fn is_received(&self) -> bool { + !self.is_offered() + } + + pub fn flip_mut(&mut self) { + match self { + TlcKind::AddTlc(info) => info.tlc_id.flip_mut(), + TlcKind::RemoveTlc(_) => { + unreachable!("RemoveTlc should not flip") + } + } + } + + pub fn amount(&self) -> u128 { + match self { + TlcKind::AddTlc(add_tlc) => add_tlc.amount, + TlcKind::RemoveTlc(..) => { + unreachable!("RemoveTlc should not have amount") + } + } + } + + pub fn payment_hash(&self) -> Hash256 { + match self { + TlcKind::AddTlc(add_tlc) => add_tlc.payment_hash, + TlcKind::RemoveTlc(..) => { + unreachable!("RemoveTlc should not have payment hash") + } + } + } + + pub fn as_add_tlc(&self) -> &AddTlcInfo { + match self { + TlcKind::AddTlc(add_tlc) => &add_tlc, + TlcKind::RemoveTlc(..) => { + unreachable!("RemoveTlc should not be AddTlc") + } + } } } -impl TLCIds { +#[derive(Default, Clone, Debug, Serialize, Deserialize)] +pub struct PendingTlcs { + tlcs: Vec, + committed_index: usize, + next_tlc_id: u64, +} + +impl PendingTlcs { pub fn new() -> Self { Self { - offering: 0, - received: 0, + tlcs: Vec::new(), + committed_index: 0, + next_tlc_id: 0, } } + #[cfg(test)] + pub fn print(&self, prefix: &str) { + debug!( + "{} pending tlcs: {:?}, committed_index: {:?}", + prefix, + self.tlcs.iter().map(|t| t.log()).collect::>(), + self.committed_index + ); + } + + pub fn next_tlc_id(&self) -> u64 { + self.next_tlc_id + } + + pub fn increment_next_tlc_id(&mut self) { + self.next_tlc_id += 1; + } + + pub fn add_tlc_operation(&mut self, tlc_op: TlcKind) { + self.tlcs.push(tlc_op); + } + + pub fn get_staging_tlcs(&self) -> &[TlcKind] { + &self.tlcs[self.committed_index..] + } + + pub fn get_committed_tlcs(&self) -> &[TlcKind] { + &self.tlcs[..self.committed_index] + } + + pub fn get_committed_tlcs_mut(&mut self) -> &mut [TlcKind] { + &mut self.tlcs[..self.committed_index] + } + + pub fn tlcs(&self) -> &[TlcKind] { + &self.tlcs + } + + pub fn tlcs_mut(&mut self) -> &mut Vec { + &mut self.tlcs + } + + pub fn push(&mut self, tlc: TlcKind) { + assert!(!self.is_tlc_present(&tlc)); + self.tlcs.push(tlc); + } + + fn is_tlc_present(&self, tlc: &TlcKind) -> bool { + self.tlcs.iter().any(|t| match (t, tlc) { + (TlcKind::AddTlc(info1), TlcKind::AddTlc(info2)) => info1.tlc_id == info2.tlc_id, + (TlcKind::RemoveTlc(info1), TlcKind::RemoveTlc(info2)) => info1.tlc_id == info2.tlc_id, + _ => false, + }) + } + + pub fn commit_tlcs(&mut self, committed_tlcs: &[TlcKind]) -> Vec { + let staging_tlcs = self.get_staging_tlcs().to_vec(); + for tlc in committed_tlcs { + if !self.is_tlc_present(tlc) { + self.tlcs.push(tlc.clone()); + } + } + self.committed_index = self.tlcs.len(); + return staging_tlcs; + } + + pub fn get_mut(&mut self, tlc_id: &TLCId) -> Option<&mut AddTlcInfo> { + self.tlcs.iter_mut().find_map(|tlc| match tlc { + TlcKind::AddTlc(info) if info.tlc_id == *tlc_id => Some(info), + _ => None, + }) + } + + pub fn drop_remove_tlc(&mut self, tlc_id: &TLCId) { + //assert_eq!(self.committed_index, self.tlcs.len()); + self.tlcs.retain(|tlc| match tlc { + TlcKind::RemoveTlc(info) => info.tlc_id != *tlc_id, + _ => true, + }); + self.committed_index = self.tlcs.len(); + } + + pub fn shrink_removed_tlcs(&mut self) { + //assert_eq!(self.committed_index, self.tlcs.len()); + let new_committed_index = self + .get_committed_tlcs() + .iter() + .filter(|tlc| match tlc { + TlcKind::AddTlc(info) => info.removed_at.is_none(), + _ => true, + }) + .count(); + self.tlcs.retain(|tlc| match tlc { + TlcKind::AddTlc(info) => info.removed_at.is_none(), + _ => true, + }); + self.committed_index = new_committed_index; + } +} + +#[derive(Default, Clone, Debug, Serialize, Deserialize)] +pub struct TlcState { + local_pending_tlcs: PendingTlcs, + remote_pending_tlcs: PendingTlcs, + // if the tlc is pending to be removed, the reason will be stored here + // this will only used for retrying remove TLC + pending_remove_tlcs_map: BTreeMap, + waiting_ack: bool, +} + +impl TlcState { pub fn get_next_offering(&self) -> u64 { - self.offering + self.local_pending_tlcs.next_tlc_id() } pub fn get_next_received(&self) -> u64 { - self.received + self.remote_pending_tlcs.next_tlc_id() } pub fn increment_offering(&mut self) { - self.offering += 1; + self.local_pending_tlcs.increment_next_tlc_id(); } pub fn increment_received(&mut self) { - self.received += 1; + self.remote_pending_tlcs.increment_next_tlc_id(); } -} -#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize, PartialOrd, Ord)] -pub enum TLCId { - Offered(u64), - Received(u64), -} + pub fn set_waiting_ack(&mut self, waiting_ack: bool) { + self.waiting_ack = waiting_ack; + } -impl From for u64 { - fn from(id: TLCId) -> u64 { + pub fn set_tlc_pending_remove(&mut self, tlc_id: TLCId, reason: RemoveTlcReason) { + self.pending_remove_tlcs_map.insert(tlc_id, reason); + } + + pub fn get_pending_remove(&self) -> Vec<(TLCId, RemoveTlcReason)> { + self.pending_remove_tlcs_map + .iter() + .map(|(tlc_id, reason)| (tlc_id.clone(), reason.clone())) + .collect() + } + + pub fn remove_pending_remove_tlc(&mut self, tlc_id: &TLCId) { + self.pending_remove_tlcs_map.remove(tlc_id); + } + + pub fn get(&self, id: &TLCId) -> Option<&AddTlcInfo> { match id { - TLCId::Offered(id) => id, - TLCId::Received(id) => id, + TLCId::Offered(_id) => { + self.local_pending_tlcs + .tlcs() + .iter() + .find_map(|tlc| match tlc { + TlcKind::AddTlc(info) if info.tlc_id == *id => Some(info), + _ => None, + }) + } + TLCId::Received(_id) => { + self.remote_pending_tlcs + .tlcs() + .iter() + .find_map(|tlc| match tlc { + TlcKind::AddTlc(info) if info.tlc_id == *id => Some(info), + _ => None, + }) + } } } -} -impl TLCId { - pub fn is_offered(&self) -> bool { - matches!(self, TLCId::Offered(_)) + pub fn get_mut(&mut self, id: &TLCId) -> Option<&mut AddTlcInfo> { + match id { + TLCId::Offered(_id) => { + self.local_pending_tlcs + .tlcs + .iter_mut() + .find_map(|tlc| match tlc { + TlcKind::AddTlc(info) if info.tlc_id == *id => Some(info), + _ => None, + }) + } + TLCId::Received(_id) => { + self.remote_pending_tlcs + .tlcs + .iter_mut() + .find_map(|tlc| match tlc { + TlcKind::AddTlc(info) if info.tlc_id == *id => Some(info), + _ => None, + }) + } + } } - pub fn is_received(&self) -> bool { - !self.is_offered() + pub fn add_local_tlc(&mut self, tlc_info: TlcKind) { + self.local_pending_tlcs.push(tlc_info); } - pub fn flip(&self) -> Self { - match self { - TLCId::Offered(id) => TLCId::Received(*id), - TLCId::Received(id) => TLCId::Offered(*id), + pub fn add_remote_tlc(&mut self, tlc_info: TlcKind) { + self.remote_pending_tlcs.push(tlc_info); + } + + pub fn next_local_tlc_id(&self) -> u64 { + self.local_pending_tlcs.next_tlc_id() + } + + pub fn increment_local_tlc_id(&mut self) { + self.local_pending_tlcs.increment_next_tlc_id(); + } + + pub fn next_remote_tlc_id(&self) -> u64 { + self.remote_pending_tlcs.next_tlc_id() + } + + pub fn increment_remote_tlc_id(&mut self) { + self.remote_pending_tlcs.increment_next_tlc_id(); + } + + fn unify_tlcs<'a>( + &self, + staging_tlcs: impl Iterator, + committed_tlcs: impl Iterator, + ) -> Vec { + let mut add_tlcs: BTreeMap = Default::default(); + let mut remove_tlcs = vec![]; + for tlc in committed_tlcs { + match tlc { + TlcKind::AddTlc(info) => { + if info.removed_at.is_none() { + let _ = add_tlcs.insert(tlc.tlc_id(), tlc.clone()); + } + } + TlcKind::RemoveTlc(..) => { + unreachable!("RemoveTlc should not be in committed tlcs") + } + } + } + for tlc in staging_tlcs { + match tlc { + TlcKind::AddTlc(_info) => { + // If the tlc is already in the committed tlcs, we should not add it again. + // committed tlcs always have the latest tlc information + if add_tlcs.contains_key(&tlc.tlc_id()) { + continue; + } + let _ = add_tlcs.insert(tlc.tlc_id(), tlc.clone()); + } + TlcKind::RemoveTlc(..) => remove_tlcs.push(tlc.clone()), + } + } + for tlc in remove_tlcs { + let tlc_id = tlc.tlc_id(); + let _ = add_tlcs.remove(&tlc_id); } + add_tlcs.values().map(|tlc| tlc.clone()).collect() } - pub fn flip_mut(&mut self) { - *self = self.flip(); + pub fn get_tlcs_for_local(&self) -> Vec { + self.unify_tlcs( + self.local_pending_tlcs.get_staging_tlcs().into_iter(), + self.local_pending_tlcs + .get_committed_tlcs() + .into_iter() + .chain(self.remote_pending_tlcs.get_committed_tlcs().into_iter()), + ) + } + + pub fn get_tlcs_for_remote(&self) -> Vec { + self.unify_tlcs( + self.remote_pending_tlcs.get_staging_tlcs().into_iter(), + self.remote_pending_tlcs + .get_committed_tlcs() + .into_iter() + .chain(self.local_pending_tlcs.get_committed_tlcs().into_iter()), + ) + } + + pub fn get_tlcs_with(&self, local_commitment: bool) -> Vec { + if local_commitment { + self.get_tlcs_for_local() + } else { + self.get_tlcs_for_remote() + } + } + + pub fn commit_local_tlcs(&mut self) -> Vec { + self.local_pending_tlcs + .commit_tlcs(self.remote_pending_tlcs.get_committed_tlcs()) + } + + pub fn commit_remote_tlcs(&mut self) -> Vec { + self.remote_pending_tlcs + .commit_tlcs(self.local_pending_tlcs.get_committed_tlcs()) + } + + fn filter_add_tlcs<'a, I>(tlcs: I) -> impl Iterator + where + I: Iterator, + { + tlcs.filter_map(|tlc| match tlc { + TlcKind::AddTlc(info) => { + if info.removed_at.is_some() { + None + } else { + Some((info.tlc_id, info)) + } + } + TlcKind::RemoveTlc(..) => None, + }) + .collect::>() + .into_iter() + .map(|(_, v)| v) + } + + pub fn all_commited_tlcs(&self) -> impl Iterator { + Self::filter_add_tlcs( + self.local_pending_tlcs + .get_committed_tlcs() + .into_iter() + .chain(self.remote_pending_tlcs.get_committed_tlcs().into_iter()), + ) + } + + pub fn all_tlcs(&self) -> impl Iterator { + Self::filter_add_tlcs( + self.local_pending_tlcs + .tlcs() + .into_iter() + .chain(self.remote_pending_tlcs.tlcs().into_iter()), + ) + } + + pub fn apply_tlc_remove( + &mut self, + tlc_id: TLCId, + removed_at: CommitmentNumbers, + reason: RemoveTlcReason, + ) { + // we don't consider RemoveTLC in the pending TLCS when build commitment signature + // so it's safe to remove them all from remote and local pending TLCS + self.local_pending_tlcs.drop_remove_tlc(&tlc_id); + self.remote_pending_tlcs.drop_remove_tlc(&tlc_id); + if let Some(tlc) = self.local_pending_tlcs.get_mut(&tlc_id) { + tlc.removed_at = Some((removed_at, reason.clone())); + } + if let Some(tlc) = self.remote_pending_tlcs.get_mut(&tlc_id) { + tlc.removed_at = Some((removed_at, reason)); + } + } + + pub fn shrink_removed_tlcs(&mut self, tlc_id: TLCId) { + // it's safe to remove multiple removed tlcs from pending TLCS, + // just make sure the two partners are operating on correct pending list, + // in other words, when one is remove from local TLCS, + // the peer should remove it from remote TLCS + if tlc_id.is_offered() { + self.local_pending_tlcs.shrink_removed_tlcs(); + } else { + self.remote_pending_tlcs.shrink_removed_tlcs(); + } } } @@ -2003,7 +2553,6 @@ pub struct ChannelActorState { pub is_acceptor: bool, // TODO: consider transaction fee while building the commitment transaction. - // The invariant here is that the sum of `to_local_amount` and `to_remote_amount` // should be equal to the total amount of the channel. // The changes of both `to_local_amount` and `to_remote_amount` @@ -2057,14 +2606,8 @@ pub struct ChannelActorState { // Below are fields that are only usable after the channel is funded, // (or at some point of the state). - // The id of next offering/received tlc, must increment by 1 for each new tlc. - pub tlc_ids: TLCIds, - - // BtreeMap of tlc ids to pending tlcs. - // serde_as is required for serde to json, as json requires keys to be strings. - // See https://stackoverflow.com/questions/51276896/how-do-i-use-serde-to-serialize-a-hashmap-with-structs-as-keys-to-json - #[serde_as(as = "Vec<(_, _)>")] - pub tlcs: BTreeMap, + // all the TLC related information + pub tlc_state: TlcState, // The remote and local lock script for close channel, they are setup during the channel establishment. #[serde_as(as = "Option")] @@ -2072,10 +2615,15 @@ pub struct ChannelActorState { #[serde_as(as = "EntityHex")] pub local_shutdown_script: Script, + // While building a CommitmentSigned message, we use a nonce sent by the counterparty + // to partially sign the commitment transaction. This nonce is also used while handling the revoke_and_ack + // message from the peer. We need to save this nonce because the counterparty may send other nonces during + // the period when our CommitmentSigned is sent and the counterparty's RevokeAndAck is received. #[serde_as(as = "Option")] - pub previous_remote_nonce: Option, - #[serde_as(as = "Option")] - pub remote_nonce: Option, + pub last_used_nonce_in_commitment_signed: Option, + + #[serde_as(as = "Vec")] + pub remote_nonces: Vec, // The latest commitment transaction we're holding #[serde_as(as = "Option")] @@ -2166,6 +2714,7 @@ pub enum ChannelEvent { FundingTransactionConfirmed(BlockNumber, u32), CommitmentTransactionConfirmed, ClosingTransactionConfirmed, + CheckTlcSetdown, } pub type ProcessingChannelResult = Result<(), ProcessingChannelError>; @@ -2186,6 +2735,8 @@ pub enum ProcessingChannelError { Musig2VerifyError(#[from] VerifyError), #[error("Musig2 SigningError: {0}")] Musig2SigningError(#[from] SigningError), + #[error("Unable to handle TLC command in waiting TLC ACK state")] + WaitingTlcAck, #[error("Failed to peel onion packet: {0}")] PeelingOnionPacketError(String), #[error("The amount in the HTLC is not expected")] @@ -2784,16 +3335,15 @@ impl ChannelActorState { commitment_delay_epoch, funding_fee_rate, id: channel_id, - tlc_ids: Default::default(), - tlcs: Default::default(), + tlc_state: Default::default(), local_shutdown_script: local_shutdown_script, local_channel_public_keys: local_base_pubkeys, signer, remote_channel_public_keys: Some(remote_pubkeys), commitment_numbers: Default::default(), remote_shutdown_script: Some(remote_shutdown_script), - previous_remote_nonce: None, - remote_nonce: Some(remote_nonce), + last_used_nonce_in_commitment_signed: None, + remote_nonces: vec![remote_nonce], remote_commitment_points: vec![first_commitment_point, second_commitment_point], local_shutdown_info: None, remote_shutdown_info: None, @@ -2846,15 +3396,14 @@ impl ChannelActorState { commitment_delay_epoch, funding_fee_rate, id: temp_channel_id, - tlc_ids: Default::default(), - tlcs: Default::default(), + tlc_state: Default::default(), signer, local_channel_public_keys: local_pubkeys, max_tlc_number_in_flight, max_tlc_value_in_flight, remote_channel_public_keys: None, - previous_remote_nonce: None, - remote_nonce: None, + last_used_nonce_in_commitment_signed: None, + remote_nonces: vec![], commitment_numbers: Default::default(), remote_commitment_points: vec![], local_shutdown_script: shutdown_script, @@ -3016,14 +3565,14 @@ impl ChannelActorState { } pub fn get_offered_tlc_balance(&self) -> u128 { - self.get_active_offered_tlcs(true) - .map(|tlc| tlc.tlc.amount) + self.get_all_offer_tlcs() + .map(|tlc| tlc.amount) .sum::() } pub fn get_received_tlc_balance(&self) -> u128 { - self.get_active_received_tlcs(false) - .map(|tlc| tlc.tlc.amount) + self.get_all_received_tlcs() + .map(|tlc| tlc.amount) .sum::() } @@ -3290,7 +3839,7 @@ impl ChannelActorState { .concat(), ); let local_nonce = self.get_local_nonce(); - let remote_nonce = self.get_previous_remote_nonce(); + let remote_nonce = self.get_remote_nonce(); let nonces = [local_nonce, remote_nonce]; let agg_nonce = AggNonce::sum(nonces); let sign_ctx = Musig2SignContext { @@ -3302,143 +3851,22 @@ impl ChannelActorState { let signature = sign_ctx.sign(message.as_slice()).expect("valid signature"); // Note that we must update channel state here to update commitment number, - // so that next step will obtain the correct commitmen point. - self.update_state_on_raa_msg(false); + // so that next step will obtain the correct commitment point. + self.increment_remote_commitment_number(); let point = self.get_current_local_commitment_point(); - - network - .send_message(NetworkActorMessage::new_command( - NetworkActorCommand::SendFiberMessage(FiberMessageWithPeerId::new( - self.get_remote_peer_id(), - FiberMessage::revoke_and_ack(RevokeAndAck { - channel_id: self.get_id(), - partial_signature: signature, - next_per_commitment_point: point, - }), - )), - )) - .expect(ASSUME_NETWORK_ACTOR_ALIVE); - } - - fn get_tlcs_for_settle_down(&self) -> Vec { - self.tlcs - .values() - .filter(|tlc| { - tlc.is_received() - && tlc.creation_confirmed_at.is_some() - && tlc.removed_at.is_none() - && tlc.tlc.is_last_hop() - }) - .cloned() - .collect() - } - - fn get_tlcs_for_forwarding(&self) -> Vec { - self.tlcs - .values() - .filter(|tlc| { - tlc.is_received() - && tlc.creation_confirmed_at.is_some() - && tlc.removed_at.is_none() - && tlc.tlc.previous_tlc.is_none() - && tlc.relay_status == TlcRelayStatus::WaitingForward - && !tlc.tlc.is_last_hop() - }) - .cloned() - .collect() - } - - fn get_tlcs_for_sending_remove_tlcs(&self) -> Vec { - self.tlcs - .values() - .filter(|tlc| { - tlc.is_offered() - && tlc.creation_confirmed_at.is_some() - && tlc.removed_at.is_some() - && tlc.tlc.previous_tlc.is_some() - && tlc.relay_status == TlcRelayStatus::WaitingRemove - }) - .cloned() - .collect() - } - - // After sending or receiving a RevokeAndAck message, all messages before - // are considered confirmed by both parties. These messages include - // AddTlc and RemoveTlc to operate on TLCs. - // Update state on revoke and ack message received on sent. - // This may fill in the creation_confirmed_at and removal_confirmed_at fields - // of the tlcs. And update the to_local_amount and to_remote_amount. - fn update_state_on_raa_msg(&mut self, is_received: bool) { - if is_received { - self.increment_local_commitment_number(); - } else { - self.increment_remote_commitment_number(); - } - - // If this revoke_and_ack message is received from the counterparty, - // then we should be operating on remote commitment numbers. - let commitment_numbers = self.get_current_commitment_numbers(); - - let (mut to_local_amount, mut to_remote_amount) = - (self.to_local_amount, self.to_remote_amount); - - debug!("Updating local state on revoke_and_ack message {}, current commitment number: {:?}, to_local_amount: {}, to_remote_amount: {}", - if is_received { "received" } else { "sent" }, commitment_numbers, to_local_amount, to_remote_amount); - - self.tlcs.values_mut().for_each(|tlc| { - if tlc.removal_confirmed_at.is_some() { - return; - } - - let amount = tlc.tlc.amount; - // This tlc has not been committed yet. - if tlc.creation_confirmed_at.is_none() { - debug!( - "Setting local_committed_at for tlc {:?} to commitment number {:?}", - tlc.tlc.id, commitment_numbers - ); - tlc.creation_confirmed_at = Some(commitment_numbers); - } - match (tlc.removed_at.clone(), tlc.removal_confirmed_at) { - (Some((_removed_at, reason)), None) => { - tlc.removal_confirmed_at = Some(commitment_numbers); - match reason { - RemoveTlcReason::RemoveTlcFulfill(_) => { - if tlc.is_offered(){ - to_local_amount -= amount; - to_remote_amount += amount; - } else { - to_local_amount += amount; - to_remote_amount -= amount; - }; - debug!( - "Updated local amount to {} and remote amount to {} by removing fulfilled tlc {:?} from channel {:?} with reason {:?}", - to_local_amount, to_remote_amount, tlc.tlc.id, self.id, reason - ); - }, - RemoveTlcReason::RemoveTlcFail(_) => { - debug!("Removing failed tlc {:?} from channel {:?} with reason {:?}", tlc.tlc.id, self.id, reason); - }, - }; - debug!( - "Setting removal_confirmed_at for tlc {:?} to commitment number {:?}", - tlc.tlc.id, commitment_numbers) - } - (Some((removed_at, reason)), Some(removal_confirmed_at)) => { - debug!( - "TLC {:?} is already removed with reason {:?} at commitment number {:?} and is confirmed at {:?}", - tlc.tlc.id, reason, removed_at, removal_confirmed_at - ); - } - _ => { - debug!("Ignoring processing TLC {:?} as it is not removed yet", tlc.tlc.id); - } - } - }); - self.to_local_amount = to_local_amount; - self.to_remote_amount = to_remote_amount; - debug!("Updated local state on revoke_and_ack message {}: current commitment number: {:?}, to_local_amount: {}, to_remote_amount: {}", - if is_received { "received" } else { "sent" }, commitment_numbers, to_local_amount, to_remote_amount); + + network + .send_message(NetworkActorMessage::new_command( + NetworkActorCommand::SendFiberMessage(FiberMessageWithPeerId::new( + self.get_remote_peer_id(), + FiberMessage::revoke_and_ack(RevokeAndAck { + channel_id: self.get_id(), + partial_signature: signature, + next_per_commitment_point: point, + }), + )), + )) + .expect(ASSUME_NETWORK_ACTOR_ALIVE); } pub fn get_id(&self) -> Hash256 { @@ -3472,17 +3900,39 @@ impl ChannelActorState { } pub fn get_remote_nonce(&self) -> PubNonce { - self.remote_nonce - .as_ref() - .expect("remote nonce exists") - .clone() + let comitment_number = self.get_remote_commitment_number(); + debug!( + "Getting remote nonce: commitment number {}, current nonces: {:?}", + comitment_number, &self.remote_nonces + ); + self.remote_nonces[comitment_number as usize].clone() } - pub fn get_previous_remote_nonce(&self) -> PubNonce { - self.previous_remote_nonce - .as_ref() - .expect("previous remote nonce exists") - .clone() + fn save_remote_nonce(&mut self, nonce: PubNonce) { + debug!( + "Saving remote nonce: new nonce {:?}, current nonces {:?}", + &nonce, &self.remote_nonces + ); + self.remote_nonces.push(nonce); + } + + fn save_remote_nonce_for_raa(&mut self) { + let nonce = self.get_remote_nonce(); + debug!( + "Saving remote nonce used in commitment signed: {:?}", + &nonce + ); + self.last_used_nonce_in_commitment_signed = Some(nonce); + } + + fn take_remote_nonce_for_raa(&mut self) -> PubNonce { + debug!( + "Taking remote nonce used in commitment signed: {:?}", + &self.last_used_nonce_in_commitment_signed + ); + self.last_used_nonce_in_commitment_signed + .take() + .expect("set last_used_nonce_in_commitment_signed in commitment signed") } pub fn get_current_commitment_numbers(&self) -> CommitmentNumbers { @@ -3506,20 +3956,10 @@ impl ChannelActorState { } pub fn increment_local_commitment_number(&mut self) { - debug!( - "Incrementing local commitment number from {} to {}", - self.get_local_commitment_number(), - self.get_local_commitment_number() + 1 - ); self.commitment_numbers.increment_local(); } pub fn increment_remote_commitment_number(&mut self) { - debug!( - "Incrementing remote commitment number from {} to {}", - self.get_remote_commitment_number(), - self.get_remote_commitment_number() + 1 - ); self.commitment_numbers.increment_remote(); } @@ -3536,67 +3976,47 @@ impl ChannelActorState { } pub fn get_next_offering_tlc_id(&self) -> u64 { - self.tlc_ids.get_next_offering() + self.tlc_state.get_next_offering() } pub fn get_next_received_tlc_id(&self) -> u64 { - self.tlc_ids.get_next_received() + self.tlc_state.get_next_received() } pub fn increment_next_offered_tlc_id(&mut self) { - self.tlc_ids.increment_offering(); + self.tlc_state.increment_offering(); } pub fn increment_next_received_tlc_id(&mut self) { - self.tlc_ids.increment_received(); - } - - pub fn get_offered_tlc(&self, tlc_id: u64) -> Option<&DetailedTLCInfo> { - self.tlcs.get(&TLCId::Offered(tlc_id)) + self.tlc_state.increment_received(); } - pub fn get_received_tlc(&self, tlc_id: u64) -> Option<&DetailedTLCInfo> { - self.tlcs.get(&TLCId::Received(tlc_id)) + pub fn get_offered_tlc(&self, tlc_id: u64) -> Option<&AddTlcInfo> { + self.tlc_state.get(&TLCId::Offered(tlc_id)) } - pub fn set_received_tlc_forwarded(&mut self, tlc_id: u64) { - if let Some(tlc) = self.tlcs.get_mut(&TLCId::Received(tlc_id)) { - tlc.relay_status = TlcRelayStatus::WaitingRemove; - } + pub fn get_received_tlc(&self, tlc_id: u64) -> Option<&AddTlcInfo> { + self.tlc_state.get(&TLCId::Received(tlc_id)) } - pub fn set_offered_tlc_removed(&mut self, tlc_id: u64) { - if let Some(tlc) = self.tlcs.get_mut(&TLCId::Offered(tlc_id)) { - tlc.relay_status = TlcRelayStatus::Removed; + pub(crate) fn set_received_tlc_preimage(&mut self, tlc_id: u64, preimage: Option) { + if let Some(tlc) = self.tlc_state.get_mut(&TLCId::Received(tlc_id)) { + tlc.payment_preimage = preimage; } } - pub fn insert_tlc(&mut self, tlc: TLC) -> Result { + pub fn check_insert_tlc(&mut self, tlc: &AddTlcInfo) -> Result<(), ProcessingChannelError> { let payment_hash = tlc.payment_hash; if let Some(tlc) = self - .tlcs - .values() - .find(|tlc| tlc.tlc.payment_hash == payment_hash && tlc.removed_at.is_none()) + .tlc_state + .all_tlcs() + .find(|tlc| tlc.payment_hash == payment_hash) { return Err(ProcessingChannelError::InvalidParameter(format!( "Trying to insert tlc with duplicate payment hash {:?} with tlc {:?}", payment_hash, tlc ))); } - if let Some(current) = self.tlcs.get(&tlc.id) { - if current.tlc == tlc { - debug!( - "Repeated processing of AddTlcCommand with id {:?}: current tlc {:?}", - tlc.id, current, - ); - return Ok(current.clone()); - } else { - return Err(ProcessingChannelError::InvalidParameter(format!( - "Trying to insert different tlcs with identical id {:?}: current tlc {:?}, new tlc {:?}", - tlc.id, current, tlc - ))); - } - }; if tlc.amount == 0 { return Err(ProcessingChannelError::TlcAmountIsTooLow); } @@ -3610,7 +4030,7 @@ impl ChannelActorState { if sent_tlc_value + tlc.amount > self.to_local_amount { return Err(ProcessingChannelError::InvalidParameter(format!( "Adding tlc {:?} with amount {} exceeds local balance {}", - tlc.id, + tlc.tlc_id, tlc.amount, self.to_local_amount - sent_tlc_value ))); @@ -3625,7 +4045,7 @@ impl ChannelActorState { if received_tlc_value + tlc.amount > self.to_remote_amount { return Err(ProcessingChannelError::InvalidParameter(format!( "Adding tlc {:?} with amount {} exceeds remote balance {}", - tlc.id, + tlc.tlc_id, tlc.amount, self.to_remote_amount - received_tlc_value ))); @@ -3639,34 +4059,7 @@ impl ChannelActorState { self.to_remote_amount ); - let relay_status = if !tlc.is_last_hop() { - if tlc.is_received() { - TlcRelayStatus::WaitingForward - } else { - if tlc.previous_tlc.is_none() { - TlcRelayStatus::NoForward - } else { - TlcRelayStatus::WaitingRemove - } - } - } else { - TlcRelayStatus::NoForward - }; - let detailed_tlc = DetailedTLCInfo { - tlc: tlc.clone(), - created_at: self.get_current_commitment_numbers(), - creation_confirmed_at: None, - removed_at: None, - removal_confirmed_at: None, - relay_status, - }; - self.tlcs.insert(tlc.id, detailed_tlc.clone()); - if tlc.is_offered() { - self.increment_next_offered_tlc_id(); - } else { - self.increment_next_received_tlc_id(); - } - Ok(detailed_tlc) + Ok(()) } // Remove a tlc with a reason. If the tlc is removed, then the channel @@ -3676,10 +4069,9 @@ impl ChannelActorState { &mut self, tlc_id: TLCId, reason: &RemoveTlcReason, - ) -> Result { + ) -> Result { let removed_at = self.get_current_commitment_numbers(); - - let tlc = match self.tlcs.get_mut(&tlc_id) { + let tlc = match self.tlc_state.get(&tlc_id).cloned() { None => { return Err(ProcessingChannelError::InvalidParameter(format!( "Trying to remove non-existing tlc with id {:?}", @@ -3687,6 +4079,7 @@ impl ChannelActorState { ))) } Some(current) => { + let add_tlc = current.clone(); match ¤t.removed_at { Some((current_removed_at, current_remove_reason)) if current_remove_reason == reason && removed_at == *current_removed_at => @@ -3694,7 +4087,9 @@ impl ChannelActorState { debug!( "Skipping removing of tlc {:?} as it is already removed at {:?} with the same reason {:?}", tlc_id, removed_at, reason ); - return Ok(current.clone()); + return Err(ProcessingChannelError::RepeatedProcessing( + "TLC is already removed".to_string(), + )); } Some((current_remove_reason, current_removed_at)) => { return Err(ProcessingChannelError::InvalidParameter( @@ -3704,21 +4099,35 @@ impl ChannelActorState { None => { debug!( "Inserting remove reason {:?} at commitment number {:?} for tlc {:?} hash_algorithm: {:?}", - reason, removed_at, current, current.tlc.hash_algorithm + reason, removed_at, current, add_tlc.hash_algorithm ); if let RemoveTlcReason::RemoveTlcFulfill(fulfill) = reason { - let filled_payment_hash: Hash256 = current - .tlc - .hash_algorithm - .hash(fulfill.payment_preimage) - .into(); - if current.tlc.payment_hash != filled_payment_hash { + let filled_payment_hash: Hash256 = + add_tlc.hash_algorithm.hash(fulfill.payment_preimage).into(); + if current.payment_hash != filled_payment_hash { return Err(ProcessingChannelError::FinalIncorrectPreimage); } } } }; - current.removed_at = Some((removed_at, reason.clone())); + + // update balance according to the tlc + let (mut to_local_amount, mut to_remote_amount) = + (self.to_local_amount, self.to_remote_amount); + if add_tlc.is_offered() { + to_local_amount -= add_tlc.amount; + to_remote_amount += add_tlc.amount; + } else { + to_local_amount += add_tlc.amount; + to_remote_amount -= add_tlc.amount; + } + self.to_local_amount = to_local_amount; + self.to_remote_amount = to_remote_amount; + debug!("Updated local balance to {} and remote balance to {} by removing tlc {:?} with reason {:?}", + to_local_amount, to_remote_amount, tlc_id, reason); + + self.tlc_state + .apply_tlc_remove(tlc_id, removed_at, reason.clone()); current } }; @@ -3872,85 +4281,38 @@ impl ChannelActorState { AggNonce::sum(nonces) } - // The parameter `local_commitment` indicates whether we are building a local or remote - // commitment. This field is used in some edge cases where we need to know whether we are - // safe to include a TLC in the commitment transaction. - // For example, if A sends a AddTlc to B, then A immediately sends a CommitmentSigned to B, - // this CommitmentSigned message should be the commitment transaction that includes the TLC. - // Now imagine while A sends CommitmentSigned to B, B also sends a CommitmentSigned message to A, - // then to verify this CommitmentSigned message, A needs to determine whether to include - // the TLC in the commitment transaction. Because it is possible that B has not received the - // AddTlc message from A, so A should not include the TLC in the commitment transaction. - fn should_tlc_be_included_in_commitment_tx( - info: &DetailedTLCInfo, - local_commitment: bool, - ) -> bool { - let DetailedTLCInfo { - tlc, - creation_confirmed_at, - removed_at, - removal_confirmed_at, - .. - } = info; - { - let am_i_sending_add_tlc_message = { - if tlc.is_offered() { - local_commitment - } else { - !local_commitment - } - }; - let am_i_sending_remove_tlc_message = !am_i_sending_add_tlc_message; - match (removal_confirmed_at, removed_at, creation_confirmed_at) { - (Some(_), _, _) => { - debug!( - "Not including TLC {:?} to commitment transction as it is already removed", - tlc.id - ); - return false; - } - (_, Some(_), Some(_)) => { - debug!("Will only include TLC {:?} to commitment transaction if I am sending remove tlc message ({})", tlc.id,am_i_sending_remove_tlc_message); - return am_i_sending_remove_tlc_message; - } - (_, Some(_), None) => { - if info.is_fullfill_removed() { - panic!("TLC {:?} is fullfilled but not confirmed yet", info); - } - // This is a failed tlc, there is not `creation_confirmed_at` field means our peer does not inserted this tlc, - // and we should not include it in the commitment transaction. - return false; - } - (_, _, Some(n)) => { - debug!("Including TLC {:?} to commitment transaction because tlc confirmed at {:?}", tlc.id, n); - return true; - } - (None, None, None) => { - debug!("Will only include TLC {:?} to commitment transaction if I am sending add tlc message ({})", tlc.id, am_i_sending_add_tlc_message); - am_i_sending_add_tlc_message - } - } - } - } - pub fn get_active_received_tlcs( &self, local_commitment: bool, - ) -> impl Iterator { - self.tlcs.values().filter(move |info| { - Self::should_tlc_be_included_in_commitment_tx(info, local_commitment) - && !info.is_offered() - }) + ) -> impl Iterator { + self.tlc_state + .get_tlcs_with(local_commitment) + .into_iter() + .filter_map(|tlc| match tlc { + TlcKind::AddTlc(tlc) if tlc.is_received() => Some(tlc), + _ => None, + }) } pub fn get_active_offered_tlcs( &self, local_commitment: bool, - ) -> impl Iterator { - self.tlcs.values().filter(move |info| { - Self::should_tlc_be_included_in_commitment_tx(info, local_commitment) - && info.is_offered() - }) + ) -> impl Iterator { + self.tlc_state + .get_tlcs_with(local_commitment) + .into_iter() + .filter_map(|tlc| match tlc { + TlcKind::AddTlc(tlc) if tlc.is_offered() => Some(tlc), + _ => None, + }) + } + + pub fn get_all_received_tlcs(&self) -> impl Iterator { + self.tlc_state.all_tlcs().filter(|tlc| tlc.is_received()) + } + + pub fn get_all_offer_tlcs(&self) -> impl Iterator { + self.tlc_state.all_tlcs().filter(|tlc| tlc.is_offered()) } // Get the pubkeys for the tlc. Tlc pubkeys are the pubkeys held by each party @@ -3959,12 +4321,12 @@ impl ChannelActorState { // The offerer who offered this tlc will have the first pubkey, and the receiver // will have the second pubkey. // This tlc must have valid local_committed_at and remote_committed_at fields. - pub fn get_tlc_pubkeys(&self, tlc: &DetailedTLCInfo, local: bool) -> (Pubkey, Pubkey) { - let is_offered = tlc.tlc.is_offered(); + pub fn get_tlc_pubkeys(&self, tlc: &AddTlcInfo) -> (Pubkey, Pubkey) { + let is_offered = tlc.is_offered(); let CommitmentNumbers { local: local_commitment_number, remote: remote_commitment_number, - } = tlc.get_commitment_numbers(local); + } = tlc.get_commitment_numbers(); debug!( "Local commitment number: {}, remote commitment number: {}", local_commitment_number, remote_commitment_number @@ -3988,62 +4350,59 @@ impl ChannelActorState { pub fn get_active_received_tlc_with_pubkeys( &self, local: bool, - ) -> impl Iterator { - self.get_active_received_tlcs(local).map(move |tlc| { - let (k1, k2) = self.get_tlc_pubkeys(tlc, local); - (tlc, k1, k2) - }) + ) -> Vec<(AddTlcInfo, Pubkey, Pubkey)> { + self.get_active_received_tlcs(local) + .map(move |tlc| { + let (k1, k2) = self.get_tlc_pubkeys(&tlc); + (tlc, k1, k2) + }) + .collect() } pub fn get_active_offered_tlc_with_pubkeys( &self, local: bool, - ) -> impl Iterator { - self.get_active_offered_tlcs(local).map(move |tlc| { - let (k1, k2) = self.get_tlc_pubkeys(tlc, local); - (tlc, k1, k2) - }) + ) -> Vec<(AddTlcInfo, Pubkey, Pubkey)> { + self.get_active_offered_tlcs(local) + .map(move |tlc| { + let (k1, k2) = self.get_tlc_pubkeys(&tlc); + (tlc, k1, k2) + }) + .collect() } fn get_active_htlcs(&self, local: bool) -> Vec { // Build a sorted array of TLC so that both party can generate the same commitment transaction. let tlcs = { let (mut received_tlcs, mut offered_tlcs) = ( - self.get_active_received_tlc_with_pubkeys(local) - .map(|(tlc, local, remote)| (tlc.clone(), local, remote)) - .collect::>(), - self.get_active_offered_tlc_with_pubkeys(local) - .map(|(tlc, local, remote)| (tlc.clone(), local, remote)) - .collect::>(), + self.get_active_received_tlc_with_pubkeys(local), + self.get_active_offered_tlc_with_pubkeys(local), ); - debug!("Received tlcs: {:?}", &received_tlcs); - debug!("Offered tlcs: {:?}", &offered_tlcs); let (mut a, mut b) = if local { (received_tlcs, offered_tlcs) } else { for (tlc, _, _) in received_tlcs.iter_mut().chain(offered_tlcs.iter_mut()) { // Need to flip these fields for the counterparty. - tlc.tlc.flip_mut(); + tlc.flip_mut(); } (offered_tlcs, received_tlcs) }; - a.sort_by(|x, y| u64::from(x.0.tlc.id).cmp(&u64::from(y.0.tlc.id))); - b.sort_by(|x, y| u64::from(x.0.tlc.id).cmp(&u64::from(y.0.tlc.id))); + a.sort_by(|x, y| u64::from(x.0.tlc_id).cmp(&u64::from(y.0.tlc_id))); + b.sort_by(|x, y| u64::from(x.0.tlc_id).cmp(&u64::from(y.0.tlc_id))); [a, b].concat() }; - debug!("Sorted tlcs: {:?}", &tlcs); if tlcs.is_empty() { Vec::new() } else { let mut result = vec![tlcs.len() as u8]; for (tlc, local, remote) in tlcs { - result.extend_from_slice(&tlc.tlc.get_htlc_type().to_le_bytes()); - result.extend_from_slice(&tlc.tlc.amount.to_le_bytes()); - result.extend_from_slice(&tlc.tlc.get_hash()); + result.extend_from_slice(&tlc.get_htlc_type().to_le_bytes()); + result.extend_from_slice(&tlc.amount.to_le_bytes()); + result.extend_from_slice(&tlc.get_hash()); result.extend_from_slice(&local.serialize()); result.extend_from_slice(&remote.serialize()); result.extend_from_slice( - &Since::new(SinceType::Timestamp, tlc.tlc.expiry, false) + &Since::new(SinceType::Timestamp, tlc.expiry, false) .value() .to_le_bytes(), ); @@ -4053,11 +4412,9 @@ impl ChannelActorState { } fn any_tlc_pending(&self) -> bool { - self.tlcs.values().any(|tlc| { - tlc.creation_confirmed_at.is_none() - || tlc.removal_confirmed_at.is_none() - || tlc.removed_at.is_none() - }) + self.tlc_state + .all_tlcs() + .any(|tlc| tlc.removed_at.is_none()) } pub fn get_local_funding_pubkey(&self) -> &Pubkey { @@ -4119,7 +4476,42 @@ impl ChannelActorState { Ok(()) } - fn check_for_tlc_update(&self, add_tlc_amount: Option) -> ProcessingChannelResult { + // Check whether the reason is valid for removing the tlc. + fn check_remove_tlc_with_reason( + &self, + tlc_id: TLCId, + reason: &RemoveTlcReason, + ) -> ProcessingChannelResult { + if let Some(tlc) = self.tlc_state.get(&tlc_id) { + if tlc.removed_at.is_some() { + return Err(ProcessingChannelError::RepeatedProcessing( + "TLC is already removed".to_string(), + )); + } + if let RemoveTlcReason::RemoveTlcFulfill(fulfill) = reason { + let filled_payment_hash: Hash256 = + tlc.hash_algorithm.hash(fulfill.payment_preimage).into(); + if tlc.payment_hash != filled_payment_hash { + return Err(ProcessingChannelError::FinalIncorrectPreimage); + } + } + Ok(()) + } else { + return Err(ProcessingChannelError::InvalidParameter(format!( + "Trying to remove non-existing tlc with id {:?}", + tlc_id + ))); + } + } + + fn check_for_tlc_update( + &self, + add_tlc_amount: Option, + is_tlc_command_message: bool, + ) -> ProcessingChannelResult { + if is_tlc_command_message && self.tlc_state.waiting_ack { + return Err(ProcessingChannelError::WaitingTlcAck); + } match self.state { ChannelState::ChannelReady() => {} ChannelState::ShuttingDown(_) if add_tlc_amount.is_none() => {} @@ -4137,27 +4529,42 @@ impl ChannelActorState { } if let Some(add_amount) = add_tlc_amount { - let active_tls_number = self.get_active_offered_tlcs(true).count() - + self.get_active_received_tlcs(true).count(); - - if active_tls_number as u64 + 1 > self.max_tlc_number_in_flight { - return Err(ProcessingChannelError::TlcNumberExceedLimit); + self.check_tlc_limits(add_amount, true)?; + if is_tlc_command_message { + // TODO: this should be replaced by using the remote channel's max_tlc_number_in_flight and max_tlc_value_in_flight + self.check_tlc_limits(add_amount, false)?; } + } + Ok(()) + } - if self - .get_active_received_tlcs(true) - .chain(self.get_active_offered_tlcs(true)) - .fold(0_u128, |sum, tlc| sum + tlc.tlc.amount) - + add_amount - > self.max_tlc_value_in_flight - { - return Err(ProcessingChannelError::TlcValueInflightExceedLimit); - } + // TODO: need to use `local` to check the limits for the remote peer + fn check_tlc_limits( + &self, + add_amount: u128, + _local: bool, + ) -> Result<(), ProcessingChannelError> { + let active_tls_number = + self.get_all_offer_tlcs().count() + self.get_all_received_tlcs().count(); + + if active_tls_number as u64 + 1 > self.max_tlc_number_in_flight { + return Err(ProcessingChannelError::TlcNumberExceedLimit); + } + + if self + .get_all_offer_tlcs() + .chain(self.get_all_received_tlcs()) + .fold(0_u128, |sum, tlc| sum + tlc.amount) + + add_amount + > self.max_tlc_value_in_flight + { + return Err(ProcessingChannelError::TlcValueInflightExceedLimit); } + Ok(()) } - pub fn create_outbounding_tlc(&self, command: AddTlcCommand) -> TLC { + pub fn create_outbounding_tlc(&self, command: AddTlcCommand) -> TlcKind { // TODO: we are filling the user command with a new id here. // The advantage of this is that we don't need to burden the users to // provide a next id for each tlc. The disadvantage is that users may @@ -4170,50 +4577,42 @@ impl ChannelActorState { "Must not have the same id in pending offered tlcs" ); - TLC { - id: TLCId::Offered(id), + TlcKind::AddTlc(AddTlcInfo { + channel_id: self.get_id(), + tlc_id: TLCId::Offered(id), amount: command.amount, payment_hash: command.payment_hash, expiry: command.expiry, - payment_preimage: None, hash_algorithm: command.hash_algorithm, - peeled_onion_packet: command.peeled_onion_packet, + created_at: self.get_current_commitment_numbers(), + payment_preimage: None, + removed_at: None, + onion_packet: command.onion_packet, previous_tlc: command .previous_tlc .map(|(channel_id, tlc_id)| (channel_id, TLCId::Received(tlc_id))), - } + }) } pub fn create_inbounding_tlc( &self, message: AddTlc, - payment_preimage: Option, - peeled_onion_packet: Option, - ) -> Result { - if self.get_received_tlc(message.tlc_id).is_some() { - return Err(ProcessingChannelError::InvalidParameter(format!( - "Trying to add tlc with existing id {:?}", - message.tlc_id - ))); - } - if message.tlc_id != self.get_next_received_tlc_id() { - return Err(ProcessingChannelError::InvalidParameter(format!( - "Trying to add tlc with id {:?} while expecting id {:?}", - message.tlc_id, - self.get_next_received_tlc_id() - ))); - } - - Ok(TLC { - peeled_onion_packet, - id: TLCId::Received(message.tlc_id), + ) -> Result { + let tlc_info = AddTlcInfo { + tlc_id: TLCId::Received(message.tlc_id), + channel_id: self.get_id(), amount: message.amount, payment_hash: message.payment_hash, expiry: message.expiry, - payment_preimage, hash_algorithm: message.hash_algorithm, + // will be set when apply AddTlc operations after the signature is checked + onion_packet: message.onion_packet, + created_at: self.get_current_commitment_numbers(), + payment_preimage: None, + removed_at: None, previous_tlc: None, - }) + }; + Ok(tlc_info) } pub fn create_witness_for_funding_cell( @@ -4279,7 +4678,7 @@ impl ChannelActorState { debug!( "Will not shutdown the channel because we require all tlcs resolved and both parties sent the Shutdown message, current state: {:?}, pending tlcs: {:?}", &self.state, - &self.tlcs + &self.tlc_state.all_commited_tlcs().collect::>() ); return Ok(()); } @@ -4376,7 +4775,7 @@ impl ChannelActorState { self.to_remote_amount = accept_channel.funding_amount; self.remote_reserved_ckb_amount = accept_channel.reserved_ckb_amount; - self.remote_nonce = Some(accept_channel.next_local_nonce.clone()); + self.save_remote_nonce(accept_channel.next_local_nonce.clone()); let remote_pubkeys = (&accept_channel).into(); self.remote_channel_public_keys = Some(remote_pubkeys); self.remote_commitment_points = vec![ @@ -4576,7 +4975,7 @@ impl ChannelActorState { // can be broadcasted to the network if necessary let num = self.get_current_commitment_number(false); - debug!( + info!( "Successfully handled commitment signed message: {:?}, tx: {:?}", &commitment_signed, &tx ); @@ -4598,8 +4997,7 @@ impl ChannelActorState { self.get_remote_nonce(), &commitment_signed.next_local_nonce ); - self.previous_remote_nonce = self.remote_nonce.clone(); - self.remote_nonce = Some(commitment_signed.next_local_nonce); + self.save_remote_nonce(commitment_signed.next_local_nonce); self.latest_commitment_transaction = Some(tx.data()); match flags { CommitmentSignedFlags::SigningCommitment(flags) => { @@ -4625,6 +5023,23 @@ impl ChannelActorState { } } } + + if let ChannelState::SigningCommitment(flags) = self.state { + if !flags.contains(SigningCommitmentFlags::OUR_COMMITMENT_SIGNED_SENT) { + // TODO: maybe we should send our commitment_signed message here. + debug!("CommitmentSigned message received, but we haven't sent our commitment_signed message yet"); + // Notify outside observers. + network + .send_message(NetworkActorMessage::new_notification( + NetworkServiceEvent::CommitmentSignaturePending( + self.get_remote_peer_id(), + self.get_id(), + self.get_current_commitment_number(false), + ), + )) + .expect(ASSUME_NETWORK_ACTOR_ALIVE); + } + } Ok(()) } @@ -4855,11 +5270,6 @@ impl ChannelActorState { ] .concat(); - println!( - "handle_revoke_and_ack_message commitment_lock_script_args: {:?}", - commitment_lock_script_args - ); - let message = blake2b_256( [ output.as_slice(), @@ -4870,15 +5280,15 @@ impl ChannelActorState { ); let local_nonce = self.get_local_nonce(); - let remote_nonce = self.get_remote_nonce(); - let nonces = [remote_nonce, local_nonce]; + let remote_nonce = self.take_remote_nonce_for_raa(); + let nonces = [remote_nonce.clone(), local_nonce]; let agg_nonce = AggNonce::sum(nonces); let verify_ctx = Musig2VerifyContext { key_agg_ctx: key_agg_ctx.clone(), agg_nonce: agg_nonce.clone(), pubkey: *self.get_remote_funding_pubkey(), - pubnonce: self.get_remote_nonce(), + pubnonce: remote_nonce, }; let RevokeAndAck { @@ -4886,6 +5296,7 @@ impl ChannelActorState { partial_signature, next_per_commitment_point, } = revoke_and_ack; + verify_ctx.verify(partial_signature, message.as_slice())?; let sign_ctx: Musig2SignContext = Musig2SignContext { @@ -4902,8 +5313,15 @@ impl ChannelActorState { [partial_signature, signature2], )?; - self.update_state_on_raa_msg(true); + self.increment_local_commitment_number(); self.append_remote_commitment_point(next_per_commitment_point); + let staging_tlcs = self.tlc_state.commit_local_tlcs(); + for tlc in staging_tlcs { + if let TlcKind::RemoveTlc(remove_tlc) = tlc { + self.remove_tlc_with_reason(remove_tlc.tlc_id, &remove_tlc.reason)?; + } + } + self.tlc_state.set_waiting_ack(false); network .send_message(NetworkActorMessage::new_notification( @@ -4918,7 +5336,6 @@ impl ChannelActorState { ), )) .expect(ASSUME_NETWORK_ACTOR_ALIVE); - Ok(()) } @@ -4943,11 +5360,9 @@ impl ChannelActorState { if acutal_local_commitment_number == expected_local_commitment_number { // resend AddTlc, RemoveTlc and CommitmentSigned messages if needed let mut need_resend_commitment_signed = false; - for info in self.tlcs.values() { + for info in self.tlc_state.all_tlcs() { if info.is_offered() { - if info.created_at.get_local() >= acutal_local_commitment_number - && info.creation_confirmed_at.is_none() - { + if info.created_at.get_local() >= acutal_local_commitment_number { // resend AddTlc message network .send_message(NetworkActorMessage::new_command( @@ -4956,16 +5371,12 @@ impl ChannelActorState { self.get_remote_peer_id(), FiberMessage::add_tlc(AddTlc { channel_id: self.get_id(), - tlc_id: info.tlc.get_id(), - amount: info.tlc.amount, - payment_hash: info.tlc.payment_hash, - expiry: info.tlc.expiry, - hash_algorithm: info.tlc.hash_algorithm, - onion_packet: info - .tlc - .peeled_onion_packet - .as_ref() - .and_then(|packet| packet.next.clone()), + tlc_id: info.tlc_id.into(), + amount: info.amount, + payment_hash: info.payment_hash, + expiry: info.expiry, + hash_algorithm: info.hash_algorithm, + onion_packet: info.onion_packet.clone(), }), ), ), @@ -4984,7 +5395,7 @@ impl ChannelActorState { self.get_remote_peer_id(), FiberMessage::remove_tlc(RemoveTlc { channel_id: self.get_id(), - tlc_id: info.tlc.get_id(), + tlc_id: info.tlc_id.into(), reason: remove_reason.clone(), }), ), @@ -5425,11 +5836,11 @@ impl ChannelActorState { fn build_settlement_transaction_outputs(&self, local: bool) -> ([CellOutput; 2], [Bytes; 2]) { let received_tlc_value = self .get_active_received_tlcs(local) - .map(|tlc| tlc.tlc.amount) + .map(|tlc| tlc.amount) .sum::(); let offered_tlc_value = self .get_active_offered_tlcs(local) - .map(|tlc| tlc.tlc.amount) + .map(|tlc| tlc.amount) .sum::(); let to_local_value = @@ -5590,6 +6001,7 @@ impl ChannelActorState { ] .concat(), ); + let commitment_tx_partial_signature = sign_ctx.sign(message.as_slice())?; Ok(PartiallySignedCommitmentTransaction { @@ -5684,17 +6096,6 @@ pub struct Musig2VerifyContext { pub pubnonce: PubNonce, } -impl From for Musig2VerifyContext { - fn from(value: Musig2SignContext) -> Self { - Musig2VerifyContext { - key_agg_ctx: value.key_agg_ctx, - agg_nonce: value.agg_nonce, - pubkey: value.seckey.pubkey(), - pubnonce: value.secnonce.public_nonce(), - } - } -} - impl Musig2VerifyContext { pub fn verify(&self, signature: PartialSignature, message: &[u8]) -> ProcessingChannelResult { let result = verify_partial( @@ -5727,20 +6128,22 @@ pub struct Musig2SignContext { impl Musig2SignContext { pub fn sign(self, message: &[u8]) -> Result { + let result = sign_partial( + &self.key_agg_ctx, + self.seckey, + self.secnonce.clone(), + &self.agg_nonce, + message, + ); debug!( - "Musig2 signing partial message {:?} with nonce {:?} (public nonce: {:?}), agg nonce {:?}", + "Musig2 signing partial message {:?} with nonce {:?} (public nonce: {:?}), agg nonce {:?}: result {:?}", hex::encode(message), self.secnonce, self.secnonce.public_nonce(), - &self.agg_nonce - ); - Ok(sign_partial( - &self.key_agg_ctx, - self.seckey, - self.secnonce, &self.agg_nonce, - message, - )?) + &result + ); + Ok(result?) } } @@ -5793,155 +6196,6 @@ impl From<&AcceptChannel> for ChannelBasePublicKeys { type ShortHash = [u8; 20]; -/// A tlc output. -#[serde_as] -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] -pub struct TLC { - /// The id of a TLC. - pub id: TLCId, - /// The value as it appears in the commitment transaction - pub amount: u128, - /// The expiry timestamp in millisecond. - pub expiry: u64, - /// The hash of the preimage which unlocks this HTLC. - pub payment_hash: Hash256, - /// The preimage of the hash to be sent to the counterparty. - pub payment_preimage: Option, - /// Which hash algorithm is applied on the preimage - pub hash_algorithm: HashAlgorithm, - - /// For received TLC from a payment, this is the peeled onion packet for the current node. - pub peeled_onion_packet: Option, - - /// The previous tlc id if this tlc is a part of a multi-tlc payment. - /// Note: this is used to track the tlc chain for a multi-tlc payment, - /// we need to know previous when removing tlc backwardly. - /// - /// Node A ---------> Node B ------------> Node C ----------> Node D - /// tlc_1 <---> (tlc_1) (tlc_2) <---> (tlc_2) (tlc_3) <----> tlc_3 - /// ^^^^ ^^^^ - /// - pub previous_tlc: Option<(Hash256, TLCId)>, -} - -impl TLC { - pub fn is_offered(&self) -> bool { - self.id.is_offered() - } - - pub fn is_received(&self) -> bool { - !self.is_offered() - } - - // Whether this TLC is the last hop in a payment route - pub fn is_last_hop(&self) -> bool { - self.peeled_onion_packet - .as_ref() - .map(|packet| packet.is_last()) - // Consider this TLC as the last hop when there's no peeled onion packet - .unwrap_or(true) - } - - // Change this tlc to the opposite side. - pub fn flip_mut(&mut self) { - self.id.flip_mut() - } - - /// Get the value for the field `htlc_type` in commitment lock witness. - /// - Lowest 1 bit: 0 if the tlc is offered by the remote party, 1 otherwise. - /// - High 7 bits: - /// - 0: ckb hash - /// - 1: sha256 - pub fn get_htlc_type(&self) -> u8 { - let offered_flag = if self.is_offered() { 0u8 } else { 1u8 }; - ((self.hash_algorithm as u8) << 1) + offered_flag - } - - fn get_hash(&self) -> ShortHash { - self.payment_hash.as_ref()[..20] - .try_into() - .expect("short hash from payment hash") - } - - fn get_id(&self) -> u64 { - match self.id { - TLCId::Offered(id) => id, - TLCId::Received(id) => id, - } - } -} - -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] -enum TlcRelayStatus { - NoForward, - WaitingForward, - WaitingRemove, - Removed, -} - -/// A tlc output in a commitment transaction, including both the tlc output -/// and the commitment_number that it first appeared (will appear) in the -/// commitment transaction. -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] -pub struct DetailedTLCInfo { - tlc: TLC, - // The commitment numbers of both parties when this tlc is created - // as the offerer sees it. - // TODO: There is a potential bug here. The commitment number of the - // receiver may have been updated by the time this tlc is included - // in a commitment of the offerer. Currently we assume that the commitment - // number of the receiver when the time this tlc is actually committed by - // the offerer is just the same as the commitment number of the receiver - // when the this tlc is created. - created_at: CommitmentNumbers, - // The commitment number of the party that received this tlc - // (also called receiver) when this tlc is first included in - // the commitment transaction of the receiver. - creation_confirmed_at: Option, - // The commitment number of the party that removed this tlc - // (only the receiver is allowed to remove) when the tlc is removed. - removed_at: Option<(CommitmentNumbers, RemoveTlcReason)>, - // The initial commitment number of the party (the offerer) that - // has confirmed the removal of this tlc. - removal_confirmed_at: Option, - // indicates the status of the tlc relaying. - relay_status: TlcRelayStatus, -} - -impl DetailedTLCInfo { - fn is_offered(&self) -> bool { - self.tlc.is_offered() - } - - fn is_received(&self) -> bool { - self.tlc.is_received() - } - - fn get_commitment_numbers(&self, local: bool) -> CommitmentNumbers { - let am_i_sending_the_tlc = { - if self.is_offered() { - local - } else { - !local - } - }; - if am_i_sending_the_tlc { - self.created_at - } else { - self.creation_confirmed_at - .expect("Commitment number is present") - } - } - - fn is_fullfill_removed(&self) -> bool { - if let Some((_, removed_reason)) = &self.removed_at { - matches!(removed_reason, RemoveTlcReason::RemoveTlcFulfill(_)) - } else { - false - } - } -} - pub fn get_tweak_by_commitment_point(commitment_point: &Pubkey) -> [u8; 32] { let mut hasher = new_blake2b(); hasher.update(&commitment_point.serialize()); diff --git a/src/fiber/history.rs b/src/fiber/history.rs index b954fccc..97635c54 100644 --- a/src/fiber/history.rs +++ b/src/fiber/history.rs @@ -212,6 +212,9 @@ impl InternalResult { | TlcErrorCode::InvalidOnionPayload => { self.fail_node(nodes, 1); } + TlcErrorCode::IncorrectOrUnknownPaymentDetails | TlcErrorCode::InvoiceExpired => { + need_to_retry = false; + } _ => { // we can not penalize our own node, the whole payment session need to retry debug!("first hop failed with error: {:?}", tlc_err); @@ -286,6 +289,10 @@ impl InternalResult { self.fail_range_pairs(nodes, 0, index - 1); } } + TlcErrorCode::IncorrectOrUnknownPaymentDetails | TlcErrorCode::InvoiceExpired => { + need_to_retry = false; + self.succeed_range_pairs(nodes, 0, index - 1); + } _ => { self.fail_node(nodes, index); } diff --git a/src/fiber/network.rs b/src/fiber/network.rs index 9e34da1e..8d6612e6 100644 --- a/src/fiber/network.rs +++ b/src/fiber/network.rs @@ -503,6 +503,8 @@ pub enum NetworkServiceEvent { ChannelCreated(PeerId, Hash256), // A outgoing channel is pending to be accepted. ChannelPendingToBeAccepted(PeerId, Hash256), + // A AddTlc peer message processed with failure + AddTlcFailed(PeerId, Hash256, TlcErr), // The channel is ready to use (with funding transaction confirmed // and both parties sent ChannelReady messages). ChannelReady(PeerId, Hash256, OutPoint), @@ -600,6 +602,9 @@ pub enum NetworkActorEvent { // A tlc remove message is received. (payment_hash, remove_tlc) TlcRemoveReceived(Hash256, RemoveTlcReason), + + // A payment need to retry + RetrySendPayment(Hash256), } #[derive(Copy, Clone, Debug)] @@ -1257,9 +1262,12 @@ where } NetworkActorEvent::TlcRemoveReceived(payment_hash, remove_tlc_reason) => { // When a node is restarted, RemoveTLC will also be resent if necessary - self.on_remove_tlc_event(state, payment_hash, remove_tlc_reason) + self.on_remove_tlc_event(myself, state, payment_hash, remove_tlc_reason) .await; } + NetworkActorEvent::RetrySendPayment(payment_hash) => { + let _ = self.try_payment_session(myself, state, payment_hash).await; + } } Ok(()) } @@ -1588,7 +1596,7 @@ where let _ = reply.send(signature); } NetworkActorCommand::SendPayment(payment_request, reply) => { - match self.on_send_payment(state, payment_request).await { + match self.on_send_payment(myself, state, payment_request).await { Ok(payment) => { let _ = reply.send(Ok(payment)); } @@ -2179,7 +2187,7 @@ where payment_hash: info.payment_hash, expiry: info.expiry, hash_algorithm: info.hash_algorithm, - peeled_onion_packet: Some(peeled_onion_packet), + onion_packet: peeled_onion_packet.next.clone(), previous_tlc, }, rpc_reply, @@ -2209,6 +2217,7 @@ where async fn on_remove_tlc_event( &self, + myself: ActorRef, state: &mut NetworkActorState, payment_hash: Hash256, reason: RemoveTlcReason, @@ -2231,7 +2240,9 @@ where .await .record_payment_fail(&payment_session, error_detail.clone()); if need_to_retry { - let res = self.try_payment_session(state, payment_session).await; + let res = self + .try_payment_session(myself, state, payment_session.payment_hash()) + .await; if res.is_err() { debug!("Failed to retry payment session: {:?}", res); } @@ -2363,7 +2374,7 @@ where // This is the error implies we send payment request to the first hop failed // graph or payment history need to update and then have a retry self.update_graph_with_tlc_fail(&error_detail).await; - let _ = self + let need_to_retry = self .network_graph .write() .await @@ -2373,7 +2384,7 @@ where error_detail.error_code_as_str() ); self.set_payment_fail_with_error(payment_session, &err); - return Err(Error::SendPaymentFirstHopError(err)); + return Err(Error::SendPaymentFirstHopError(err, need_to_retry)); } else { // This expected never to be happended, to be safe, we will set the payment session to failed let err = format!("Failed to send onion packet, got malioucious error message"); @@ -2396,13 +2407,17 @@ where async fn try_payment_session( &self, + myself: ActorRef, state: &mut NetworkActorState, - mut payment_session: PaymentSession, + payment_hash: Hash256, ) -> Result { + let Some(mut payment_session) = self.store.get_payment_session(payment_hash) else { + return Err(Error::InvalidParameter(payment_hash.to_string())); + }; + let payment_data = payment_session.request.clone(); - while payment_session.can_retry() { + if payment_session.can_retry() { payment_session.retried_times += 1; - let hops_info = self .build_payment_route(&mut payment_session, &payment_data) .await?; @@ -2412,27 +2427,49 @@ where .await { Ok(payment_session) => return Ok(payment_session), - Err(Error::SendPaymentFirstHopError(_)) => { - // we will retry the payment session - continue; + Err(Error::SendPaymentFirstHopError(err, need_retry)) => { + if need_retry { + // If this is the first hop error, like the WaitingTlcAck error, + // we will just retry later, return Ok here for letting endpoint user + // know payment session is created successfully + myself.send_after(Duration::from_millis(500), move || { + NetworkActorMessage::new_event(NetworkActorEvent::RetrySendPayment( + payment_hash, + )) + }); + return Ok(payment_session); + } else { + return Err(Error::SendPaymentError(err)); + } } Err(e) => { + // we will retry the payment session, + // but we need to retry later to let the actor to process failure, + // so that we can make different choice for later try + let payment_hash = payment_data.payment_hash; + myself.send_after(Duration::from_millis(500), move || { + NetworkActorMessage::new_event(NetworkActorEvent::RetrySendPayment( + payment_hash, + )) + }); + debug!("send payment error: {:?}", e); return Err(e); } } + } else { + let error = payment_session.last_error.clone().unwrap_or_else(|| { + format!( + "Failed to send payment session: {:?}, retried times: {}", + payment_data.payment_hash, payment_session.retried_times + ) + }); + return Err(Error::SendPaymentError(error)); } - - let error = payment_session.last_error.clone().unwrap_or_else(|| { - format!( - "Failed to send payment session: {:?}, retried times: {}", - payment_data.payment_hash, payment_session.retried_times - ) - }); - return Err(Error::SendPaymentError(error)); } async fn on_send_payment( &self, + myself: ActorRef, state: &mut NetworkActorState, payment_request: SendPaymentCommand, ) -> Result { @@ -2467,7 +2504,9 @@ where let payment_session = PaymentSession::new(payment_data, 5); self.store.insert_payment_session(payment_session.clone()); - let session = self.try_payment_session(state, payment_session).await?; + let session = self + .try_payment_session(myself, state, payment_session.payment_hash()) + .await?; return Ok(session.into()); } } diff --git a/src/fiber/tests/channel.rs b/src/fiber/tests/channel.rs index 7cd24792..a01925cc 100644 --- a/src/fiber/tests/channel.rs +++ b/src/fiber/tests/channel.rs @@ -1,9 +1,13 @@ +use crate::fiber::channel::{ + AddTlcInfo, CommitmentNumbers, RemoveTlcInfo, TLCId, TlcKind, TlcState, +}; use crate::fiber::config::MAX_PAYMENT_TLC_EXPIRY_LIMIT; use crate::fiber::graph::PaymentSessionStatus; use crate::fiber::network::SendPaymentCommand; use crate::fiber::tests::test_utils::{ - gen_rand_public_key, gen_sha256_hash, NetworkNodeConfigBuilder, + gen_rand_public_key, gen_sha256_hash, generate_seckey, NetworkNodeConfigBuilder, }; +use crate::fiber::types::{PaymentHopData, PeeledOnionPacket, TlcErrorCode}; use crate::{ ckb::contracts::{get_cell_deps, Contract}, fiber::{ @@ -22,12 +26,14 @@ use crate::{ now_timestamp_as_millis_u64, NetworkServiceEvent, }; use ckb_jsonrpc_types::Status; +use ckb_types::packed::OutPointBuilder; use ckb_types::{ core::{FeeRate, TransactionView}, packed::{CellInput, Script, Transaction}, prelude::{AsTransactionBuilder, Builder, Entity, IntoTransactionView, Pack, Unpack}, }; use ractor::call; +use secp256k1::Secp256k1; use std::collections::HashSet; use super::test_utils::{init_tracing, NetworkNode}; @@ -54,6 +60,216 @@ fn test_derive_private_and_public_tlc_keys() { assert_eq!(derived_privkey.pubkey(), derived_pubkey); } +#[test] +fn test_pending_tlcs() { + let mut tlc_state = TlcState::default(); + let add_tlc1 = AddTlcInfo { + amount: 10000, + channel_id: gen_sha256_hash(), + payment_hash: gen_sha256_hash(), + expiry: now_timestamp_as_millis_u64() + 1000, + hash_algorithm: HashAlgorithm::Sha256, + onion_packet: None, + tlc_id: TLCId::Offered(0), + created_at: CommitmentNumbers::default(), + removed_at: None, + payment_preimage: None, + previous_tlc: None, + }; + let add_tlc2 = AddTlcInfo { + amount: 20000, + channel_id: gen_sha256_hash(), + payment_hash: gen_sha256_hash(), + expiry: now_timestamp_as_millis_u64() + 2000, + hash_algorithm: HashAlgorithm::Sha256, + onion_packet: None, + tlc_id: TLCId::Offered(1), + created_at: CommitmentNumbers::default(), + removed_at: None, + payment_preimage: None, + previous_tlc: None, + }; + tlc_state.add_local_tlc(TlcKind::AddTlc(add_tlc1.clone())); + tlc_state.add_local_tlc(TlcKind::AddTlc(add_tlc2.clone())); + + let mut tlc_state_2 = TlcState::default(); + tlc_state_2.add_remote_tlc(TlcKind::AddTlc(add_tlc1.clone())); + tlc_state_2.add_remote_tlc(TlcKind::AddTlc(add_tlc2.clone())); + + let tx1 = tlc_state.get_tlcs_for_local(); + let tx2 = tlc_state_2.get_tlcs_for_remote(); + + assert_eq!(tx1, tx2); + + let tlcs = tlc_state.commit_local_tlcs(); + assert_eq!(tlcs.len(), 2); + + let tlcs2 = tlc_state_2.commit_remote_tlcs(); + assert_eq!(tlcs2.len(), 2); + + assert_eq!(tx1, tx2); + + let tlcs = tlc_state.commit_local_tlcs(); + assert_eq!(tlcs.len(), 0); + + let tlcs2 = tlc_state_2.commit_remote_tlcs(); + assert_eq!(tlcs2.len(), 0); + + tlc_state_2.add_local_tlc(TlcKind::AddTlc(add_tlc1.clone())); + tlc_state_2.add_local_tlc(TlcKind::AddTlc(add_tlc2.clone())); + + tlc_state.add_remote_tlc(TlcKind::AddTlc(add_tlc1.clone())); + tlc_state.add_remote_tlc(TlcKind::AddTlc(add_tlc2.clone())); + + let tx1 = tlc_state.get_tlcs_for_remote(); + let tx2 = tlc_state_2.get_tlcs_for_local(); + + assert_eq!(tx1, tx2); + + let tlcs = tlc_state.commit_remote_tlcs(); + assert_eq!(tlcs.len(), 2); + let tlcs2 = tlc_state_2.commit_local_tlcs(); + assert_eq!(tlcs2.len(), 2); + + assert_eq!(tx1, tx2); + + let tlcs = tlc_state.commit_remote_tlcs(); + assert_eq!(tlcs.len(), 0); + let tlcs2 = tlc_state_2.commit_local_tlcs(); + assert_eq!(tlcs2.len(), 0); +} + +#[test] +fn test_pending_tlcs_duplicated_tlcs() { + let mut tlc_state = TlcState::default(); + let add_tlc1 = AddTlcInfo { + amount: 10000, + channel_id: gen_sha256_hash(), + payment_hash: gen_sha256_hash(), + expiry: now_timestamp_as_millis_u64() + 1000, + hash_algorithm: HashAlgorithm::Sha256, + onion_packet: None, + tlc_id: TLCId::Offered(0), + created_at: CommitmentNumbers::default(), + removed_at: None, + payment_preimage: None, + previous_tlc: None, + }; + tlc_state.add_local_tlc(TlcKind::AddTlc(add_tlc1.clone())); + + let mut tlc_state_2 = TlcState::default(); + tlc_state_2.add_remote_tlc(TlcKind::AddTlc(add_tlc1.clone())); + + let tx1 = tlc_state.get_tlcs_for_local(); + let tx2 = tlc_state_2.get_tlcs_for_remote(); + + assert_eq!(tx1, tx2); + + let tlcs = tlc_state.commit_local_tlcs(); + assert_eq!(tlcs.len(), 1); + + let tlcs2 = tlc_state_2.commit_remote_tlcs(); + assert_eq!(tlcs2.len(), 1); + + assert_eq!(tx1, tx2); + + let tlcs = tlc_state.commit_local_tlcs(); + assert_eq!(tlcs.len(), 0); + + let tlcs2 = tlc_state_2.commit_remote_tlcs(); + assert_eq!(tlcs2.len(), 0); + + let add_tlc2 = AddTlcInfo { + amount: 20000, + channel_id: gen_sha256_hash(), + payment_hash: gen_sha256_hash(), + expiry: now_timestamp_as_millis_u64() + 2000, + hash_algorithm: HashAlgorithm::Sha256, + onion_packet: None, + tlc_id: TLCId::Offered(1), + created_at: CommitmentNumbers::default(), + removed_at: None, + payment_preimage: None, + previous_tlc: None, + }; + + tlc_state_2.add_local_tlc(TlcKind::AddTlc(add_tlc2.clone())); + tlc_state.add_remote_tlc(TlcKind::AddTlc(add_tlc2.clone())); + + let tx1 = tlc_state.get_tlcs_for_remote(); + let tx2 = tlc_state_2.get_tlcs_for_local(); + + assert_eq!(tx1, tx2); + + let tlcs = tlc_state.commit_remote_tlcs(); + assert_eq!(tlcs.len(), 1); + let tlcs2 = tlc_state_2.commit_local_tlcs(); + assert_eq!(tlcs2.len(), 1); + assert_eq!(tx1, tx2); + + let tlcs = tlc_state.commit_remote_tlcs(); + assert_eq!(tlcs.len(), 0); + let tlcs2 = tlc_state_2.commit_local_tlcs(); + assert_eq!(tlcs2.len(), 0); + + let committed_tlcs1 = tlc_state.all_commited_tlcs().collect::>(); + let committed_tlcs2 = tlc_state_2.all_commited_tlcs().collect::>(); + assert_eq!(committed_tlcs1, committed_tlcs2); +} + +#[test] +fn test_pending_tlcs_with_remove_tlc() { + let mut tlc_state = TlcState::default(); + let add_tlc1 = AddTlcInfo { + amount: 10000, + channel_id: gen_sha256_hash(), + payment_hash: gen_sha256_hash(), + expiry: now_timestamp_as_millis_u64() + 1000, + hash_algorithm: HashAlgorithm::Sha256, + onion_packet: None, + tlc_id: TLCId::Offered(0), + created_at: CommitmentNumbers::default(), + removed_at: None, + payment_preimage: None, + previous_tlc: None, + }; + let add_tlc2 = AddTlcInfo { + amount: 20000, + channel_id: gen_sha256_hash(), + payment_hash: gen_sha256_hash(), + expiry: now_timestamp_as_millis_u64() + 2000, + hash_algorithm: HashAlgorithm::Sha256, + onion_packet: None, + tlc_id: TLCId::Offered(1), + created_at: CommitmentNumbers::default(), + removed_at: None, + payment_preimage: None, + previous_tlc: None, + }; + let remote_tlc = RemoveTlcInfo { + channel_id: gen_sha256_hash(), + tlc_id: TLCId::Offered(0), + reason: RemoveTlcReason::RemoveTlcFulfill(RemoveTlcFulfill { + payment_preimage: gen_sha256_hash(), + }), + }; + + tlc_state.add_local_tlc(TlcKind::AddTlc(add_tlc1.clone())); + tlc_state.add_local_tlc(TlcKind::AddTlc(add_tlc2.clone())); + tlc_state.add_local_tlc(TlcKind::RemoveTlc(remote_tlc.clone())); + + let tx1 = tlc_state.get_tlcs_for_local(); + assert_eq!(tx1.len(), 1); + let first = &tx1[0]; + assert_eq!(first.tlc_id(), TLCId::Offered(1)); + + let tx1 = tlc_state.commit_local_tlcs(); + assert_eq!(tx1.len(), 3); + + let all_tlcs: Vec<&AddTlcInfo> = tlc_state.all_commited_tlcs().collect(); + assert_eq!(all_tlcs.len(), 2); +} + #[tokio::test] async fn test_open_channel_to_peer() { let [node_a, mut node_b] = NetworkNode::new_n_interconnected_nodes().await; @@ -234,9 +450,11 @@ async fn test_public_channel_saved_to_the_other_nodes_graph() { let (_channel_id, funding_tx) = establish_channel_between_nodes( &mut node1, &mut node2, + true, node1_funding_amount, node2_funding_amount, - true, + None, + None, ) .await; let status = node3.submit_tx(funding_tx).await; @@ -274,9 +492,11 @@ async fn test_public_channel_with_unconfirmed_funding_tx() { let (_channel_id, _funding_tx) = establish_channel_between_nodes( &mut node1, &mut node2, + true, node1_funding_amount, node2_funding_amount, - true, + None, + None, ) .await; @@ -306,7 +526,7 @@ async fn test_network_send_payment_normal_keysend_workflow() { create_nodes_with_established_channel(node_a_funding_amount, node_b_funding_amount, true) .await; // Wait for the channel announcement to be broadcasted - tokio::time::sleep(tokio::time::Duration::from_millis(5000)).await; + tokio::time::sleep(tokio::time::Duration::from_millis(3000)).await; let node_b_pubkey = node_b.pubkey.clone(); let message = |rpc_reply| -> NetworkActorMessage { @@ -346,6 +566,7 @@ async fn test_network_send_payment_normal_keysend_workflow() { .unwrap(); tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; + assert_eq!(res.status, PaymentSessionStatus::Success); assert_eq!(res.failed_error, None); @@ -375,6 +596,324 @@ async fn test_network_send_payment_normal_keysend_workflow() { .unwrap(); // this is the payment_hash generated by keysend assert_eq!(res.status, PaymentSessionStatus::Inflight); + let payment_hash = res.payment_hash; + + tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await; + let message = |rpc_reply| -> NetworkActorMessage { + NetworkActorMessage::Command(NetworkActorCommand::GetPayment(payment_hash, rpc_reply)) + }; + let res = call!(node_a.network_actor, message) + .expect("node_a alive") + .unwrap(); + + assert_eq!(res.status, PaymentSessionStatus::Success); + assert_eq!(res.failed_error, None); +} + +#[tokio::test] +async fn test_network_send_payment_send_each_other() { + init_tracing(); + + let _span = tracing::info_span!("node", node = "test").entered(); + let node_a_funding_amount = 100000000000; + let node_b_funding_amount = 6200000000; + + let (node_a, node_b, _new_channel_id) = + create_nodes_with_established_channel(node_a_funding_amount, node_b_funding_amount, true) + .await; + // Wait for the channel announcement to be broadcasted + tokio::time::sleep(tokio::time::Duration::from_millis(3000)).await; + + let node_a_pubkey = node_a.pubkey.clone(); + let node_b_pubkey = node_b.pubkey.clone(); + let message = |rpc_reply| -> NetworkActorMessage { + NetworkActorMessage::Command(NetworkActorCommand::SendPayment( + SendPaymentCommand { + target_pubkey: Some(node_b_pubkey), + amount: Some(10000), + payment_hash: None, + final_tlc_expiry_delta: None, + tlc_expiry_limit: None, + invoice: None, + timeout: None, + max_fee_amount: None, + max_parts: None, + keysend: Some(true), + udt_type_script: None, + allow_self_payment: false, + dry_run: false, + }, + rpc_reply, + )) + }; + let res1 = call!(node_a.network_actor, message) + .expect("node_a alive") + .unwrap(); + // this is the payment_hash generated by keysend + assert_eq!(res1.status, PaymentSessionStatus::Inflight); + let payment_hash1 = res1.payment_hash; + + // the second payment is send from node_b to node_a + let message = |rpc_reply| -> NetworkActorMessage { + NetworkActorMessage::Command(NetworkActorCommand::SendPayment( + SendPaymentCommand { + target_pubkey: Some(node_a_pubkey), + amount: Some(10000), + payment_hash: None, + final_tlc_expiry_delta: None, + invoice: None, + timeout: None, + tlc_expiry_limit: None, + max_fee_amount: None, + max_parts: None, + keysend: Some(true), + udt_type_script: None, + allow_self_payment: false, + dry_run: false, + }, + rpc_reply, + )) + }; + let res2 = call!(node_b.network_actor, message) + .expect("node_a alive") + .unwrap(); + // this is the payment_hash generated by keysend + assert_eq!(res2.status, PaymentSessionStatus::Inflight); + let payment_hash2 = res2.payment_hash; + + // sleep for 2 seconds to make sure the payment is processed + tokio::time::sleep(tokio::time::Duration::from_millis(6000)).await; + + let message = |rpc_reply| -> NetworkActorMessage { + NetworkActorMessage::Command(NetworkActorCommand::GetPayment(payment_hash1, rpc_reply)) + }; + let res = call!(node_a.network_actor, message) + .expect("node_a alive") + .unwrap(); + assert_eq!(res.status, PaymentSessionStatus::Success); + assert_eq!(res.failed_error, None); + + let message = |rpc_reply| -> NetworkActorMessage { + NetworkActorMessage::Command(NetworkActorCommand::GetPayment(payment_hash2, rpc_reply)) + }; + let res = call!(node_b.network_actor, message) + .expect("node_a alive") + .unwrap(); + + assert_eq!(res.status, PaymentSessionStatus::Success); + assert_eq!(res.failed_error, None); +} + +#[tokio::test] +async fn test_network_send_payment_send_with_ack() { + init_tracing(); + + let _span = tracing::info_span!("node", node = "test").entered(); + let node_a_funding_amount = 100000000000; + let node_b_funding_amount = 6200000000; + + let (node_a, node_b, _new_channel_id) = + create_nodes_with_established_channel(node_a_funding_amount, node_b_funding_amount, true) + .await; + // Wait for the channel announcement to be broadcasted + tokio::time::sleep(tokio::time::Duration::from_millis(3000)).await; + + let node_b_pubkey = node_b.pubkey.clone(); + let message = |rpc_reply| -> NetworkActorMessage { + NetworkActorMessage::Command(NetworkActorCommand::SendPayment( + SendPaymentCommand { + target_pubkey: Some(node_b_pubkey), + amount: Some(10000), + payment_hash: None, + final_tlc_expiry_delta: None, + tlc_expiry_limit: None, + invoice: None, + timeout: None, + max_fee_amount: None, + max_parts: None, + keysend: Some(true), + udt_type_script: None, + allow_self_payment: false, + dry_run: false, + }, + rpc_reply, + )) + }; + let res1 = call!(node_a.network_actor, message) + .expect("node_a alive") + .unwrap(); + // this is the payment_hash generated by keysend + assert_eq!(res1.status, PaymentSessionStatus::Inflight); + let payment_hash1 = res1.payment_hash; + + // DON'T WAIT FOR A MOMENT, so the second payment will meet WaitingTlcAck first + // but payment session will handle this case + + // we can make the same payment again, since payment_hash will be generated randomly + let message = |rpc_reply| -> NetworkActorMessage { + NetworkActorMessage::Command(NetworkActorCommand::SendPayment( + SendPaymentCommand { + target_pubkey: Some(node_b_pubkey), + amount: Some(10000), + payment_hash: None, + final_tlc_expiry_delta: None, + invoice: None, + timeout: None, + tlc_expiry_limit: None, + max_fee_amount: None, + max_parts: None, + keysend: Some(true), + udt_type_script: None, + allow_self_payment: false, + dry_run: false, + }, + rpc_reply, + )) + }; + let res2 = call!(node_a.network_actor, message).expect("node_a alive"); + // the second send_payment will be blocked by WaitingTlcAck, since we didn't wait for a moment + assert!(res2.is_ok()); + let payment_hash2 = res2.unwrap().payment_hash; + + tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await; + let message = |rpc_reply| -> NetworkActorMessage { + NetworkActorMessage::Command(NetworkActorCommand::GetPayment(payment_hash1, rpc_reply)) + }; + let res = call!(node_a.network_actor, message) + .expect("node_a alive") + .unwrap(); + + assert_eq!(res.status, PaymentSessionStatus::Success); + assert_eq!(res.failed_error, None); + + let message = |rpc_reply| -> NetworkActorMessage { + NetworkActorMessage::Command(NetworkActorCommand::GetPayment(payment_hash2, rpc_reply)) + }; + let res = call!(node_a.network_actor, message) + .expect("node_a alive") + .unwrap(); + + assert_eq!(res.status, PaymentSessionStatus::Success); + assert_eq!(res.failed_error, None); +} + +#[tokio::test] +async fn test_network_send_previous_tlc_error() { + init_tracing(); + + let _span = tracing::info_span!("node", node = "test").entered(); + let node_a_funding_amount = 100000000000; + let node_b_funding_amount = 6200000000; + + let (node_a, mut node_b, new_channel_id) = + create_nodes_with_established_channel(node_a_funding_amount, node_b_funding_amount, true) + .await; + // Wait for the channel announcement to be broadcasted + tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await; + + let secp = Secp256k1::new(); + let keys: Vec = std::iter::repeat_with(|| generate_seckey().into()) + .take(1) + .collect(); + let payment_hash = [1; 32].into(); + let hops_infos = vec![ + PaymentHopData { + payment_hash, + amount: 2, + expiry: 3, + next_hop: Some(keys[0].pubkey().into()), + channel_outpoint: Some(OutPointBuilder::default().build().into()), + hash_algorithm: HashAlgorithm::Sha256, + payment_preimage: None, + }, + PaymentHopData { + payment_hash, + amount: 8, + expiry: 9, + next_hop: None, + channel_outpoint: Some(OutPointBuilder::default().build().into()), + hash_algorithm: HashAlgorithm::Sha256, + payment_preimage: None, + }, + ]; + let packet = PeeledOnionPacket::create(generate_seckey().into(), hops_infos.clone(), &secp) + .expect("create peeled packet"); + + // step1: try to send a invalid onion_packet with add_tlc + // ================================================================================== + let generated_payment_hash = gen_sha256_hash(); + let message = |rpc_reply| -> NetworkActorMessage { + NetworkActorMessage::Command(NetworkActorCommand::ControlFiberChannel( + ChannelCommandWithId { + channel_id: new_channel_id, + command: ChannelCommand::AddTlc( + AddTlcCommand { + amount: 10000, + payment_hash: generated_payment_hash, + expiry: DEFAULT_EXPIRY_DELTA + now_timestamp_as_millis_u64(), + hash_algorithm: HashAlgorithm::Sha256, + // invalid onion packet + onion_packet: packet.next.clone(), + previous_tlc: None, + }, + rpc_reply, + ), + }, + )) + }; + + let res = call!(node_a.network_actor, message).expect("node_a alive"); + assert!(res.is_ok()); + let node_b_peer_id = node_b.peer_id.clone(); + node_b + .expect_event(|event| match event { + NetworkServiceEvent::AddTlcFailed(peer_id, payment_hash, err) => { + assert_eq!(peer_id, &node_b_peer_id); + assert_eq!(payment_hash, &generated_payment_hash); + assert_eq!(err.error_code, TlcErrorCode::InvalidOnionPayload); + true + } + _ => false, + }) + .await; + // sleep 2 seconds to make sure node_b processed handle_add_tlc_peer_message + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + + // XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX + + // step2: try to send the second valid payment, expect it to success + let message = |rpc_reply| -> NetworkActorMessage { + NetworkActorMessage::Command(NetworkActorCommand::SendPayment( + SendPaymentCommand { + target_pubkey: Some(node_b.pubkey), + amount: Some(10000), + payment_hash: None, + final_tlc_expiry_delta: None, + invoice: None, + timeout: None, + max_fee_amount: None, + max_parts: None, + keysend: Some(true), + udt_type_script: None, + allow_self_payment: false, + dry_run: false, + tlc_expiry_limit: None, + }, + rpc_reply, + )) + }; + + let res = call!(node_a.network_actor, message).expect("node_a alive"); + assert!(res.is_ok()); + let payment_hash = res.unwrap().payment_hash; + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + + let message = |rpc_reply| -> NetworkActorMessage { + NetworkActorMessage::Command(NetworkActorCommand::GetPayment(payment_hash, rpc_reply)) + }; + let res = call!(node_a.network_actor, message).expect("node_a alive"); + assert!(res.is_ok()); + assert_eq!(res.unwrap().status, PaymentSessionStatus::Success); } #[tokio::test] @@ -639,19 +1178,15 @@ async fn test_network_send_payment_with_dry_run() { #[tokio::test] async fn test_send_payment_with_3_nodes() { init_tracing(); - let _span = tracing::info_span!("node", node = "test").entered(); - let (node_a, _node_b, node_c, _, _) = create_3_nodes_with_established_channel( (100000000000, 100000000000), (100000000000, 100000000000), true, ) .await; - // sleep for 2 seconds to make sure the channel is established tokio::time::sleep(tokio::time::Duration::from_millis(5000)).await; - let node_c_pubkey = node_c.pubkey.clone(); let message = |rpc_reply| -> NetworkActorMessage { NetworkActorMessage::Command(NetworkActorCommand::SendPayment( @@ -678,7 +1213,6 @@ async fn test_send_payment_with_3_nodes() { let res = res.unwrap(); assert_eq!(res.status, PaymentSessionStatus::Inflight); assert!(res.fee > 0); - // sleep for 2 seconds to make sure the payment is sent tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; let message = |rpc_reply| -> NetworkActorMessage { @@ -694,7 +1228,6 @@ async fn test_send_payment_with_3_nodes() { #[tokio::test] async fn test_send_payment_fail_with_3_nodes_invalid_hash() { init_tracing(); - let _span = tracing::info_span!("node", node = "test").entered(); let (node_a, _node_b, node_c, _, _) = create_3_nodes_with_established_channel( @@ -703,10 +1236,8 @@ async fn test_send_payment_fail_with_3_nodes_invalid_hash() { true, ) .await; - // sleep for 2 seconds to make sure the channel is established tokio::time::sleep(tokio::time::Duration::from_millis(3000)).await; - let node_c_pubkey = node_c.pubkey.clone(); let message = |rpc_reply| -> NetworkActorMessage { NetworkActorMessage::Command(NetworkActorCommand::SendPayment( @@ -733,7 +1264,6 @@ async fn test_send_payment_fail_with_3_nodes_invalid_hash() { let res = res.unwrap(); assert_eq!(res.status, PaymentSessionStatus::Inflight); assert!(res.fee > 0); - // sleep for 2 seconds to make sure the payment is sent tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await; let message = |rpc_reply| -> NetworkActorMessage { @@ -742,13 +1272,11 @@ async fn test_send_payment_fail_with_3_nodes_invalid_hash() { let res = call!(node_a.network_actor, message) .expect("node_a alive") .unwrap(); - - // FIXME: this should be failed after fixing the HTLC forwarding and removing issues - assert_eq!(res.status, PaymentSessionStatus::Inflight); - // assert_eq!( - // res.failed_error, - // Some("IncorrectOrUnknownPaymentDetails".to_string()) - // ); + assert_eq!(res.status, PaymentSessionStatus::Failed); + assert_eq!( + res.failed_error, + Some("IncorrectOrUnknownPaymentDetails".to_string()) + ); } #[tokio::test] @@ -813,7 +1341,6 @@ async fn test_network_send_payment_dry_run_can_still_query() { )) }; let res = call!(node_a.network_actor, message).expect("node_a alive"); - eprintln!("{:?}", res); assert!(res.is_ok()); } @@ -879,7 +1406,6 @@ async fn test_network_send_payment_dry_run_will_not_create_payment_session() { )) }; let res = call!(node_a.network_actor, message).expect("node_a alive"); - eprintln!("{:?}", res); assert!(res.is_ok()); } @@ -1007,7 +1533,7 @@ async fn do_test_channel_commitment_tx_after_add_tlc(algorithm: HashAlgorithm) { hash_algorithm: algorithm, payment_hash: digest.into(), expiry: now_timestamp_as_millis_u64() + DEFAULT_EXPIRY_DELTA, - peeled_onion_packet: None, + onion_packet: None, previous_tlc: None, }, rpc_reply, @@ -1099,9 +1625,11 @@ async fn test_channel_commitment_tx_after_add_tlc_sha256() { async fn establish_channel_between_nodes( node_a: &mut NetworkNode, node_b: &mut NetworkNode, + public: bool, node_a_funding_amount: u128, node_b_funding_amount: u128, - public: bool, + max_tlc_number_in_flight: Option, + max_tlc_value_in_flight: Option, ) -> (Hash256, TransactionView) { let message = |rpc_reply| { NetworkActorMessage::Command(NetworkActorCommand::OpenChannel( @@ -1118,8 +1646,8 @@ async fn establish_channel_between_nodes( tlc_min_value: None, tlc_max_value: None, tlc_fee_proportional_millionths: None, - max_tlc_number_in_flight: None, - max_tlc_value_in_flight: None, + max_tlc_number_in_flight, + max_tlc_value_in_flight, }, rpc_reply, )) @@ -1201,9 +1729,11 @@ async fn create_nodes_with_established_channel( let (channel_id, _funding_tx) = establish_channel_between_nodes( &mut node_a, &mut node_b, + public, node_a_funding_amount, node_b_funding_amount, - public, + None, + None, ) .await; @@ -1220,9 +1750,11 @@ async fn create_3_nodes_with_established_channel( let (channel_id_ab, funding_tx_ab) = establish_channel_between_nodes( &mut node_a, &mut node_b, + public, channel_1_amount_a, channel_1_amount_b, - public, + None, + None, ) .await; @@ -1232,9 +1764,11 @@ async fn create_3_nodes_with_established_channel( let (channel_id_bc, funding_tx_bc) = establish_channel_between_nodes( &mut node_b, &mut node_c, + public, channel_2_amount_b, channel_2_amount_c, - public, + None, + None, ) .await; @@ -1268,7 +1802,7 @@ async fn do_test_remove_tlc_with_wrong_hash_algorithm( hash_algorithm: correct_algorithm, payment_hash: digest.into(), expiry: now_timestamp_as_millis_u64() + DEFAULT_EXPIRY_DELTA, - peeled_onion_packet: None, + onion_packet: None, previous_tlc: None, }, rpc_reply, @@ -1306,6 +1840,9 @@ async fn do_test_remove_tlc_with_wrong_hash_algorithm( dbg!("Sleeping for some time to wait for the RemoveTlc processed by both party"); tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + let preimage = [2; 32]; + // create a new payment hash + let digest = correct_algorithm.hash(&preimage); let add_tlc_result = call!(node_a.network_actor, |rpc_reply| { NetworkActorMessage::Command(NetworkActorCommand::ControlFiberChannel( ChannelCommandWithId { @@ -1316,7 +1853,7 @@ async fn do_test_remove_tlc_with_wrong_hash_algorithm( hash_algorithm: wrong_algorithm, payment_hash: digest.into(), expiry: now_timestamp_as_millis_u64() + DEFAULT_EXPIRY_DELTA, - peeled_onion_packet: None, + onion_packet: None, previous_tlc: None, }, rpc_reply, @@ -1373,7 +1910,7 @@ async fn do_test_remove_tlc_with_expiry_error() { hash_algorithm: HashAlgorithm::CkbHash, payment_hash: digest.into(), expiry: now_timestamp_as_millis_u64() + 10, - peeled_onion_packet: None, + onion_packet: None, previous_tlc: None, }; @@ -1395,7 +1932,7 @@ async fn do_test_remove_tlc_with_expiry_error() { hash_algorithm: HashAlgorithm::CkbHash, payment_hash: digest.into(), expiry: now_timestamp_as_millis_u64() + MAX_PAYMENT_TLC_EXPIRY_LIMIT + 10, - peeled_onion_packet: None, + onion_packet: None, previous_tlc: None, }; @@ -1412,6 +1949,193 @@ async fn do_test_remove_tlc_with_expiry_error() { assert!(add_tlc_result.is_err()); } +#[tokio::test] +async fn do_test_add_tlc_duplicated() { + let node_a_funding_amount = 100000000000; + let node_b_funding_amount = 6200000000; + + let (node_a, _node_b, new_channel_id) = + create_nodes_with_established_channel(node_a_funding_amount, node_b_funding_amount, false) + .await; + + let preimage = [1; 32]; + let digest = HashAlgorithm::CkbHash.hash(&preimage); + let tlc_amount = 1000000000; + + for i in 1..=2 { + std::thread::sleep(std::time::Duration::from_millis(400)); + // add tlc command with expiry soon + let add_tlc_command = AddTlcCommand { + amount: tlc_amount, + hash_algorithm: HashAlgorithm::CkbHash, + payment_hash: digest.into(), + expiry: now_timestamp_as_millis_u64() + 10, + onion_packet: None, + previous_tlc: None, + }; + let add_tlc_result = call!(node_a.network_actor, |rpc_reply| { + NetworkActorMessage::Command(NetworkActorCommand::ControlFiberChannel( + ChannelCommandWithId { + channel_id: new_channel_id, + command: ChannelCommand::AddTlc(add_tlc_command, rpc_reply), + }, + )) + }) + .expect("node_b alive"); + if i == 1 { + assert!(add_tlc_result.is_ok()); + } + if i == 2 { + assert!(add_tlc_result.is_err()); + } + } +} + +#[tokio::test] +async fn do_test_add_tlc_waiting_ack() { + let node_a_funding_amount = 100000000000; + let node_b_funding_amount = 6200000000; + + let (node_a, _node_b, new_channel_id) = + create_nodes_with_established_channel(node_a_funding_amount, node_b_funding_amount, false) + .await; + + let tlc_amount = 1000000000; + + for i in 1..=2 { + std::thread::sleep(std::time::Duration::from_millis(400)); + let add_tlc_command = AddTlcCommand { + amount: tlc_amount, + hash_algorithm: HashAlgorithm::CkbHash, + payment_hash: gen_sha256_hash().into(), + expiry: now_timestamp_as_millis_u64() + 100000000, + onion_packet: None, + previous_tlc: None, + }; + let add_tlc_result = call!(node_a.network_actor, |rpc_reply| { + NetworkActorMessage::Command(NetworkActorCommand::ControlFiberChannel( + ChannelCommandWithId { + channel_id: new_channel_id, + command: ChannelCommand::AddTlc(add_tlc_command, rpc_reply), + }, + )) + }) + .expect("node_b alive"); + if i == 2 { + // we are sending AddTlc constantly, so we should get a TemporaryChannelFailure + assert!(add_tlc_result.is_err()); + let code = add_tlc_result.unwrap_err().decode().unwrap(); + assert_eq!(code.error_code, TlcErrorCode::TemporaryChannelFailure); + } else { + assert!(add_tlc_result.is_ok()); + } + } +} + +#[tokio::test] +async fn do_test_add_tlc_number_limit() { + let node_a_funding_amount = 100000000000; + let node_b_funding_amount = 6200000000; + + let [mut node_a, mut node_b] = NetworkNode::new_n_interconnected_nodes().await; + + let max_tlc_number = 3; + let (new_channel_id, _funding_tx) = establish_channel_between_nodes( + &mut node_a, + &mut node_b, + true, + node_a_funding_amount, + node_b_funding_amount, + Some(3), + None, + ) + .await; + + let tlc_amount = 1000000000; + + for i in 1..=max_tlc_number + 1 { + std::thread::sleep(std::time::Duration::from_millis(400)); + let add_tlc_command = AddTlcCommand { + amount: tlc_amount, + hash_algorithm: HashAlgorithm::CkbHash, + payment_hash: gen_sha256_hash().into(), + expiry: now_timestamp_as_millis_u64() + 100000000, + onion_packet: None, + previous_tlc: None, + }; + let add_tlc_result = call!(node_a.network_actor, |rpc_reply| { + NetworkActorMessage::Command(NetworkActorCommand::ControlFiberChannel( + ChannelCommandWithId { + channel_id: new_channel_id, + command: ChannelCommand::AddTlc(add_tlc_command, rpc_reply), + }, + )) + }) + .expect("node_b alive"); + tokio::time::sleep(tokio::time::Duration::from_millis(300)).await; + if i == max_tlc_number + 1 { + assert!(add_tlc_result.is_err()); + let code = add_tlc_result.unwrap_err().decode().unwrap(); + assert_eq!(code.error_code, TlcErrorCode::TemporaryChannelFailure); + } else { + dbg!(&add_tlc_result); + assert!(add_tlc_result.is_ok()); + } + } +} + +#[tokio::test] +async fn do_test_add_tlc_value_limit() { + let node_a_funding_amount = 100000000000; + let node_b_funding_amount = 6200000000; + + let [mut node_a, mut node_b] = NetworkNode::new_n_interconnected_nodes().await; + + let max_tlc_number = 3; + let (new_channel_id, _funding_tx) = establish_channel_between_nodes( + &mut node_a, + &mut node_b, + true, + node_a_funding_amount, + node_b_funding_amount, + None, + Some(3000000000), + ) + .await; + + let tlc_amount = 1000000000; + + for i in 1..=max_tlc_number + 1 { + std::thread::sleep(std::time::Duration::from_millis(400)); + let add_tlc_command = AddTlcCommand { + amount: tlc_amount, + hash_algorithm: HashAlgorithm::CkbHash, + payment_hash: gen_sha256_hash().into(), + expiry: now_timestamp_as_millis_u64() + 100000000, + onion_packet: None, + previous_tlc: None, + }; + let add_tlc_result = call!(node_a.network_actor, |rpc_reply| { + NetworkActorMessage::Command(NetworkActorCommand::ControlFiberChannel( + ChannelCommandWithId { + channel_id: new_channel_id, + command: ChannelCommand::AddTlc(add_tlc_command, rpc_reply), + }, + )) + }) + .expect("node_b alive"); + // sleep for a while to make sure the AddTlc processed by both party + tokio::time::sleep(tokio::time::Duration::from_millis(300)).await; + if i == max_tlc_number + 1 { + assert!(add_tlc_result.is_err()); + let code = add_tlc_result.unwrap_err().decode().unwrap(); + assert_eq!(code.error_code, TlcErrorCode::TemporaryChannelFailure); + } else { + assert!(add_tlc_result.is_ok()); + } + } +} + #[tokio::test] async fn test_remove_tlc_with_wrong_hash_algorithm() { let supported_algorithms = HashAlgorithm::supported_algorithms(); @@ -1447,7 +2171,7 @@ async fn do_test_channel_with_simple_update_operation(algorithm: HashAlgorithm) hash_algorithm: algorithm, payment_hash: digest.into(), expiry: now_timestamp_as_millis_u64() + DEFAULT_EXPIRY_DELTA, - peeled_onion_packet: None, + onion_packet: None, previous_tlc: None, }, rpc_reply, diff --git a/src/fiber/tests/graph.rs b/src/fiber/tests/graph.rs index 679c37ad..22a72f02 100644 --- a/src/fiber/tests/graph.rs +++ b/src/fiber/tests/graph.rs @@ -159,7 +159,6 @@ impl MockNetworkGraph { tlc_minimum_value: min_htlc_value.unwrap_or(0), channel_outpoint: channel_outpoint.clone(), }; - eprintln!("add rev channel_update: {:?}", channel_update); self.graph.process_channel_update(channel_update).unwrap(); } } @@ -580,7 +579,6 @@ fn test_graph_build_router_fee_rate_optimize() { allow_self_payment: false, dry_run: false, }); - eprintln!("route: {:?}", route); assert!(route.is_ok()); let route = route.unwrap(); let amounts = route.iter().map(|x| x.amount).collect::>(); diff --git a/src/fiber/tests/network.rs b/src/fiber/tests/network.rs index 970c80fc..a1266ac5 100644 --- a/src/fiber/tests/network.rs +++ b/src/fiber/tests/network.rs @@ -673,7 +673,6 @@ fn test_announcement_message_serialize() { channel_announcement.udt_type_script = Some(ScriptBuilder::default().build()); - eprintln!("channel_announcement: {:#?}", channel_announcement); let serialized = bincode::serialize(&channel_announcement).unwrap(); let deserialized: ChannelAnnouncement = bincode::deserialize(&serialized).unwrap(); assert_eq!(channel_announcement, deserialized); @@ -943,7 +942,6 @@ fn test_send_payment_validate_htlc_expiry_delta() { }; let result = SendPaymentData::new(send_command); - eprintln!("{:?}", result); assert!(result.is_err()); assert!(result .unwrap_err() diff --git a/src/fiber/types.rs b/src/fiber/types.rs index 03f8aa0b..dd088073 100644 --- a/src/fiber/types.rs +++ b/src/fiber/types.rs @@ -1030,7 +1030,7 @@ impl TryFrom for ClosingSigned { } } -#[derive(Debug, Clone, Eq, PartialEq)] +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] pub struct AddTlc { pub channel_id: Hash256, pub tlc_id: u64, @@ -1240,6 +1240,7 @@ pub struct TlcErrPacket { impl TlcErrPacket { pub fn new(tlc_fail: TlcErr) -> Self { + dbg!(&tlc_fail); TlcErrPacket { onion_packet: tlc_fail.serialize(), } @@ -1389,7 +1390,7 @@ impl TryFrom for RemoveTlcReason { } } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct RemoveTlc { pub channel_id: Hash256, pub tlc_id: u64, diff --git a/src/rpc/channel.rs b/src/rpc/channel.rs index 816fb430..a260eccc 100644 --- a/src/rpc/channel.rs +++ b/src/rpc/channel.rs @@ -575,7 +575,7 @@ where payment_hash: params.payment_hash, expiry: params.expiry, hash_algorithm: params.hash_algorithm.unwrap_or_default(), - peeled_onion_packet: None, + onion_packet: None, previous_tlc: None, }, rpc_reply, diff --git a/src/store/tests/store.rs b/src/store/tests/store.rs index 6227d4ac..cb2d2e22 100644 --- a/src/store/tests/store.rs +++ b/src/store/tests/store.rs @@ -285,8 +285,7 @@ fn test_channel_actor_state_store() { commitment_delay_epoch: 100, funding_fee_rate: 100, id: gen_sha256_hash(), - tlc_ids: Default::default(), - tlcs: Default::default(), + tlc_state: Default::default(), local_shutdown_script: Script::default(), local_channel_public_keys: ChannelBasePublicKeys { funding_pubkey: generate_pubkey().into(), @@ -299,8 +298,8 @@ fn test_channel_actor_state_store() { }), commitment_numbers: Default::default(), remote_shutdown_script: Some(Script::default()), - previous_remote_nonce: Some(pub_nonce.clone()), - remote_nonce: Some(pub_nonce.clone()), + last_used_nonce_in_commitment_signed: None, + remote_nonces: vec![pub_nonce.clone()], remote_commitment_points: vec![generate_pubkey().into(), generate_pubkey().into()], local_shutdown_info: None, remote_shutdown_info: None, diff --git a/tests/bruno/e2e/3-nodes-transfer/14-node3-remove-tlc.bru b/tests/bruno/e2e/3-nodes-transfer/14-node3-remove-tlc.bru index b8225549..2aedc3ad 100644 --- a/tests/bruno/e2e/3-nodes-transfer/14-node3-remove-tlc.bru +++ b/tests/bruno/e2e/3-nodes-transfer/14-node3-remove-tlc.bru @@ -46,7 +46,7 @@ script:post-response { // Sleep for sometime to make sure current operation finishes before next request starts. await new Promise(r => setTimeout(r, 100)); console.log("remove tlc result: ", res.body); - if (!(res.body.error.message.includes("Illegally removing the same tlc"))) { + if (!(res.body.error.message.includes("TLC is already removed"))) { throw new Error("Assertion failed: error message is not right"); } diff --git a/tests/bruno/e2e/3-nodes-transfer/19-node3-remove-tlc.bru b/tests/bruno/e2e/3-nodes-transfer/19-node3-remove-tlc.bru index 62ef0b7c..2f186a0e 100644 --- a/tests/bruno/e2e/3-nodes-transfer/19-node3-remove-tlc.bru +++ b/tests/bruno/e2e/3-nodes-transfer/19-node3-remove-tlc.bru @@ -45,7 +45,7 @@ script:pre-request { script:post-response { // Sleep for sometime to make sure current operation finishes before next request starts. await new Promise(r => setTimeout(r, 100)); - if (!(res.body.error.message.includes("Illegally removing the same tlc"))) { + if (!(res.body.error.message.includes("TLC is already removed"))) { throw new Error("Assertion failed: error message is not right"); } } diff --git a/tests/bruno/e2e/router-pay/29-node1-pay-self-with-node2-succ.bru b/tests/bruno/e2e/router-pay/29-node1-pay-self-with-node2-succ.bru index 935715ab..6d698445 100644 --- a/tests/bruno/e2e/router-pay/29-node1-pay-self-with-node2-succ.bru +++ b/tests/bruno/e2e/router-pay/29-node1-pay-self-with-node2-succ.bru @@ -37,6 +37,6 @@ assert { script:post-response { // Sleep for sometime to make sure current operation finishes before next request starts. await new Promise(r => setTimeout(r, 100)); - console.log("31 step result: ", res.body); + console.log("29 step result: ", res.body); // for pay self router A -> B -> A, note A -> B and B -> A will use different channel } diff --git a/tests/bruno/e2e/udt/09-node2-remove-tlc.bru b/tests/bruno/e2e/udt/09-node2-remove-tlc.bru index 798323a2..0ebbea0e 100644 --- a/tests/bruno/e2e/udt/09-node2-remove-tlc.bru +++ b/tests/bruno/e2e/udt/09-node2-remove-tlc.bru @@ -45,7 +45,7 @@ script:pre-request { script:post-response { // Sleep for sometime to make sure current operation finishes before next request starts. await new Promise(r => setTimeout(r, 100)); - if (!(res.body.error.message.includes("Illegally removing the same tlc"))) { + if (!(res.body.error.message.includes("TLC is already removed"))) { throw new Error("Assertion failed: error message is not right"); } } diff --git a/tests/bruno/e2e/udt/14-node2-list-channel-expect-two-channel.bru b/tests/bruno/e2e/udt/14-node2-list-channel-expect-two-channel.bru index b84a84e8..343d82f1 100644 --- a/tests/bruno/e2e/udt/14-node2-list-channel-expect-two-channel.bru +++ b/tests/bruno/e2e/udt/14-node2-list-channel-expect-two-channel.bru @@ -37,7 +37,8 @@ script:post-response { // Sleep for sometime to make sure current operation finishes before next request starts. await new Promise(r => setTimeout(r, 2000)); console.log("accept channel result: ", res.body.result.channels); - if (res.body.result.channels.length != 2) { + const readyChannels = res.body.result.channels.filter(channel => channel.state.state_name === "CHANNEL_READY"); + if (readyChannels.length != 2) { throw new Error("Assertion failed: expect there are 2 channels"); } }