diff --git a/app/app.rs b/app/app.rs index cd0b906..c50d3f7 100644 --- a/app/app.rs +++ b/app/app.rs @@ -33,6 +33,8 @@ pub enum Error { "Unable to verify existence of CUSF mainchain service(s) at {0}: {1}" )] VerifyMainchainServices(url::Url, tonic::Status), + #[error("failed to connect to CUSF mainchain enforcer at {0}")] + ConnectMainchain(url::Url, #[source] tonic::Status), #[error("io error")] Io(#[from] std::io::Error), #[error("miner error")] @@ -210,11 +212,21 @@ impl App { .connect_lazy(); let (cusf_mainchain, cusf_mainchain_wallet) = if runtime .block_on(Self::check_proto_support(transport.clone())) - .map_err(|err| { - Error::VerifyMainchainServices( + .map_err(|err| match err { + status + // Kind of crude, but I'm unable to match this on a std::io::Error... + if status.code() == tonic::Code::Unavailable + && status.message().contains("tcp connect error") => + { + Error::ConnectMainchain( + config.mainchain_grpc_address.clone(), + status, + ) + } + _ => Error::VerifyMainchainServices( config.mainchain_grpc_address.clone(), err, - ) + ), })? { ( mainchain::ValidatorClient::new(transport.clone()), diff --git a/lib/net/mod.rs b/lib/net/mod.rs index 489fa30..03bc01b 100644 --- a/lib/net/mod.rs +++ b/lib/net/mod.rs @@ -235,13 +235,25 @@ impl Net { } } - pub fn remove_active_peer(&self, addr: SocketAddr) { - tracing::trace!(%addr, "remove active peer: starting"); + /// Removes a peer from the active peers list. + #[instrument(skip_all, fields(addr))] + pub fn remove_active_peer( + &self, + env: &heed::Env, + addr: SocketAddr, + ) -> Result<(), Error> { let mut active_peers_write = self.active_peers.write(); if let Some(peer_connection) = active_peers_write.remove(&addr) { drop(peer_connection); tracing::info!(%addr, "remove active peer: disconnected"); } + + let mut rwtxn = env.write_txn()?; + self.known_peers.delete(&mut rwtxn, &addr)?; + rwtxn.commit()?; + tracing::info!(%addr, "remove active peer: removed from known peers"); + + Ok(()) } // TODO: This should have more context. @@ -317,16 +329,8 @@ impl Net { match env.open_database(&rwtxn, Some("known_peers"))? { Some(known_peers) => known_peers, None => { - let known_peers = - env.create_database(&mut rwtxn, Some("known_peers"))?; - const SEED_NODE_ADDR: SocketAddr = SocketAddr::new( - std::net::IpAddr::V4(std::net::Ipv4Addr::new( - 172, 105, 148, 135, - )), - 4000 + THIS_SIDECHAIN as u16, - ); - known_peers.put(&mut rwtxn, &SEED_NODE_ADDR, &())?; - known_peers + tracing::info!("creating known peers database"); + env.create_database(&mut rwtxn, Some("known_peers"))? } }; rwtxn.commit()?; @@ -339,16 +343,33 @@ impl Net { peer_info_tx, known_peers, }; - #[allow(clippy::let_and_return)] let known_peers: Vec<_> = { - let rotxn = env.read_txn()?; + let mut txn = env.write_txn()?; + + // important: we need to always have at least one peer. we + // might end up with this peer misbehaving, causing us to + // disconnect and delete it from the database. always try + // and stick it back in, if it's missing! + if net.known_peers.is_empty(&txn)? { + const SEED_NODE_ADDR: SocketAddr = SocketAddr::new( + std::net::IpAddr::V4(std::net::Ipv4Addr::new( + 172, 105, 148, 135, + )), + 4000 + THIS_SIDECHAIN as u16, + ); + + tracing::info!("empty list of known peers, adding seed node {SEED_NODE_ADDR}"); + net.known_peers.put(&mut txn, &SEED_NODE_ADDR, &())?; + } + let known_peers = net .known_peers - .iter(&rotxn)? + .iter(&txn)? .transpose_into_fallible() .collect()?; known_peers }; + let () = known_peers.into_iter().try_for_each(|(peer_addr, _)| { tracing::trace!( "new net: connecting to already known peer at {peer_addr}" @@ -360,9 +381,7 @@ impl Net { tracing::warn!( %addr, "new net: known peer with invalid remote address, removing" ); - let mut tx = env.write_txn()?; - net.known_peers.delete(&mut tx, &peer_addr)?; - tx.commit()?; + net.remove_active_peer(env, peer_addr)?; tracing::info!( %addr, diff --git a/lib/node/net_task.rs b/lib/node/net_task.rs index d1b322f..524d228 100644 --- a/lib/node/net_task.rs +++ b/lib/node/net_task.rs @@ -434,7 +434,7 @@ where if header.hash() != block_hash { // Invalid response tracing::warn!(%addr, ?req, ?resp,"Invalid response from peer; unexpected block hash"); - let () = ctxt.net.remove_active_peer(addr); + let () = ctxt.net.remove_active_peer(&ctxt.env, addr)?; return Ok(()); } { @@ -567,13 +567,13 @@ where // check that the end header is as requested let Some(end_header) = headers.last() else { tracing::warn!(%addr, ?req, "Invalid response from peer; missing end header"); - let () = ctxt.net.remove_active_peer(addr); + let () = ctxt.net.remove_active_peer(&ctxt.env, addr)?; return Ok(()); }; let end_header_hash = end_header.hash(); if end_header_hash != end { tracing::warn!(%addr, ?req, ?end_header,"Invalid response from peer; unexpected end header"); - let () = ctxt.net.remove_active_peer(addr); + let () = ctxt.net.remove_active_peer(&ctxt.env, addr)?; return Ok(()); } // Must be at least one header due to previous check @@ -583,7 +583,7 @@ where && !start.contains(&start_hash) { tracing::warn!(%addr, ?req, %start_hash, "Invalid response from peer; invalid start hash"); - let () = ctxt.net.remove_active_peer(addr); + let () = ctxt.net.remove_active_peer(&ctxt.env, addr)?; return Ok(()); } // check that the end header height is as expected @@ -602,7 +602,8 @@ where }; if end_height != height { tracing::warn!(%addr, ?req, ?start_hash, "Invalid response from peer; invalid end height"); - let () = ctxt.net.remove_active_peer(addr); + let () = + ctxt.net.remove_active_peer(&ctxt.env, addr)?; return Ok(()); } } @@ -611,7 +612,8 @@ where for header in &headers { if header.prev_side_hash != prev_side_hash { tracing::warn!(%addr, ?req, ?headers,"Invalid response from peer; non-sequential headers"); - let () = ctxt.net.remove_active_peer(addr); + let () = + ctxt.net.remove_active_peer(&ctxt.env, addr)?; return Ok(()); } prev_side_hash = Some(header.hash()); @@ -669,7 +671,7 @@ where ) => { // Invalid response tracing::warn!(%addr, ?req, ?resp,"Invalid response from peer"); - let () = ctxt.net.remove_active_peer(addr); + let () = ctxt.net.remove_active_peer(&ctxt.env, addr)?; Ok(()) } } @@ -856,7 +858,10 @@ where MailboxItem::PeerInfo(Some((addr, None))) => { // peer connection is closed, remove it tracing::warn!(%addr, "Connection to peer closed"); - let () = self.ctxt.net.remove_active_peer(addr); + let () = self + .ctxt + .net + .remove_active_peer(&self.ctxt.env, addr)?; continue; } MailboxItem::PeerInfo(Some((addr, Some(peer_info)))) => { @@ -865,7 +870,10 @@ where PeerConnectionInfo::Error(err) => { let err = anyhow::anyhow!(err); tracing::error!(%addr, err = format!("{err:#}"), "Peer connection error"); - let () = self.ctxt.net.remove_active_peer(addr); + let () = self + .ctxt + .net + .remove_active_peer(&self.ctxt.env, addr)?; } PeerConnectionInfo::NeedBmmVerification { main_hash,