Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add default open timeout for new link establishment #1663

Merged
merged 2 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,8 @@
transport: {
unicast: {
/// Timeout in milliseconds when opening a link
open_timeout: 10000,
/// Timeout in milliseconds when accepting a link
accept_timeout: 10000,
/// Maximum number of zenoh session in pending state while accepting
accept_pending: 100,
Expand Down
1 change: 1 addition & 0 deletions commons/zenoh-config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ impl Default for ConnectConfig {
impl Default for TransportUnicastConf {
fn default() -> Self {
Self {
open_timeout: 10_000,
accept_timeout: 10_000,
accept_pending: 100,
max_sessions: 1_000,
Expand Down
2 changes: 2 additions & 0 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,8 @@ validated_struct::validator! {
TransportConf {
pub unicast: TransportUnicastConf {
/// Timeout in milliseconds when opening a link (default: 10000).
open_timeout: u64,
/// Timeout in milliseconds when accepting a link (default: 10000).
accept_timeout: u64,
/// Number of links that may stay pending during accept phase (default: 100).
accept_pending: usize,
Expand Down
16 changes: 15 additions & 1 deletion io/zenoh-transport/src/unicast/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ use crate::{
pub struct TransportManagerConfigUnicast {
pub lease: Duration,
pub keep_alive: usize,
pub open_timeout: Duration,
pub accept_timeout: Duration,
pub accept_pending: usize,
pub max_sessions: usize,
Expand Down Expand Up @@ -105,6 +106,7 @@ pub struct TransportManagerBuilderUnicast {
// target interval.
pub(super) lease: Duration,
pub(super) keep_alive: usize,
pub(super) open_timeout: Duration,
pub(super) accept_timeout: Duration,
pub(super) accept_pending: usize,
pub(super) max_sessions: usize,
Expand All @@ -131,6 +133,11 @@ impl TransportManagerBuilderUnicast {
self
}

pub fn open_timeout(mut self, open_timeout: Duration) -> Self {
self.open_timeout = open_timeout;
self
}

pub fn accept_timeout(mut self, accept_timeout: Duration) -> Self {
self.accept_timeout = accept_timeout;
self
Expand Down Expand Up @@ -225,6 +232,7 @@ impl TransportManagerBuilderUnicast {
let config = TransportManagerConfigUnicast {
lease: self.lease,
keep_alive: self.keep_alive,
open_timeout: self.open_timeout,
accept_timeout: self.accept_timeout,
accept_pending: self.accept_pending,
max_sessions: self.max_sessions,
Expand Down Expand Up @@ -274,6 +282,7 @@ impl Default for TransportManagerBuilderUnicast {
Self {
lease: Duration::from_millis(*link_tx.lease()),
keep_alive: *link_tx.keep_alive(),
open_timeout: Duration::from_millis(*transport.open_timeout()),
accept_timeout: Duration::from_millis(*transport.accept_timeout()),
accept_pending: *transport.accept_pending(),
max_sessions: *transport.max_sessions(),
Expand Down Expand Up @@ -725,7 +734,12 @@ impl TransportManager {
// Create a new link associated by calling the Link Manager
let link = manager.new_link(endpoint.clone()).await?;
// Open the link
super::establishment::open::open_link(endpoint, link, self).await
tokio::time::timeout(
self.config.unicast.open_timeout,
super::establishment::open::open_link(endpoint, link, self),
)
.await
.map_err(|e| zerror!("{e}"))?
}

pub async fn get_transport_unicast(&self, peer: &ZenohIdProto) -> Option<TransportUnicast> {
Expand Down
55 changes: 12 additions & 43 deletions zenoh/src/net/runtime/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,11 +349,7 @@ impl Runtime {
);
if retry_config.timeout().is_zero() || self.get_global_connect_timeout().is_zero() {
// try to connect and exit immediately without retry
if self
.peer_connector(endpoint, retry_config.timeout())
.await
.is_ok()
{
if self.peer_connector(endpoint).await.is_ok() {
return Ok(());
}
} else {
Expand All @@ -379,7 +375,7 @@ impl Runtime {
);
if retry_config.timeout().is_zero() || self.get_global_connect_timeout().is_zero() {
// try to connect and exit immediately without retry
if let Err(e) = self.peer_connector(endpoint, retry_config.timeout()).await {
if let Err(e) = self.peer_connector(endpoint).await {
if retry_config.exit_on_failure {
return Err(e);
}
Expand All @@ -398,18 +394,12 @@ impl Runtime {
Ok(())
}

async fn peer_connector(&self, peer: EndPoint, timeout: std::time::Duration) -> ZResult<()> {
match tokio::time::timeout(timeout, self.manager().open_transport_unicast(peer.clone()))
.await
{
Ok(Ok(_)) => Ok(()),
Ok(Err(e)) => {
tracing::warn!("Unable to connect to {}! {}", peer, e);
Err(e)
}
async fn peer_connector(&self, peer: EndPoint) -> ZResult<()> {
match self.manager().open_transport_unicast(peer.clone()).await {
Ok(_) => Ok(()),
Err(e) => {
tracing::warn!("Unable to connect to {}! {}", peer, e);
Err(e.into())
Err(e)
}
}
}
Expand Down Expand Up @@ -795,9 +785,9 @@ impl Runtime {
tracing::trace!("Trying to connect to configured peer {}", peer);
let endpoint = peer.clone();
tokio::select! {
res = tokio::time::timeout(retry_config.timeout(), self.manager().open_transport_unicast(endpoint)) => {
res = self.manager().open_transport_unicast(endpoint) => {
match res {
Ok(Ok(transport)) => {
Ok(transport) => {
tracing::debug!("Successfully connected to configured peer {}", peer);
if let Ok(Some(orch_transport)) = transport.get_callback() {
if let Some(orch_transport) = orch_transport
Expand All @@ -809,14 +799,6 @@ impl Runtime {
}
return transport.get_zid();
}
Ok(Err(e)) => {
tracing::debug!(
"Unable to connect to configured peer {}! {}. Retry in {:?}.",
peer,
e,
period.duration()
);
}
Err(e) => {
tracing::debug!(
"Unable to connect to configured peer {}! {}. Retry in {:?}.",
Expand Down Expand Up @@ -977,7 +959,6 @@ impl Runtime {
};

let endpoint = locator.to_owned().into();
let retry_config = self.get_connect_retry_config(&endpoint);
let priorities = locator
.metadata()
.get(Metadata::PRIORITIES)
Expand All @@ -997,35 +978,23 @@ impl Runtime {
})
{
if is_multicast {
match tokio::time::timeout(
retry_config.timeout(),
manager.open_transport_multicast(endpoint),
)
.await
{
Ok(Ok(transport)) => {
match manager.open_transport_multicast(endpoint).await {
Ok(transport) => {
tracing::debug!(
"Successfully connected to newly scouted peer: {:?}",
transport
);
}
Ok(Err(e)) => tracing::trace!("{} {} on {}: {}", ERR, zid, locator, e),
Err(e) => tracing::trace!("{} {} on {}: {}", ERR, zid, locator, e),
}
} else {
match tokio::time::timeout(
retry_config.timeout(),
manager.open_transport_unicast(endpoint),
)
.await
{
Ok(Ok(transport)) => {
match manager.open_transport_unicast(endpoint).await {
Ok(transport) => {
tracing::debug!(
"Successfully connected to newly scouted peer: {:?}",
transport
);
}
Ok(Err(e)) => tracing::trace!("{} {} on {}: {}", ERR, zid, locator, e),
Err(e) => tracing::trace!("{} {} on {}: {}", ERR, zid, locator, e),
}
}
Expand Down
Loading