From 874dce5fee8a97ffa3e07dd7db466057fd293763 Mon Sep 17 00:00:00 2001 From: YI Date: Tue, 18 Jun 2024 16:04:22 +0800 Subject: [PATCH 1/8] Add function to obtain only active channels --- src/ckb/channel.rs | 28 ++++++++++++++++++++++++++++ src/ckb/network.rs | 2 +- src/rpc/channel.rs | 2 +- 3 files changed, 30 insertions(+), 2 deletions(-) diff --git a/src/ckb/channel.rs b/src/ckb/channel.rs index a2cc154a..d64466a8 100644 --- a/src/ckb/channel.rs +++ b/src/ckb/channel.rs @@ -1584,6 +1584,12 @@ pub enum ChannelState { Closed, } +impl ChannelState { + fn is_closed(&self) -> bool { + matches!(self, ChannelState::Closed) + } +} + pub fn new_channel_id_from_seed(seed: &[u8]) -> Hash256 { blake2b_256(seed).into() } @@ -1803,6 +1809,10 @@ impl ChannelActorState { .as_micros() as u64 } + pub fn is_closed(&self) -> bool { + self.state.is_closed() + } + fn update_state(&mut self, new_state: ChannelState) { debug!( "Updating channel state from {:?} to {:?}", @@ -3947,7 +3957,25 @@ pub trait ChannelActorStateStore { fn insert_channel_actor_state(&self, state: ChannelActorState); fn delete_channel_actor_state(&self, id: &Hash256); fn get_channel_ids_by_peer(&self, peer_id: &PeerId) -> Vec; + fn get_active_channel_ids_by_peer(&self, peer_id: &PeerId) -> Vec { + self.get_channel_ids_by_peer(peer_id) + .into_iter() + .filter(|id| match self.get_channel_actor_state(&id) { + Some(state) if !state.is_closed() => true, + _ => false, + }) + .collect() + } fn get_channel_states(&self, peer_id: Option) -> Vec<(PeerId, Hash256, ChannelState)>; + fn get_active_channel_states( + &self, + peer_id: Option, + ) -> Vec<(PeerId, Hash256, ChannelState)> { + self.get_channel_states(peer_id) + .into_iter() + .filter(|(_, _, state)| !state.is_closed()) + .collect() + } } /// A wrapper on CommitmentTransaction that has a partial signature along with diff --git a/src/ckb/network.rs b/src/ckb/network.rs index 8b207ec1..8bd16ac9 100644 --- a/src/ckb/network.rs +++ b/src/ckb/network.rs @@ -1011,7 +1011,7 @@ impl NetworkActorState { ) { self.peer_session_map.insert(peer_id.clone(), session.id); - for channel_id in store.get_channel_ids_by_peer(peer_id) { + for channel_id in store.get_active_channel_ids_by_peer(&peer_id) { debug!("Reestablishing channel {:x}", &channel_id); if let Ok((channel, _)) = Actor::spawn_linked( Some(generate_channel_actor_name(&self.peer_id, peer_id)), diff --git a/src/rpc/channel.rs b/src/rpc/channel.rs index f9b2a1dc..63eed9c2 100644 --- a/src/rpc/channel.rs +++ b/src/rpc/channel.rs @@ -245,7 +245,7 @@ where ) -> Result { let mut channels: Vec<_> = self .store - .get_channel_states(params.peer_id) + .get_active_channel_states(params.peer_id) .into_iter() .filter_map(|(peer_id, channel_id, _state)| { self.store From e067f2507f15a1fe204bae4aeb2b0f8b4c8e9006 Mon Sep 17 00:00:00 2001 From: YI Date: Tue, 18 Jun 2024 16:15:11 +0800 Subject: [PATCH 2/8] Add option to keep closed channels --- src/ckb/channel.rs | 13 +++++++++++-- src/ckb/config.rs | 12 ++++++++++++ src/ckb/network.rs | 24 +++++++++++++++++++++--- 3 files changed, 44 insertions(+), 5 deletions(-) diff --git a/src/ckb/channel.rs b/src/ckb/channel.rs index d64466a8..c331c9ee 100644 --- a/src/ckb/channel.rs +++ b/src/ckb/channel.rs @@ -185,14 +185,21 @@ pub struct ChannelActor { peer_id: PeerId, network: ActorRef, store: S, + keep_on_closed: bool, } impl ChannelActor { - pub fn new(peer_id: PeerId, network: ActorRef, store: S) -> Self { + pub fn new( + peer_id: PeerId, + network: ActorRef, + store: S, + keep_on_closed: bool, + ) -> Self { Self { peer_id, network, store, + keep_on_closed, } } @@ -1229,7 +1236,9 @@ where match state.state { ChannelState::Closed => { myself.stop(Some("ChannelClosed".to_string())); - self.store.delete_channel_actor_state(&state.get_id()); + if self.keep_on_closed { + self.store.delete_channel_actor_state(&state.get_id()); + } } _ => { self.store.insert_channel_actor_state(state.clone()); diff --git a/src/ckb/config.rs b/src/ckb/config.rs index 471f38e6..6154546c 100644 --- a/src/ckb/config.rs +++ b/src/ckb/config.rs @@ -83,6 +83,14 @@ pub struct CkbConfig { help = "whether to accept open channel requests with ckb funding amount automatically, unit: shannons [default: 6200000000 shannons], if this is set to zero, it means to disable auto accept" )] pub auto_accept_channel_ckb_funding_amount: Option, + /// whether to keep closed channels from store [default: false] + #[arg( + name = "CKB_KEEP_CLOSED_CHANNELS", + long = "ckb-keep-closed-channels", + env, + help = "whether to keep closed channels from store [default: false]" + )] + pub keep_closed_channels: Option, } impl CkbConfig { @@ -120,6 +128,10 @@ impl CkbConfig { self.auto_accept_channel_ckb_funding_amount .unwrap_or(DEFAULT_CHANNEL_MINIMAL_CKB_AMOUNT) } + + pub fn keep_closed_channels(&self) -> bool { + self.keep_closed_channels.unwrap_or(false) + } } // Basically ckb_sdk::types::NetworkType. But we added a `Mocknet` variant. diff --git a/src/ckb/network.rs b/src/ckb/network.rs index 8bd16ac9..67db2120 100644 --- a/src/ckb/network.rs +++ b/src/ckb/network.rs @@ -782,6 +782,8 @@ pub struct NetworkActorState { open_channel_auto_accept_min_ckb_funding_amount: u64, // Tha default amount of CKB to be funded when auto accepting a channel. auto_accept_channel_ckb_funding_amount: u64, + // If true, the network actor will keep closed channels in database. + keep_closed_channels: bool, } static CHANNEL_ACTOR_NAME_PREFIX: AtomicU64 = AtomicU64::new(0u64); @@ -836,7 +838,12 @@ impl NetworkActorState { let (tx, rx) = oneshot::channel::(); let channel = Actor::spawn_linked( Some(generate_channel_actor_name(&self.peer_id, &peer_id)), - ChannelActor::new(peer_id.clone(), network.clone(), store), + ChannelActor::new( + peer_id.clone(), + network.clone(), + store, + self.keep_closed_channels, + ), ChannelInitializationParameter::OpenChannel(OpenChannelParameter { funding_amount, seed, @@ -887,7 +894,12 @@ impl NetworkActorState { let (tx, rx) = oneshot::channel::(); let channel = Actor::spawn_linked( Some(generate_channel_actor_name(&self.peer_id, &peer_id)), - ChannelActor::new(peer_id.clone(), network.clone(), store), + ChannelActor::new( + peer_id.clone(), + network.clone(), + store, + self.keep_closed_channels, + ), ChannelInitializationParameter::AcceptChannel(AcceptChannelParameter { funding_amount, reserved_ckb_amount, @@ -1015,7 +1027,12 @@ impl NetworkActorState { debug!("Reestablishing channel {:x}", &channel_id); if let Ok((channel, _)) = Actor::spawn_linked( Some(generate_channel_actor_name(&self.peer_id, peer_id)), - ChannelActor::new(peer_id.clone(), self.network.clone(), store.clone()), + ChannelActor::new( + peer_id.clone(), + self.network.clone(), + store.clone(), + self.keep_closed_channels, + ), ChannelInitializationParameter::ReestablishChannel(channel_id), self.network.get_cell(), ) @@ -1300,6 +1317,7 @@ where open_channel_auto_accept_min_ckb_funding_amount: config .open_channel_auto_accept_min_ckb_funding_amount(), auto_accept_channel_ckb_funding_amount: config.auto_accept_channel_ckb_funding_amount(), + keep_closed_channels: config.keep_closed_channels(), }) } From dfb6ed49c3df89428304d54030cb63abd6eee32d Mon Sep 17 00:00:00 2001 From: YI Date: Tue, 18 Jun 2024 17:35:05 +0800 Subject: [PATCH 3/8] Wait for closing transaction before deleting channel --- src/ckb/channel.rs | 18 +++-- src/ckb/network.rs | 180 +++++++++++++++++++++++++++++++-------------- 2 files changed, 137 insertions(+), 61 deletions(-) diff --git a/src/ckb/channel.rs b/src/ckb/channel.rs index c331c9ee..db7a3efc 100644 --- a/src/ckb/channel.rs +++ b/src/ckb/channel.rs @@ -188,7 +188,7 @@ pub struct ChannelActor { keep_on_closed: bool, } -impl ChannelActor { +impl ChannelActor { pub fn new( peer_id: PeerId, network: ActorRef, @@ -880,6 +880,12 @@ impl ChannelActor { ChannelEvent::PeerDisconnected => { myself.stop(Some("PeerDisconnected".to_string())); } + ChannelEvent::ClosingTransactionConfirmed => { + myself.stop(Some("ChannelClosed".to_string())); + if !self.keep_on_closed { + self.store.delete_channel_actor_state(&state.get_id()); + } + } } Ok(()) } @@ -1235,10 +1241,9 @@ where } match state.state { ChannelState::Closed => { - myself.stop(Some("ChannelClosed".to_string())); - if self.keep_on_closed { - self.store.delete_channel_actor_state(&state.get_id()); - } + debug!( + "The channel is closed, waiting for the closing transaction to be confirmed." + ); } _ => { self.store.insert_channel_actor_state(state.clone()); @@ -1474,8 +1479,9 @@ pub struct ClosedChannel {} #[derive(Debug)] pub enum ChannelEvent { - FundingTransactionConfirmed, PeerDisconnected, + FundingTransactionConfirmed, + ClosingTransactionConfirmed, } pub type ProcessingChannelResult = Result<(), ProcessingChannelError>; diff --git a/src/ckb/network.rs b/src/ckb/network.rs index 67db2120..8cd5e961 100644 --- a/src/ckb/network.rs +++ b/src/ckb/network.rs @@ -1,10 +1,10 @@ use crate::{debug, error, info, warn}; use ckb_jsonrpc_types::Status; use ckb_types::core::TransactionView; -use ckb_types::packed::{OutPoint, Script, Transaction}; +use ckb_types::packed::{Byte32, OutPoint, Script, Transaction}; use ckb_types::prelude::{IntoTransactionView, Pack, Unpack}; use ractor::{ - async_trait as rasync_trait, call_t, Actor, ActorCell, ActorProcessingErr, ActorRef, + async_trait as rasync_trait, call_t, Actor, ActorCell, ActorProcessingErr, ActorRef, RactorErr, RpcReplyPort, SupervisionEvent, }; use std::collections::{HashMap, HashSet}; @@ -203,6 +203,12 @@ pub enum NetworkActorEvent { /// A commitment transaction is signed by us and has sent to the other party. LocalCommitmentSigned(PeerId, Hash256, u64, TransactionView, Vec), + /// A closing transaction has been confirmed. + ClosingTransactionConfirmed(PeerId, Hash256, Byte32), + + /// A closing transaction has failed (either because of invalid transaction or timeout) + ClosingTransactionFailed(PeerId, Hash256, Byte32), + /// Network service events to be sent to outside observers. /// These events may be both present at `NetworkActorEvent` and /// this branch of `NetworkActorEvent`. This is because some events @@ -446,7 +452,9 @@ where .expect(ASSUME_NETWORK_MYSELF_ALIVE); } NetworkActorEvent::ChannelClosed(channel_id, peer_id, tx) => { - state.on_channel_closed(&channel_id, &peer_id); + state + .on_channel_closed(channel_id, peer_id.clone(), tx.clone()) + .await; info!( "Channel ({:?}) to peer {:?} is already closed. Closing transaction {:?} can be broacasted now.", channel_id, peer_id, tx @@ -471,10 +479,15 @@ where myself .send_message(NetworkActorMessage::new_event( NetworkActorEvent::NetworkServiceEvent(NetworkServiceEvent::ChannelClosed( - peer_id, channel_id, tx, + peer_id.clone(), + channel_id, + tx.clone(), )), )) .expect(ASSUME_NETWORK_MYSELF_ALIVE); + state + .on_channel_closed(channel_id, peer_id.clone(), tx.clone()) + .await; } NetworkActorEvent::PeerMessage(peer_id, message) => { self.handle_peer_message(state, peer_id, message).await? @@ -487,8 +500,27 @@ where NetworkActorEvent::FundingTransactionConfirmed(outpoint) => { state.on_funding_transaction_confirmed(outpoint).await; } - NetworkActorEvent::FundingTransactionFailed(_outpoint) => { - unimplemented!("handling funding transaction failed"); + NetworkActorEvent::FundingTransactionFailed(outpoint) => { + error!("Funding transaction failed: {:?}", outpoint); + } + NetworkActorEvent::ClosingTransactionConfirmed(peer_id, channel_id, _tx_hash) => { + // TODO: We should remove the channel from the session_channels_map. + state.channels.remove(&channel_id); + if let Some(session) = state.get_peer_session(&peer_id) { + state.session_channels_map.get_mut(&session).map(|set| { + set.remove(&channel_id); + }); + } + state.send_message_to_channel_actor( + channel_id, + ChannelActorMessage::Event(ChannelEvent::ClosingTransactionConfirmed), + ) + } + NetworkActorEvent::ClosingTransactionFailed(peer_id, tx_hash, channel_id) => { + error!( + "Closing transaction failed for channel {:?}, tx hash: {:?}, peer id: {:?}", + &channel_id, &tx_hash, &peer_id + ); } NetworkActorEvent::LocalCommitmentSigned( peer_id, @@ -915,6 +947,44 @@ impl NetworkActorState { Ok((channel, temp_channel_id, new_id)) } + async fn broadcast_tx_with_callback(&self, transaction: TransactionView, callback: F) + where + F: Send + 'static + FnOnce(Result>) -> R, + { + debug!("Trying to broadcast transaction {:?}", &transaction); + let chain = self.chain_actor.clone(); + call_t!( + &chain, + CkbChainMessage::SendTx, + DEFAULT_CHAIN_ACTOR_TIMEOUT, + transaction.clone() + ) + .expect(ASSUME_CHAIN_ACTOR_ALWAYS_ALIVE_FOR_NOW) + .expect("valid tx to broadcast"); + + let tx_hash = transaction.hash(); + info!("Transactoin sent to the network: {}", tx_hash); + + // TODO: make number of confirmation to transaction configurable. + const NUM_CONFIRMATIONS: u64 = 4; + let request = TraceTxRequest { + tx_hash: tx_hash.clone().into(), + confirmations: NUM_CONFIRMATIONS, + }; + + // Spawn a new task to avoid blocking current actor message processing. + ractor::concurrency::tokio_primatives::spawn(async move { + debug!("Tracing transaction status {:?}", &request.tx_hash); + let result = call_t!( + chain, + CkbChainMessage::TraceTx, + DEFAULT_CHAIN_ACTOR_TIMEOUT, + request.clone() + ); + callback(result); + }); + } + fn get_peer_session(&self, peer_id: &PeerId) -> Option { self.peer_session_map.get(peer_id).cloned() } @@ -1072,13 +1142,45 @@ impl NetworkActorState { } } - fn on_channel_closed(&mut self, id: &Hash256, peer_id: &PeerId) { - self.channels.remove(id); - if let Some(session) = self.get_peer_session(peer_id) { - if let Some(set) = self.session_channels_map.get_mut(&session) { - set.remove(id); + async fn on_channel_closed( + &mut self, + channel_id: Hash256, + peer_id: PeerId, + transaction: TransactionView, + ) { + let tx_hash: Byte32 = transaction.hash(); + info!( + "Channel ({:?}) to peer {:?} is closed. Broadcasting closing transaction ({:?}) now.", + &channel_id, &peer_id, &tx_hash + ); + let network: ActorRef = self.network.clone(); + self.broadcast_tx_with_callback(transaction, move |result| { + let message = match result { + Ok(status) if status == Status::Committed => { + info!("Cloisng transaction {:?} confirmed", &tx_hash); + NetworkActorEvent::ClosingTransactionConfirmed( + peer_id, + channel_id, + tx_hash.into(), + ) + } + Ok(status) => { + error!( + "Closing transaction {:?} failed to be confirmed with final status {:?}", + &tx_hash, &status + ); + NetworkActorEvent::ClosingTransactionFailed(peer_id, channel_id, tx_hash.into()) + } + Err(err) => { + error!("Failed to trace transaction {:?}: {:?}", &tx_hash, &err); + NetworkActorEvent::ClosingTransactionFailed(peer_id, channel_id, tx_hash.into()) + } }; - } + network + .send_message(NetworkActorMessage::new_event(message)) + .expect(ASSUME_NETWORK_MYSELF_ALIVE); + }) + .await; } pub async fn on_open_channel_msg( @@ -1140,61 +1242,28 @@ impl NetworkActorState { } self.pending_channels.insert(outpoint.clone(), channel_id); // TODO: try to broadcast the transaction to the network. + let transaction = transaction.into_view(); + let tx_hash = transaction.hash(); debug!( - "Funding transaction (outpoint {:?}) for channel {:?} is now ready. We can broadcast transaction {:?} now.", - &outpoint, &channel_id, &transaction + "Funding transaction (outpoint {:?}) for channel {:?} is now ready. Broadcast it {:?} now.", + &outpoint, &channel_id, &tx_hash ); - let transaction = transaction.into_view(); - debug!("Trying to broadcast funding transaction {:?}", &transaction); - - call_t!( - self.chain_actor, - CkbChainMessage::SendTx, - DEFAULT_CHAIN_ACTOR_TIMEOUT, - transaction.clone() - ) - .expect(ASSUME_CHAIN_ACTOR_ALWAYS_ALIVE_FOR_NOW) - .expect("valid funding tx"); - - let hash = transaction.hash(); - - info!("Funding transactoin sent to the network: {}", hash); - - // Trace the transaction status. - - // TODO: make number of confirmation to transaction configurable. - const NUM_CONFIRMATIONS: u64 = 4; - let request = TraceTxRequest { - tx_hash: hash, - confirmations: NUM_CONFIRMATIONS, - }; - let chain = self.chain_actor.clone(); let network = self.network.clone(); - // Spawn a new task to avoid blocking current actor message processing. - ractor::concurrency::tokio_primatives::spawn(async move { - debug!("Tracing transaction status {:?}", &request.tx_hash); - let message = match call_t!( - chain, - CkbChainMessage::TraceTx, - DEFAULT_CHAIN_ACTOR_TIMEOUT, - request.clone() - ) { + self.broadcast_tx_with_callback(transaction, move |result| { + let message = match result { Ok(Status::Committed) => { - info!("Funding transaction {:?} confirmed", &request.tx_hash,); + info!("Funding transaction {:?} confirmed", &tx_hash); NetworkActorEvent::FundingTransactionConfirmed(outpoint) } Ok(status) => { error!( "Funding transaction {:?} failed to be confirmed with final status {:?}", - &request.tx_hash, &status + &tx_hash, &status ); NetworkActorEvent::FundingTransactionFailed(outpoint) } Err(err) => { - error!( - "Failed to trace transaction {:?}: {:?}", - &request.tx_hash, &err - ); + error!("Failed to trace transaction {:?}: {:?}", &tx_hash, &err); NetworkActorEvent::FundingTransactionFailed(outpoint) } }; @@ -1203,7 +1272,8 @@ impl NetworkActorState { network .send_message(NetworkActorMessage::new_event(message)) .expect(ASSUME_NETWORK_MYSELF_ALIVE); - }); + }) + .await; } async fn on_funding_transaction_confirmed(&mut self, outpoint: OutPoint) { From 1aed0c2bd7df135d29a30c9da33e0fc1de054f92 Mon Sep 17 00:00:00 2001 From: YI Date: Fri, 5 Jul 2024 00:16:39 +0800 Subject: [PATCH 4/8] Fix clippy --- src/ckb/channel.rs | 7 +++---- src/ckb/network.rs | 16 +++++++--------- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/src/ckb/channel.rs b/src/ckb/channel.rs index db7a3efc..28dcddb7 100644 --- a/src/ckb/channel.rs +++ b/src/ckb/channel.rs @@ -3975,10 +3975,9 @@ pub trait ChannelActorStateStore { fn get_active_channel_ids_by_peer(&self, peer_id: &PeerId) -> Vec { self.get_channel_ids_by_peer(peer_id) .into_iter() - .filter(|id| match self.get_channel_actor_state(&id) { - Some(state) if !state.is_closed() => true, - _ => false, - }) + .filter( + |id| matches!(self.get_channel_actor_state(id), Some(state) if !state.is_closed()), + ) .collect() } fn get_channel_states(&self, peer_id: Option) -> Vec<(PeerId, Hash256, ChannelState)>; diff --git a/src/ckb/network.rs b/src/ckb/network.rs index 8cd5e961..a06b1932 100644 --- a/src/ckb/network.rs +++ b/src/ckb/network.rs @@ -507,9 +507,7 @@ where // TODO: We should remove the channel from the session_channels_map. state.channels.remove(&channel_id); if let Some(session) = state.get_peer_session(&peer_id) { - state.session_channels_map.get_mut(&session).map(|set| { - set.remove(&channel_id); - }); + if let Some(set) = state.session_channels_map.get_mut(&session) { set.remove(&channel_id); } } state.send_message_to_channel_actor( channel_id, @@ -968,7 +966,7 @@ impl NetworkActorState { // TODO: make number of confirmation to transaction configurable. const NUM_CONFIRMATIONS: u64 = 4; let request = TraceTxRequest { - tx_hash: tx_hash.clone().into(), + tx_hash: tx_hash.clone(), confirmations: NUM_CONFIRMATIONS, }; @@ -1093,7 +1091,7 @@ impl NetworkActorState { ) { self.peer_session_map.insert(peer_id.clone(), session.id); - for channel_id in store.get_active_channel_ids_by_peer(&peer_id) { + for channel_id in store.get_active_channel_ids_by_peer(peer_id) { debug!("Reestablishing channel {:x}", &channel_id); if let Ok((channel, _)) = Actor::spawn_linked( Some(generate_channel_actor_name(&self.peer_id, peer_id)), @@ -1156,12 +1154,12 @@ impl NetworkActorState { let network: ActorRef = self.network.clone(); self.broadcast_tx_with_callback(transaction, move |result| { let message = match result { - Ok(status) if status == Status::Committed => { + Ok(Status::Committed) => { info!("Cloisng transaction {:?} confirmed", &tx_hash); NetworkActorEvent::ClosingTransactionConfirmed( peer_id, channel_id, - tx_hash.into(), + tx_hash, ) } Ok(status) => { @@ -1169,11 +1167,11 @@ impl NetworkActorState { "Closing transaction {:?} failed to be confirmed with final status {:?}", &tx_hash, &status ); - NetworkActorEvent::ClosingTransactionFailed(peer_id, channel_id, tx_hash.into()) + NetworkActorEvent::ClosingTransactionFailed(peer_id, channel_id, tx_hash) } Err(err) => { error!("Failed to trace transaction {:?}: {:?}", &tx_hash, &err); - NetworkActorEvent::ClosingTransactionFailed(peer_id, channel_id, tx_hash.into()) + NetworkActorEvent::ClosingTransactionFailed(peer_id, channel_id, tx_hash) } }; network From c989118e2dd9e9eb29b9bf274f600300a608b896 Mon Sep 17 00:00:00 2001 From: YI Date: Fri, 5 Jul 2024 14:08:08 +0800 Subject: [PATCH 5/8] Remove channel keep_on_close option --- src/ckb/channel.rs | 12 +----------- src/ckb/config.rs | 12 ------------ src/ckb/network.rs | 34 +++++++--------------------------- 3 files changed, 8 insertions(+), 50 deletions(-) diff --git a/src/ckb/channel.rs b/src/ckb/channel.rs index 28dcddb7..b445f2c7 100644 --- a/src/ckb/channel.rs +++ b/src/ckb/channel.rs @@ -185,21 +185,14 @@ pub struct ChannelActor { peer_id: PeerId, network: ActorRef, store: S, - keep_on_closed: bool, } impl ChannelActor { - pub fn new( - peer_id: PeerId, - network: ActorRef, - store: S, - keep_on_closed: bool, - ) -> Self { + pub fn new(peer_id: PeerId, network: ActorRef, store: S) -> Self { Self { peer_id, network, store, - keep_on_closed, } } @@ -882,9 +875,6 @@ impl ChannelActor { } ChannelEvent::ClosingTransactionConfirmed => { myself.stop(Some("ChannelClosed".to_string())); - if !self.keep_on_closed { - self.store.delete_channel_actor_state(&state.get_id()); - } } } Ok(()) diff --git a/src/ckb/config.rs b/src/ckb/config.rs index 6154546c..471f38e6 100644 --- a/src/ckb/config.rs +++ b/src/ckb/config.rs @@ -83,14 +83,6 @@ pub struct CkbConfig { help = "whether to accept open channel requests with ckb funding amount automatically, unit: shannons [default: 6200000000 shannons], if this is set to zero, it means to disable auto accept" )] pub auto_accept_channel_ckb_funding_amount: Option, - /// whether to keep closed channels from store [default: false] - #[arg( - name = "CKB_KEEP_CLOSED_CHANNELS", - long = "ckb-keep-closed-channels", - env, - help = "whether to keep closed channels from store [default: false]" - )] - pub keep_closed_channels: Option, } impl CkbConfig { @@ -128,10 +120,6 @@ impl CkbConfig { self.auto_accept_channel_ckb_funding_amount .unwrap_or(DEFAULT_CHANNEL_MINIMAL_CKB_AMOUNT) } - - pub fn keep_closed_channels(&self) -> bool { - self.keep_closed_channels.unwrap_or(false) - } } // Basically ckb_sdk::types::NetworkType. But we added a `Mocknet` variant. diff --git a/src/ckb/network.rs b/src/ckb/network.rs index a06b1932..70487729 100644 --- a/src/ckb/network.rs +++ b/src/ckb/network.rs @@ -507,7 +507,9 @@ where // TODO: We should remove the channel from the session_channels_map. state.channels.remove(&channel_id); if let Some(session) = state.get_peer_session(&peer_id) { - if let Some(set) = state.session_channels_map.get_mut(&session) { set.remove(&channel_id); } + if let Some(set) = state.session_channels_map.get_mut(&session) { + set.remove(&channel_id); + } } state.send_message_to_channel_actor( channel_id, @@ -812,8 +814,6 @@ pub struct NetworkActorState { open_channel_auto_accept_min_ckb_funding_amount: u64, // Tha default amount of CKB to be funded when auto accepting a channel. auto_accept_channel_ckb_funding_amount: u64, - // If true, the network actor will keep closed channels in database. - keep_closed_channels: bool, } static CHANNEL_ACTOR_NAME_PREFIX: AtomicU64 = AtomicU64::new(0u64); @@ -868,12 +868,7 @@ impl NetworkActorState { let (tx, rx) = oneshot::channel::(); let channel = Actor::spawn_linked( Some(generate_channel_actor_name(&self.peer_id, &peer_id)), - ChannelActor::new( - peer_id.clone(), - network.clone(), - store, - self.keep_closed_channels, - ), + ChannelActor::new(peer_id.clone(), network.clone(), store), ChannelInitializationParameter::OpenChannel(OpenChannelParameter { funding_amount, seed, @@ -924,12 +919,7 @@ impl NetworkActorState { let (tx, rx) = oneshot::channel::(); let channel = Actor::spawn_linked( Some(generate_channel_actor_name(&self.peer_id, &peer_id)), - ChannelActor::new( - peer_id.clone(), - network.clone(), - store, - self.keep_closed_channels, - ), + ChannelActor::new(peer_id.clone(), network.clone(), store), ChannelInitializationParameter::AcceptChannel(AcceptChannelParameter { funding_amount, reserved_ckb_amount, @@ -1095,12 +1085,7 @@ impl NetworkActorState { debug!("Reestablishing channel {:x}", &channel_id); if let Ok((channel, _)) = Actor::spawn_linked( Some(generate_channel_actor_name(&self.peer_id, peer_id)), - ChannelActor::new( - peer_id.clone(), - self.network.clone(), - store.clone(), - self.keep_closed_channels, - ), + ChannelActor::new(peer_id.clone(), self.network.clone(), store.clone()), ChannelInitializationParameter::ReestablishChannel(channel_id), self.network.get_cell(), ) @@ -1156,11 +1141,7 @@ impl NetworkActorState { let message = match result { Ok(Status::Committed) => { info!("Cloisng transaction {:?} confirmed", &tx_hash); - NetworkActorEvent::ClosingTransactionConfirmed( - peer_id, - channel_id, - tx_hash, - ) + NetworkActorEvent::ClosingTransactionConfirmed(peer_id, channel_id, tx_hash) } Ok(status) => { error!( @@ -1385,7 +1366,6 @@ where open_channel_auto_accept_min_ckb_funding_amount: config .open_channel_auto_accept_min_ckb_funding_amount(), auto_accept_channel_ckb_funding_amount: config.auto_accept_channel_ckb_funding_amount(), - keep_closed_channels: config.keep_closed_channels(), }) } From 18be6361b0de4b73c9f7f74cc8455fa455d11dc4 Mon Sep 17 00:00:00 2001 From: YI Date: Mon, 8 Jul 2024 13:47:42 +0800 Subject: [PATCH 6/8] Fix repeatedly call on_channel_closed --- src/ckb/channel.rs | 8 +++++- src/ckb/network.rs | 62 ++++++++++++++++------------------------------ 2 files changed, 28 insertions(+), 42 deletions(-) diff --git a/src/ckb/channel.rs b/src/ckb/channel.rs index b445f2c7..1615a037 100644 --- a/src/ckb/channel.rs +++ b/src/ckb/channel.rs @@ -823,10 +823,12 @@ impl ChannelActor { ChannelCommand::Shutdown(command, reply) => { match self.handle_shutdown_command(state, command) { Ok(_) => { + debug!("Shutdown command processed successfully"); let _ = reply.send(Ok(())); Ok(()) } Err(err) => { + debug!("Error processing shutdown command: {:?}", &err); let _ = reply.send(Err(err.to_string())); Err(err) } @@ -2767,7 +2769,11 @@ impl ChannelActorState { network .send_message(NetworkActorMessage::new_event( - NetworkActorEvent::ChannelClosed(self.get_id(), self.peer_id.clone(), tx), + NetworkActorEvent::ClosingTransactionPending( + self.get_id(), + self.peer_id.clone(), + tx, + ), )) .expect(ASSUME_NETWORK_ACTOR_ALIVE); } diff --git a/src/ckb/network.rs b/src/ckb/network.rs index 70487729..1d1dfc5a 100644 --- a/src/ckb/network.rs +++ b/src/ckb/network.rs @@ -141,7 +141,7 @@ pub enum NetworkServiceEvent { ChannelPendingToBeAccepted(PeerId, Hash256), ChannelReady(PeerId, Hash256), ChannelShutDown(PeerId, Hash256), - ChannelClosed(PeerId, Hash256, TransactionView), + ClosingTransactionPending(PeerId, Hash256, TransactionView), // We should sign a commitment transaction and send it to the other party. CommitmentSignaturePending(PeerId, Hash256, u64), // We have signed a commitment transaction and sent it to the other party. @@ -189,7 +189,7 @@ pub enum NetworkActorEvent { /// A channel is being shutting down. ChannelShutdown(Hash256, PeerId), /// A channel is already closed. - ChannelClosed(Hash256, PeerId, TransactionView), + ClosingTransactionPending(Hash256, PeerId, TransactionView), /// Both parties are now able to broadcast a valid funding transaction. FundingTransactionPending(Transaction, OutPoint, Hash256), @@ -451,44 +451,6 @@ where )) .expect(ASSUME_NETWORK_MYSELF_ALIVE); } - NetworkActorEvent::ChannelClosed(channel_id, peer_id, tx) => { - state - .on_channel_closed(channel_id, peer_id.clone(), tx.clone()) - .await; - info!( - "Channel ({:?}) to peer {:?} is already closed. Closing transaction {:?} can be broacasted now.", - channel_id, peer_id, tx - ); - match call_t!( - self.chain_actor, - CkbChainMessage::SendTx, - DEFAULT_CHAIN_ACTOR_TIMEOUT, - tx.clone() - ) - .expect(ASSUME_CHAIN_ACTOR_ALWAYS_ALIVE_FOR_NOW) - { - Ok(_) => { - info!("Closing transaction sent to the network: {:x}", tx.hash()); - } - Err(err) => { - error!("Failed to send closing transaction to the network: {}", err); - } - } - - // Notify outside observers. - myself - .send_message(NetworkActorMessage::new_event( - NetworkActorEvent::NetworkServiceEvent(NetworkServiceEvent::ChannelClosed( - peer_id.clone(), - channel_id, - tx.clone(), - )), - )) - .expect(ASSUME_NETWORK_MYSELF_ALIVE); - state - .on_channel_closed(channel_id, peer_id.clone(), tx.clone()) - .await; - } NetworkActorEvent::PeerMessage(peer_id, message) => { self.handle_peer_message(state, peer_id, message).await? } @@ -503,6 +465,24 @@ where NetworkActorEvent::FundingTransactionFailed(outpoint) => { error!("Funding transaction failed: {:?}", outpoint); } + NetworkActorEvent::ClosingTransactionPending(channel_id, peer_id, tx) => { + state + .on_closing_transaction_pending(channel_id, peer_id.clone(), tx.clone()) + .await; + + // Notify outside observers. + myself + .send_message(NetworkActorMessage::new_event( + NetworkActorEvent::NetworkServiceEvent( + NetworkServiceEvent::ClosingTransactionPending( + peer_id.clone(), + channel_id, + tx.clone(), + ), + ), + )) + .expect(ASSUME_NETWORK_MYSELF_ALIVE); + } NetworkActorEvent::ClosingTransactionConfirmed(peer_id, channel_id, _tx_hash) => { // TODO: We should remove the channel from the session_channels_map. state.channels.remove(&channel_id); @@ -1125,7 +1105,7 @@ impl NetworkActorState { } } - async fn on_channel_closed( + async fn on_closing_transaction_pending( &mut self, channel_id: Hash256, peer_id: PeerId, From cc429a04a8ae52ea4e853365fe3e578c6d0c593e Mon Sep 17 00:00:00 2001 From: YI Date: Mon, 8 Jul 2024 13:57:19 +0800 Subject: [PATCH 7/8] Clarify all the channel events --- src/ckb/network.rs | 73 ++++++++++++++++++++-------------------------- 1 file changed, 31 insertions(+), 42 deletions(-) diff --git a/src/ckb/network.rs b/src/ckb/network.rs index 1d1dfc5a..6686983b 100644 --- a/src/ckb/network.rs +++ b/src/ckb/network.rs @@ -137,11 +137,15 @@ pub enum NetworkServiceEvent { NetworkStarted(PeerId, Multiaddr), PeerConnected(PeerId, Multiaddr), PeerDisConnected(PeerId, Multiaddr), + // An incoming/outgoing channel is created. ChannelCreated(PeerId, Hash256), + // A outgoing channel is pending to be accepted. ChannelPendingToBeAccepted(PeerId, Hash256), + // The channel is ready to use (with funding transaction confirmed + // and both parties sent ChannelReady messages). ChannelReady(PeerId, Hash256), - ChannelShutDown(PeerId, Hash256), - ClosingTransactionPending(PeerId, Hash256, TransactionView), + // The channel is closed (closing transaction is confirmed). + ChannelClosed(PeerId, Hash256), // We should sign a commitment transaction and send it to the other party. CommitmentSignaturePending(PeerId, Hash256, u64), // We have signed a commitment transaction and sent it to the other party. @@ -186,8 +190,6 @@ pub enum NetworkActorEvent { ), /// A channel is ready to use. ChannelReady(Hash256, PeerId), - /// A channel is being shutting down. - ChannelShutdown(Hash256, PeerId), /// A channel is already closed. ClosingTransactionPending(Hash256, PeerId, TransactionView), @@ -437,20 +439,6 @@ where )) .expect(ASSUME_NETWORK_MYSELF_ALIVE); } - NetworkActorEvent::ChannelShutdown(channel_id, peer_id) => { - info!( - "Channel ({:?}) to peer {:?} is being shutdown.", - channel_id, peer_id - ); - // Notify outside observers. - myself - .send_message(NetworkActorMessage::new_event( - NetworkActorEvent::NetworkServiceEvent( - NetworkServiceEvent::ChannelShutDown(peer_id, channel_id), - ), - )) - .expect(ASSUME_NETWORK_MYSELF_ALIVE); - } NetworkActorEvent::PeerMessage(peer_id, message) => { self.handle_peer_message(state, peer_id, message).await? } @@ -469,32 +457,11 @@ where state .on_closing_transaction_pending(channel_id, peer_id.clone(), tx.clone()) .await; - - // Notify outside observers. - myself - .send_message(NetworkActorMessage::new_event( - NetworkActorEvent::NetworkServiceEvent( - NetworkServiceEvent::ClosingTransactionPending( - peer_id.clone(), - channel_id, - tx.clone(), - ), - ), - )) - .expect(ASSUME_NETWORK_MYSELF_ALIVE); } NetworkActorEvent::ClosingTransactionConfirmed(peer_id, channel_id, _tx_hash) => { - // TODO: We should remove the channel from the session_channels_map. - state.channels.remove(&channel_id); - if let Some(session) = state.get_peer_session(&peer_id) { - if let Some(set) = state.session_channels_map.get_mut(&session) { - set.remove(&channel_id); - } - } - state.send_message_to_channel_actor( - channel_id, - ChannelActorMessage::Event(ChannelEvent::ClosingTransactionConfirmed), - ) + state + .on_closing_transaction_confirmed(&peer_id, &channel_id) + .await; } NetworkActorEvent::ClosingTransactionFailed(peer_id, tx_hash, channel_id) => { error!( @@ -1142,6 +1109,28 @@ impl NetworkActorState { .await; } + async fn on_closing_transaction_confirmed(&mut self, peer_id: &PeerId, channel_id: &Hash256) { + self.channels.remove(&channel_id); + if let Some(session) = self.get_peer_session(&peer_id) { + if let Some(set) = self.session_channels_map.get_mut(&session) { + set.remove(&channel_id); + } + } + self.send_message_to_channel_actor( + *channel_id, + ChannelActorMessage::Event(ChannelEvent::ClosingTransactionConfirmed), + ); + // Notify outside observers. + self.network + .send_message(NetworkActorMessage::new_event( + NetworkActorEvent::NetworkServiceEvent(NetworkServiceEvent::ChannelClosed( + peer_id.clone(), + *channel_id, + )), + )) + .expect(ASSUME_NETWORK_MYSELF_ALIVE); + } + pub async fn on_open_channel_msg( &mut self, peer_id: PeerId, From 6e18ed633a7590b67dbec3949120a74881af06dd Mon Sep 17 00:00:00 2001 From: YI Date: Tue, 9 Jul 2024 00:16:09 +0800 Subject: [PATCH 8/8] Don't return error on repeated channel shutdown --- src/ckb/channel.rs | 11 ++++++++--- .../22-node2-send-shutdown-channel-1.bru | 8 ++------ .../24-node3-send-shutdown-channel-2.bru | 15 +++------------ .../udt/12-node2-send-shutdown-channel-error.bru | 15 +++------------ 4 files changed, 16 insertions(+), 33 deletions(-) diff --git a/src/ckb/channel.rs b/src/ckb/channel.rs index 1615a037..47cdbb89 100644 --- a/src/ckb/channel.rs +++ b/src/ckb/channel.rs @@ -645,6 +645,10 @@ impl ChannelActor { ) -> ProcessingChannelResult { debug!("Handling shutdown command: {:?}", &command); let flags = match state.state { + ChannelState::Closed => { + debug!("Channel already closed, ignoring shutdown command"); + return Ok(()); + } ChannelState::ChannelReady() => { debug!("Handling shutdown command in ChannelReady state"); ShuttingDownFlags::empty() @@ -652,9 +656,10 @@ impl ChannelActor { ChannelState::ShuttingDown(flags) => flags, _ => { debug!("Handling shutdown command in state {:?}", &state.state); - return Err(ProcessingChannelError::InvalidState( - "Trying to send shutdown message while in invalid state".to_string(), - )); + return Err(ProcessingChannelError::InvalidState(format!( + "Trying to send shutdown message while in invalid state {:?}", + &state.state + ))); } }; diff --git a/tests/bruno/e2e/3-nodes-transfer/22-node2-send-shutdown-channel-1.bru b/tests/bruno/e2e/3-nodes-transfer/22-node2-send-shutdown-channel-1.bru index 022658c3..976927e7 100644 --- a/tests/bruno/e2e/3-nodes-transfer/22-node2-send-shutdown-channel-1.bru +++ b/tests/bruno/e2e/3-nodes-transfer/22-node2-send-shutdown-channel-1.bru @@ -35,15 +35,11 @@ body:json { } assert { - res.body.error: isDefined - res.body.result: isUndefined + res.body.error: isUndefined + res.body.result: isNull } script:post-response { // Sleep for sometime to make sure current operation finishes before next request starts. - // will get error message since channel is closed in previous step await new Promise(r => setTimeout(r, 100)); - if (!(res.body.error.message === "Messaging failed because channel is closed")) { - throw new Error("Assertion failed: error message is not right"); - } } diff --git a/tests/bruno/e2e/3-nodes-transfer/24-node3-send-shutdown-channel-2.bru b/tests/bruno/e2e/3-nodes-transfer/24-node3-send-shutdown-channel-2.bru index f0d3d0bd..4431e746 100644 --- a/tests/bruno/e2e/3-nodes-transfer/24-node3-send-shutdown-channel-2.bru +++ b/tests/bruno/e2e/3-nodes-transfer/24-node3-send-shutdown-channel-2.bru @@ -35,15 +35,6 @@ body:json { } assert { - res.body.error: isDefined - res.body.result: isUndefined -} - -script:post-response { - // Sleep for sometime to make sure current operation finishes before next request starts. - // will get error message since channel is closed in previous step - await new Promise(r => setTimeout(r, 100)); - if (!(res.body.error.message === "Messaging failed because channel is closed")) { - throw new Error("Assertion failed: error message is not right"); - } -} + res.body.error: isUndefined + res.body.result: isNull +} \ No newline at end of file diff --git a/tests/bruno/e2e/udt/12-node2-send-shutdown-channel-error.bru b/tests/bruno/e2e/udt/12-node2-send-shutdown-channel-error.bru index 9470f5b2..729b7c5f 100644 --- a/tests/bruno/e2e/udt/12-node2-send-shutdown-channel-error.bru +++ b/tests/bruno/e2e/udt/12-node2-send-shutdown-channel-error.bru @@ -35,15 +35,6 @@ body:json { } assert { - res.body.error: isDefined - res.body.result: isUndefined -} - -script:post-response { - // Sleep for sometime to make sure current operation finishes before next request starts. - // will get error message since channel is closed in previous step - await new Promise(r => setTimeout(r, 100)); - if (!(res.body.error.message === "Messaging failed because channel is closed")) { - throw new Error("Assertion failed: error message is not right"); - } -} + res.body.error: isUndefined + res.body.result: isNull +} \ No newline at end of file