From 9bf67901d4b4bdef7ccd83add185c785f2f8e721 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 11 Dec 2024 13:49:42 +0200 Subject: [PATCH 1/3] identify: Replace FuturesUnordered with FutureStream Signed-off-by: Alexandru Vasile --- src/protocol/libp2p/identify.rs | 16 ++++++++++------ src/utils/futures_stream.rs | 5 +++++ 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/src/protocol/libp2p/identify.rs b/src/protocol/libp2p/identify.rs index e8fa4f50..69ff2d50 100644 --- a/src/protocol/libp2p/identify.rs +++ b/src/protocol/libp2p/identify.rs @@ -28,10 +28,11 @@ use crate::{ substream::Substream, transport::Endpoint, types::{protocol::ProtocolName, SubstreamId}, + utils::futures_stream::FuturesStream, PeerId, DEFAULT_CHANNEL_SIZE, }; -use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt}; +use futures::{future::BoxFuture, Stream, StreamExt}; use multiaddr::Multiaddr; use prost::Message; use tokio::sync::mpsc::{channel, Sender}; @@ -181,10 +182,10 @@ pub(crate) struct Identify { protocols: Vec, /// Pending outbound substreams. - pending_outbound: FuturesUnordered>>, + pending_outbound: FuturesStream>>, /// Pending inbound substreams. - pending_inbound: FuturesUnordered>, + pending_inbound: FuturesStream>, } impl Identify { @@ -197,8 +198,8 @@ impl Identify { public: config.public.expect("public key to be supplied"), protocol_version: config.protocol_version, user_agent: config.user_agent.unwrap_or(DEFAULT_AGENT.to_string()), - pending_inbound: FuturesUnordered::new(), - pending_outbound: FuturesUnordered::new(), + pending_inbound: FuturesStream::new(), + pending_outbound: FuturesStream::new(), protocols: config.protocols.iter().map(|protocol| protocol.to_string()).collect(), } } @@ -356,7 +357,10 @@ impl Identify { loop { tokio::select! { event = self.service.next() => match event { - None => return, + None => { + tracing::warn!(target: LOG_TARGET, "transport service stream ended, terminating identify event loop"); + return + }, Some(TransportEvent::ConnectionEstablished { peer, endpoint }) => { let _ = self.on_connection_established(peer, endpoint); } diff --git a/src/utils/futures_stream.rs b/src/utils/futures_stream.rs index 6393296d..60ef7172 100644 --- a/src/utils/futures_stream.rs +++ b/src/utils/futures_stream.rs @@ -50,6 +50,11 @@ impl FuturesStream { self.futures.len() } + /// Check if the stream is empty. + pub fn is_empty(&self) -> bool { + self.futures.is_empty() + } + /// Push a future for processing. pub fn push(&mut self, future: F) { self.futures.push(future); From 11a1b7d6354bbfeb6593a0ba9d58296bfeb2bf73 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 11 Dec 2024 13:50:14 +0200 Subject: [PATCH 2/3] identify: Do not exit on pending outbound event Signed-off-by: Alexandru Vasile --- src/protocol/libp2p/identify.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/protocol/libp2p/identify.rs b/src/protocol/libp2p/identify.rs index 69ff2d50..f2a9a281 100644 --- a/src/protocol/libp2p/identify.rs +++ b/src/protocol/libp2p/identify.rs @@ -394,7 +394,7 @@ impl Identify { .await; } Some(Err(error)) => tracing::debug!(target: LOG_TARGET, ?error, "failed to read ipfs identify response"), - None => return, + _ => {} } } } From a9c2e96363047ab1b3147a9a6168d60cae513f26 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Wed, 11 Dec 2024 18:57:38 +0200 Subject: [PATCH 3/3] Update src/protocol/libp2p/identify.rs Co-authored-by: Dmitry Markin --- src/protocol/libp2p/identify.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/protocol/libp2p/identify.rs b/src/protocol/libp2p/identify.rs index f2a9a281..39f74581 100644 --- a/src/protocol/libp2p/identify.rs +++ b/src/protocol/libp2p/identify.rs @@ -394,7 +394,7 @@ impl Identify { .await; } Some(Err(error)) => tracing::debug!(target: LOG_TARGET, ?error, "failed to read ipfs identify response"), - _ => {} + None => {} } } }