diff --git a/src/protocol/libp2p/identify.rs b/src/protocol/libp2p/identify.rs index e8fa4f50..39f74581 100644 --- a/src/protocol/libp2p/identify.rs +++ b/src/protocol/libp2p/identify.rs @@ -28,10 +28,11 @@ use crate::{ substream::Substream, transport::Endpoint, types::{protocol::ProtocolName, SubstreamId}, + utils::futures_stream::FuturesStream, PeerId, DEFAULT_CHANNEL_SIZE, }; -use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt}; +use futures::{future::BoxFuture, Stream, StreamExt}; use multiaddr::Multiaddr; use prost::Message; use tokio::sync::mpsc::{channel, Sender}; @@ -181,10 +182,10 @@ pub(crate) struct Identify { protocols: Vec, /// Pending outbound substreams. - pending_outbound: FuturesUnordered>>, + pending_outbound: FuturesStream>>, /// Pending inbound substreams. - pending_inbound: FuturesUnordered>, + pending_inbound: FuturesStream>, } impl Identify { @@ -197,8 +198,8 @@ impl Identify { public: config.public.expect("public key to be supplied"), protocol_version: config.protocol_version, user_agent: config.user_agent.unwrap_or(DEFAULT_AGENT.to_string()), - pending_inbound: FuturesUnordered::new(), - pending_outbound: FuturesUnordered::new(), + pending_inbound: FuturesStream::new(), + pending_outbound: FuturesStream::new(), protocols: config.protocols.iter().map(|protocol| protocol.to_string()).collect(), } } @@ -356,7 +357,10 @@ impl Identify { loop { tokio::select! { event = self.service.next() => match event { - None => return, + None => { + tracing::warn!(target: LOG_TARGET, "transport service stream ended, terminating identify event loop"); + return + }, Some(TransportEvent::ConnectionEstablished { peer, endpoint }) => { let _ = self.on_connection_established(peer, endpoint); } @@ -390,7 +394,7 @@ impl Identify { .await; } Some(Err(error)) => tracing::debug!(target: LOG_TARGET, ?error, "failed to read ipfs identify response"), - None => return, + None => {} } } } diff --git a/src/utils/futures_stream.rs b/src/utils/futures_stream.rs index 6393296d..60ef7172 100644 --- a/src/utils/futures_stream.rs +++ b/src/utils/futures_stream.rs @@ -50,6 +50,11 @@ impl FuturesStream { self.futures.len() } + /// Check if the stream is empty. + pub fn is_empty(&self) -> bool { + self.futures.is_empty() + } + /// Push a future for processing. pub fn push(&mut self, future: F) { self.futures.push(future);