Skip to content

Commit

Permalink
Add default open timeout for new link establishment
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Dec 12, 2024
1 parent cc201aa commit b3d4394
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 45 deletions.
4 changes: 3 additions & 1 deletion DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,9 @@
/// Configure internal transport parameters
transport: {
unicast: {
/// Timeout in milliseconds when opening a link
/// Timeout in milliseconds when accepting a link
open_timeout: 10000,
/// Timeout in milliseconds when accepting a link
accept_timeout: 10000,
/// Maximum number of zenoh session in pending state while accepting
accept_pending: 100,
Expand Down
1 change: 1 addition & 0 deletions commons/zenoh-config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ impl Default for ConnectConfig {
impl Default for TransportUnicastConf {
fn default() -> Self {
Self {
open_timeout: 10_000,
accept_timeout: 10_000,
accept_pending: 100,
max_sessions: 1_000,
Expand Down
2 changes: 2 additions & 0 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,8 @@ validated_struct::validator! {
pub transport: #[derive(Default)]
TransportConf {
pub unicast: TransportUnicastConf {
/// Timeout in milliseconds when opening a link (default: 10000).
open_timeout: u64,
/// Timeout in milliseconds when opening a link (default: 10000).
accept_timeout: u64,
/// Number of links that may stay pending during accept phase (default: 100).
Expand Down
16 changes: 15 additions & 1 deletion io/zenoh-transport/src/unicast/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ use crate::{
pub struct TransportManagerConfigUnicast {
pub lease: Duration,
pub keep_alive: usize,
pub open_timeout: Duration,
pub accept_timeout: Duration,
pub accept_pending: usize,
pub max_sessions: usize,
Expand Down Expand Up @@ -105,6 +106,7 @@ pub struct TransportManagerBuilderUnicast {
// target interval.
pub(super) lease: Duration,
pub(super) keep_alive: usize,
pub(super) open_timeout: Duration,
pub(super) accept_timeout: Duration,
pub(super) accept_pending: usize,
pub(super) max_sessions: usize,
Expand All @@ -131,6 +133,11 @@ impl TransportManagerBuilderUnicast {
self
}

pub fn open_timeout(mut self, open_timeout: Duration) -> Self {
self.open_timeout = open_timeout;
self
}

pub fn accept_timeout(mut self, accept_timeout: Duration) -> Self {
self.accept_timeout = accept_timeout;
self
Expand Down Expand Up @@ -225,6 +232,7 @@ impl TransportManagerBuilderUnicast {
let config = TransportManagerConfigUnicast {
lease: self.lease,
keep_alive: self.keep_alive,
open_timeout: self.open_timeout,
accept_timeout: self.accept_timeout,
accept_pending: self.accept_pending,
max_sessions: self.max_sessions,
Expand Down Expand Up @@ -274,6 +282,7 @@ impl Default for TransportManagerBuilderUnicast {
Self {
lease: Duration::from_millis(*link_tx.lease()),
keep_alive: *link_tx.keep_alive(),
open_timeout: Duration::from_millis(*transport.open_timeout()),
accept_timeout: Duration::from_millis(*transport.accept_timeout()),
accept_pending: *transport.accept_pending(),
max_sessions: *transport.max_sessions(),
Expand Down Expand Up @@ -725,7 +734,12 @@ impl TransportManager {
// Create a new link associated by calling the Link Manager
let link = manager.new_link(endpoint.clone()).await?;
// Open the link
super::establishment::open::open_link(endpoint, link, self).await
tokio::time::timeout(
self.config.unicast.open_timeout,
super::establishment::open::open_link(endpoint, link, self),
)
.await
.map_err(|e| zerror!("{e}"))?
}

pub async fn get_transport_unicast(&self, peer: &ZenohIdProto) -> Option<TransportUnicast> {
Expand Down
55 changes: 12 additions & 43 deletions zenoh/src/net/runtime/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,11 +349,7 @@ impl Runtime {
);
if retry_config.timeout().is_zero() || self.get_global_connect_timeout().is_zero() {
// try to connect and exit immediately without retry
if self
.peer_connector(endpoint, retry_config.timeout())
.await
.is_ok()
{
if self.peer_connector(endpoint).await.is_ok() {
return Ok(());
}
} else {
Expand All @@ -379,7 +375,7 @@ impl Runtime {
);
if retry_config.timeout().is_zero() || self.get_global_connect_timeout().is_zero() {
// try to connect and exit immediately without retry
if let Err(e) = self.peer_connector(endpoint, retry_config.timeout()).await {
if let Err(e) = self.peer_connector(endpoint).await {
if retry_config.exit_on_failure {
return Err(e);
}
Expand All @@ -398,18 +394,12 @@ impl Runtime {
Ok(())
}

async fn peer_connector(&self, peer: EndPoint, timeout: std::time::Duration) -> ZResult<()> {
match tokio::time::timeout(timeout, self.manager().open_transport_unicast(peer.clone()))
.await
{
Ok(Ok(_)) => Ok(()),
Ok(Err(e)) => {
tracing::warn!("Unable to connect to {}! {}", peer, e);
Err(e)
}
async fn peer_connector(&self, peer: EndPoint) -> ZResult<()> {
match self.manager().open_transport_unicast(peer.clone()).await {
Ok(_) => Ok(()),
Err(e) => {
tracing::warn!("Unable to connect to {}! {}", peer, e);
Err(e.into())
Err(e)
}
}
}
Expand Down Expand Up @@ -795,9 +785,9 @@ impl Runtime {
tracing::trace!("Trying to connect to configured peer {}", peer);
let endpoint = peer.clone();
tokio::select! {
res = tokio::time::timeout(retry_config.timeout(), self.manager().open_transport_unicast(endpoint)) => {
res = self.manager().open_transport_unicast(endpoint) => {
match res {
Ok(Ok(transport)) => {
Ok(transport) => {
tracing::debug!("Successfully connected to configured peer {}", peer);
if let Ok(Some(orch_transport)) = transport.get_callback() {
if let Some(orch_transport) = orch_transport
Expand All @@ -809,14 +799,6 @@ impl Runtime {
}
return transport.get_zid();
}
Ok(Err(e)) => {
tracing::debug!(
"Unable to connect to configured peer {}! {}. Retry in {:?}.",
peer,
e,
period.duration()
);
}
Err(e) => {
tracing::debug!(
"Unable to connect to configured peer {}! {}. Retry in {:?}.",
Expand Down Expand Up @@ -977,7 +959,6 @@ impl Runtime {
};

let endpoint = locator.to_owned().into();
let retry_config = self.get_connect_retry_config(&endpoint);
let priorities = locator
.metadata()
.get(Metadata::PRIORITIES)
Expand All @@ -997,35 +978,23 @@ impl Runtime {
})
{
if is_multicast {
match tokio::time::timeout(
retry_config.timeout(),
manager.open_transport_multicast(endpoint),
)
.await
{
Ok(Ok(transport)) => {
match manager.open_transport_multicast(endpoint).await {
Ok(transport) => {
tracing::debug!(
"Successfully connected to newly scouted peer: {:?}",
transport
);
}
Ok(Err(e)) => tracing::trace!("{} {} on {}: {}", ERR, zid, locator, e),
Err(e) => tracing::trace!("{} {} on {}: {}", ERR, zid, locator, e),
}
} else {
match tokio::time::timeout(
retry_config.timeout(),
manager.open_transport_unicast(endpoint),
)
.await
{
Ok(Ok(transport)) => {
match manager.open_transport_unicast(endpoint).await {
Ok(transport) => {
tracing::debug!(
"Successfully connected to newly scouted peer: {:?}",
transport
);
}
Ok(Err(e)) => tracing::trace!("{} {} on {}: {}", ERR, zid, locator, e),
Err(e) => tracing::trace!("{} {} on {}: {}", ERR, zid, locator, e),
}
}
Expand Down

0 comments on commit b3d4394

Please sign in to comment.