diff --git a/Cargo.lock b/Cargo.lock index 317af86ff..7a00e0de1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5467,6 +5467,7 @@ dependencies = [ "rustc_version 0.4.1", "serde", "serde_json", + "slab", "socket2 0.5.7", "tokio", "tokio-util", diff --git a/Cargo.toml b/Cargo.toml index b5853f489..c33219f46 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -158,6 +158,7 @@ serde = { version = "1.0.210", default-features = false, features = [ ] } # Default features are disabled due to usage in no_std crates serde_json = "1.0.128" serde_yaml = "0.9.34" +slab = "0.4.9" static_init = "1.0.3" stabby = "36.1.1" sha3 = "0.10.8" diff --git a/zenoh/Cargo.toml b/zenoh/Cargo.toml index 94f6d6eb4..1bc323095 100644 --- a/zenoh/Cargo.toml +++ b/zenoh/Cargo.toml @@ -86,6 +86,7 @@ phf = { workspace = true } rand = { workspace = true, features = ["default"] } serde = { workspace = true, features = ["default"] } serde_json = { workspace = true } +slab = { workspace = true } socket2 = { workspace = true } uhlc = { workspace = true, features = ["default"] } vec_map = { workspace = true } diff --git a/zenoh/src/net/routing/dispatcher/face.rs b/zenoh/src/net/routing/dispatcher/face.rs index 6e1db6bbf..0b3078a80 100644 --- a/zenoh/src/net/routing/dispatcher/face.rs +++ b/zenoh/src/net/routing/dispatcher/face.rs @@ -19,12 +19,13 @@ use std::{ time::Duration, }; +use slab::Slab; use tokio_util::sync::CancellationToken; use zenoh_protocol::{ core::{ExprId, Reliability, WhatAmI, ZenohIdProto}, network::{ interest::{InterestId, InterestMode, InterestOptions}, - Mapping, Push, Request, RequestId, Response, ResponseFinal, + Mapping, Push, Request, Response, ResponseFinal, }, zenoh::RequestBody, }; @@ -70,8 +71,7 @@ pub struct FaceState { HashMap, CancellationToken)>, pub(crate) local_mappings: HashMap>, pub(crate) remote_mappings: HashMap>, - pub(crate) next_qid: RequestId, - pub(crate) pending_queries: HashMap, CancellationToken)>, + pub(crate) pending_queries: Slab<(Arc, CancellationToken)>, pub(crate) mcast_group: Option, pub(crate) in_interceptors: Option>, pub(crate) hat: Box, @@ -102,8 +102,7 @@ impl FaceState { pending_current_interests: HashMap::new(), local_mappings: HashMap::new(), remote_mappings: HashMap::new(), - next_qid: 0, - pending_queries: HashMap::new(), + pending_queries: Slab::new(), mcast_group, in_interceptors, hat, diff --git a/zenoh/src/net/routing/dispatcher/queries.rs b/zenoh/src/net/routing/dispatcher/queries.rs index f8a9f1f12..e62a912c4 100644 --- a/zenoh/src/net/routing/dispatcher/queries.rs +++ b/zenoh/src/net/routing/dispatcher/queries.rs @@ -290,13 +290,11 @@ pub(crate) fn update_matches_query_routes(tables: &Tables, res: &Arc) #[inline] fn insert_pending_query(outface: &mut Arc, query: Arc) -> RequestId { let outface_mut = get_mut_unchecked(outface); - outface_mut.next_qid += 1; - let qid = outface_mut.next_qid; - outface_mut.pending_queries.insert( - qid, - (query, outface_mut.task_controller.get_cancellation_token()), - ); - qid + outface_mut + .pending_queries + .insert((query, outface_mut.task_controller.get_cancellation_token())) + .try_into() + .expect("too many pending queries") } #[inline] @@ -381,7 +379,7 @@ impl QueryCleanup { qid, timeout, }; - if let Some((_, cancellation_token)) = face.pending_queries.get(&qid) { + if let Some((_, cancellation_token)) = face.pending_queries.get(qid as usize) { let c_cancellation_token = cancellation_token.clone(); face.task_controller .spawn_with_rt(zenoh_runtime::ZRuntime::Net, async move { @@ -422,7 +420,7 @@ impl Timed for QueryCleanup { let queries_lock = zwrite!(self.tables.queries_lock); if let Some(query) = get_mut_unchecked(&mut face) .pending_queries - .remove(&self.qid) + .try_remove(self.qid as usize) { drop(queries_lock); tracing::warn!( @@ -682,7 +680,7 @@ pub(crate) fn route_send_response( inc_res_stats!(face, rx, admin, body) } - match face.pending_queries.get(&qid) { + match face.pending_queries.get(qid as usize) { Some((query, _)) => { drop(queries_lock); @@ -717,7 +715,10 @@ pub(crate) fn route_send_response_final( qid: RequestId, ) { let queries_lock = zwrite!(tables_ref.queries_lock); - match get_mut_unchecked(face).pending_queries.remove(&qid) { + match get_mut_unchecked(face) + .pending_queries + .try_remove(qid as usize) + { Some(query) => { drop(queries_lock); tracing::debug!( @@ -735,7 +736,7 @@ pub(crate) fn route_send_response_final( pub(crate) fn finalize_pending_queries(tables_ref: &TablesLock, face: &mut Arc) { let queries_lock = zwrite!(tables_ref.queries_lock); - for (_, query) in get_mut_unchecked(face).pending_queries.drain() { + for query in get_mut_unchecked(face).pending_queries.drain() { finalize_pending_query(query); } drop(queries_lock); diff --git a/zenoh/src/net/runtime/mod.rs b/zenoh/src/net/runtime/mod.rs index 301698eea..225b7e64a 100644 --- a/zenoh/src/net/runtime/mod.rs +++ b/zenoh/src/net/runtime/mod.rs @@ -70,7 +70,7 @@ use crate::{ pub(crate) struct RuntimeState { zid: ZenohId, whatami: WhatAmI, - next_id: AtomicU32, + next_id: AtomicU32, // @TODO: manage rollover and uniqueness router: Arc, config: Notifier, manager: TransportManager,