diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 6a193829e4..529e2dd83e 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -1972,7 +1972,38 @@ impl SessionInner { let primitives = zread!(self.state).primitives()?; primitives.opt_send_push( &wire_expr, - || Push { + || { ( + push::ext::QoSType::new( + priority.into(), + congestion_control, + is_express, + ), + match kind { + SampleKind::Put => PushBody::Put(Put { + timestamp, + encoding: encoding.clone().into(), + #[cfg(feature = "unstable")] + ext_sinfo: source_info.into(), + #[cfg(not(feature = "unstable"))] + ext_sinfo: None, + #[cfg(feature = "shared-memory")] + ext_shm: None, + ext_attachment: attachment.clone().map(|a| a.into()), + ext_unknown: vec![], + payload: payload.clone().into(), + }), + SampleKind::Delete => PushBody::Del(Del { + timestamp, + #[cfg(feature = "unstable")] + ext_sinfo: source_info.into(), + #[cfg(not(feature = "unstable"))] + ext_sinfo: None, + ext_attachment: attachment.clone().map(|a| a.into()), + ext_unknown: vec![], + }), + }, + ) + /*Push { wire_expr: wire_expr.to_owned(), ext_qos: push::ext::QoSType::new( priority.into(), @@ -2005,7 +2036,8 @@ impl SessionInner { ext_unknown: vec![], }), }, - }, + }*/ + }, #[cfg(feature = "unstable")] reliability, #[cfg(not(feature = "unstable"))] diff --git a/zenoh/src/net/primitives/mod.rs b/zenoh/src/net/primitives/mod.rs index d5fb50ebda..5772b8efe1 100644 --- a/zenoh/src/net/primitives/mod.rs +++ b/zenoh/src/net/primitives/mod.rs @@ -20,13 +20,13 @@ pub use demux::*; pub use mux::*; use zenoh_protocol::{ core::{Reliability, WireExpr}, - network::{interest::Interest, Declare, Push, Request, Response, ResponseFinal}, + network::{interest::Interest, push, Declare, Push, Request, Response, ResponseFinal}, zenoh::PushBody, }; use super::routing::RoutingContext; pub trait OptPrimitives: Send + Sync { - fn opt_send_pushPush>(&self,wire_expr: &WireExpr<'_>, fn_msg: F, reliability: Reliability); + fn opt_send_push(push::ext::QoSType, PushBody)>(&self,wire_expr: &WireExpr<'_>, fn_msg: F, reliability: Reliability); } pub trait Primitives: Send + Sync { diff --git a/zenoh/src/net/routing/dispatcher/face.rs b/zenoh/src/net/routing/dispatcher/face.rs index 044f4de2d8..eb1203f3ca 100644 --- a/zenoh/src/net/routing/dispatcher/face.rs +++ b/zenoh/src/net/routing/dispatcher/face.rs @@ -23,10 +23,9 @@ use tokio_util::sync::CancellationToken; use zenoh_protocol::{ core::{ExprId, Reliability, WhatAmI, ZenohIdProto}, network::{ - interest::{InterestId, InterestMode, InterestOptions}, - Mapping, Push, Request, RequestId, Response, ResponseFinal, + interest::{InterestId, InterestMode, InterestOptions}, push, Mapping, Push, Request, RequestId, Response, ResponseFinal }, - zenoh::RequestBody, + zenoh::{PushBody, RequestBody}, }; use zenoh_sync::get_mut_unchecked; use zenoh_task::TaskController; @@ -217,7 +216,7 @@ impl Face { impl OptPrimitives for Face { #[inline] - fn opt_send_pushPush>(&self,wire_expr: &zenoh_protocol::core::WireExpr<'_>, fn_msg: F, reliability: Reliability) { + fn opt_send_push(push::ext::QoSType, PushBody)>(&self,wire_expr: &zenoh_protocol::core::WireExpr<'_>, fn_msg: F, reliability: Reliability) { opt_route_data(&self.tables, &self.state, wire_expr, fn_msg, reliability); } } diff --git a/zenoh/src/net/routing/dispatcher/pubsub.rs b/zenoh/src/net/routing/dispatcher/pubsub.rs index cea956d692..52fd4e407b 100644 --- a/zenoh/src/net/routing/dispatcher/pubsub.rs +++ b/zenoh/src/net/routing/dispatcher/pubsub.rs @@ -17,8 +17,7 @@ use zenoh_core::zread; use zenoh_protocol::{ core::{key_expr::keyexpr, Reliability, WhatAmI, WireExpr}, network::{ - declare::{ext, SubscriberId}, - Push, + declare::{ext, SubscriberId}, push, Push }, zenoh::PushBody, }; @@ -524,7 +523,7 @@ pub fn route_data( } -pub fn opt_route_dataPush>( +pub fn opt_route_data(push::ext::QoSType, PushBody)>( tables_ref: &Arc, face: &FaceState, wire_expr: &WireExpr<'_>, @@ -537,14 +536,13 @@ pub fn opt_route_dataPush>( .cloned() { Some(prefix) => { - let mut msg = fn_msg(); tracing::trace!( "{} Route data for res {}{}", face, prefix.expr(), - msg.wire_expr.suffix.as_ref() + wire_expr.suffix.as_ref() ); - let mut expr = RoutingExpr::new(&prefix, msg.wire_expr.suffix.as_ref()); + let mut expr = RoutingExpr::new(&prefix, wire_expr.suffix.as_ref()); #[cfg(feature = "stats")] let admin = expr.full_expr().starts_with("@/"); @@ -558,10 +556,11 @@ pub fn opt_route_dataPush>( if tables.hat_code.ingress_filter(&tables, face, &mut expr) { let res = Resource::get_resource(&prefix, expr.suffix); - let route = get_data_route(&tables, face, &res, &mut expr, msg.ext_nodeid.node_id); + let route = get_data_route(&tables, face, &res, &mut expr, push::ext::NodeIdType::DEFAULT.node_id); if !route.is_empty() { - treat_timestamp!(&tables.hlc, msg.payload, tables.drop_future_timestamp); + let (ext_qos, mut payload) = fn_msg(); + treat_timestamp!(&tables.hlc, payload, tables.drop_future_timestamp); if route.len() == 1 { let (outface, key_expr, context) = route.values().next().unwrap(); @@ -580,10 +579,10 @@ pub fn opt_route_dataPush>( outface.primitives.send_push( Push { wire_expr: key_expr.into(), - ext_qos: msg.ext_qos, - ext_tstamp: msg.ext_tstamp, + ext_qos, + ext_tstamp: None, ext_nodeid: ext::NodeIdType { node_id: *context }, - payload: msg.payload, + payload, }, reliability, ) @@ -611,10 +610,10 @@ pub fn opt_route_dataPush>( outface.primitives.send_push( Push { wire_expr: key_expr, - ext_qos: msg.ext_qos, + ext_qos, ext_tstamp: None, ext_nodeid: ext::NodeIdType { node_id: context }, - payload: msg.payload.clone(), + payload: payload.clone(), }, reliability, ) @@ -638,10 +637,10 @@ pub fn opt_route_dataPush>( outface.primitives.send_push( Push { wire_expr: key_expr.into(), - ext_qos: msg.ext_qos, + ext_qos, ext_tstamp: None, ext_nodeid: ext::NodeIdType { node_id: *context }, - payload: msg.payload.clone(), + payload: payload.clone(), }, reliability, )