Threadsafe Session Store #66
Replies: 7 comments 2 replies
-
You cannot use ntex types in a multithreaded context. I would use channels for communication between threads.
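A minimal sketch of the channel approach, using only the standard library rather than ntex. The function name `collect_from_workers` and the `Rc` value standing in for a thread-bound type are assumptions made for illustration:

```rust
use std::rc::Rc;
use std::sync::mpsc;
use std::thread;

/// Spawns `workers` threads; each one sends a single message back over a
/// shared channel. The result is sorted so that it is deterministic.
fn collect_from_workers(workers: usize) -> Vec<String> {
    let (tx, rx) = mpsc::channel::<String>();
    let mut handles = Vec::new();
    for id in 0..workers {
        let tx = tx.clone(); // every thread owns its own Sender clone
        handles.push(thread::spawn(move || {
            // `Rc` is not `Send`: it stands in for state that must stay on
            // the thread that created it. Only the owned String crosses over.
            let local_state = Rc::new(format!("worker {}", id));
            tx.send(format!("hello from {}", local_state))
                .expect("receiver is still alive");
        }));
    }
    drop(tx); // drop the original so `rx` ends once all workers are done
    let mut received: Vec<String> = rx.iter().collect();
    for handle in handles {
        handle.join().expect("worker panicked");
    }
    received.sort();
    received
}
```

Calling `collect_from_workers(3)` from `main` returns one greeting per worker. The thread-bound value never leaves its thread; only plain owned data travels through the channel.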
-
You need thread-safe storage for session ids, so you can tell which thread a connection lives in, and separate storage for each thread's sender.
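One way to read this suggestion, sketched with the standard library only. `SessionRegistry`, `route`, and `run_demo` are hypothetical names, and a plain `Vec` of senders stands in for the separate per-thread sender storage:

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Sender};
use std::sync::{Arc, RwLock};
use std::thread;

type WorkerId = usize;
type Envelope = (String, String); // (session id, payload)

/// Thread-safe map from session id to the worker that owns the connection.
/// Cloning is cheap, so every worker could hold its own handle.
#[derive(Clone, Default)]
struct SessionRegistry {
    owners: Arc<RwLock<HashMap<String, WorkerId>>>,
}

impl SessionRegistry {
    fn register(&self, session: &str, worker: WorkerId) {
        self.owners.write().unwrap().insert(session.to_owned(), worker);
    }
    fn owner_of(&self, session: &str) -> Option<WorkerId> {
        self.owners.read().unwrap().get(session).copied()
    }
}

/// Looks up the owning worker and sends the payload to that worker only.
/// Returns false when the session is unknown.
fn route(reg: &SessionRegistry, senders: &[Sender<Envelope>], session: &str, payload: &str) -> bool {
    match reg.owner_of(session) {
        Some(worker) => senders[worker]
            .send((session.to_owned(), payload.to_owned()))
            .is_ok(),
        None => false,
    }
}

/// Two workers, two sessions; returns what each worker received.
fn run_demo() -> Vec<(WorkerId, String, String)> {
    let registry = SessionRegistry::default();
    let mut senders = Vec::new(); // separate storage: one sender per worker
    let mut handles = Vec::new();
    for worker in 0..2 {
        let (tx, rx) = mpsc::channel::<Envelope>();
        senders.push(tx);
        handles.push(thread::spawn(move || {
            rx.iter().map(|(s, p)| (worker, s, p)).collect::<Vec<_>>()
        }));
    }
    registry.register("alice", 0);
    registry.register("bob", 1);
    route(&registry, &senders, "bob", "hi bob");
    route(&registry, &senders, "alice", "hi alice");
    route(&registry, &senders, "carol", "nobody home"); // unknown: not sent
    drop(senders); // closing the channels lets the workers finish
    let mut out = Vec::new();
    for handle in handles {
        out.extend(handle.join().unwrap());
    }
    out
}
```

In a real server the registration would presumably happen in the handshake hook of whichever worker accepted the connection; here it is done from the calling thread to keep the sketch short.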
-
I don't get it yet :) Inside the handshake hook I create the session. I should store this newly created session (or a link to it) in some container (Vec, BTreeSet) so that, when a new message arrives, the publish hook can iterate over the storage and get all the corresponding sinks. It probably needs a static lifetime to be accessible across threads and call sites. It seems I need an example :)
-
You can store a session only in thread-local storage, so you can iterate over local sessions only. If you cannot find the session locally, send the message to all other threads so that each of them iterates over its own local sessions. As an optimization, you can also store session ids in a thread-safe session registry, so you know which thread holds the required session.
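The thread-local variant with a broadcast fallback could look roughly like this. It is a standard-library sketch, not ntex code: `WorkerMsg`, `LOCAL_SESSIONS`, `run_worker`, and `run_demo` are hypothetical, and a `delivered` channel stands in for writing to a session's sink:

```rust
use std::cell::RefCell;
use std::collections::HashSet;
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

enum WorkerMsg {
    Connect(String),
    Publish { to: String, payload: String, forwarded: bool },
    Stop,
}

thread_local! {
    // Sessions owned by the current worker thread only; no locking needed.
    static LOCAL_SESSIONS: RefCell<HashSet<String>> = RefCell::new(HashSet::new());
}

fn has_local_session(id: &str) -> bool {
    LOCAL_SESSIONS.with(|s| s.borrow().contains(id))
}

fn run_worker(
    me: usize,
    rx: Receiver<WorkerMsg>,
    peers: Vec<Sender<WorkerMsg>>,
    delivered: Sender<(usize, String, String)>,
) {
    for msg in rx {
        match msg {
            WorkerMsg::Connect(id) => LOCAL_SESSIONS.with(|s| {
                s.borrow_mut().insert(id);
            }),
            WorkerMsg::Publish { to, payload, forwarded } => {
                if has_local_session(&to) {
                    // Stand-in for publishing to the session's sink.
                    let _ = delivered.send((me, to, payload));
                } else if !forwarded {
                    // Not found locally: ask every other worker exactly once.
                    for (i, peer) in peers.iter().enumerate() {
                        if i != me {
                            let _ = peer.send(WorkerMsg::Publish {
                                to: to.clone(),
                                payload: payload.clone(),
                                forwarded: true,
                            });
                        }
                    }
                }
            }
            WorkerMsg::Stop => break,
        }
    }
}

fn run_demo() -> Vec<(usize, String, String)> {
    let (delivered_tx, delivered_rx) = mpsc::channel();
    let (mut senders, mut receivers) = (Vec::new(), Vec::new());
    for _ in 0..3 {
        let (tx, rx) = mpsc::channel();
        senders.push(tx);
        receivers.push(rx);
    }
    let mut handles = Vec::new();
    for (me, rx) in receivers.into_iter().enumerate() {
        let (peers, delivered) = (senders.clone(), delivered_tx.clone());
        handles.push(thread::spawn(move || run_worker(me, rx, peers, delivered)));
    }
    senders[0].send(WorkerMsg::Connect("alice".into())).unwrap();
    senders[2].send(WorkerMsg::Connect("bob".into())).unwrap();
    // Both publishes arrive at worker 0; only "alice" lives there.
    for (to, payload) in [("alice", "local hit"), ("bob", "needs broadcast")] {
        let msg = WorkerMsg::Publish { to: to.into(), payload: payload.into(), forwarded: false };
        senders[0].send(msg).unwrap();
    }
    // Wait for both deliveries before stopping the workers.
    let mut out: Vec<_> = delivered_rx.iter().take(2).collect();
    for tx in &senders {
        tx.send(WorkerMsg::Stop).unwrap();
    }
    for handle in handles {
        handle.join().unwrap();
    }
    out.sort();
    out
}
```

The `forwarded` flag keeps a worker that lacks the session from broadcasting the message again, so each publish crosses threads at most once.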
-
I succeeded with a one-worker MQTT server and then got stuck.
Okay, suppose I receive a publish inside the publish hook of one of the MqttServer workers. I iterate over its local session storage successfully and everything seems fine. But how can I "resend" this message to the other threads (and only them) to trigger their publish hooks? Or is there another way to inform the workers?
-
@Antiarchitect, in my private proof-of-concept project I implemented the broker behavior the way Nikolay describes above. In my solution:
Something like this:
#[derive(Debug)]
pub enum AppStateCmd {
    // AddToSubs((ByteString, usize, mpsc::UnboundedSender<SubsCmd>)),
    RegisterClient {
        resp: oneshot::Sender<usize>,
        sender: mpsc::UnboundedSender<SubsCmd>,
    },
    AddToSubs(NewSubsription),
    Publish(PublishMessage),
    ClientDisconnect(usize),
}

pub struct AppState {
    pub tx: mpsc::UnboundedSender<AppStateCmd>,
    pub rx: mpsc::UnboundedReceiver<AppStateCmd>,
    pub clients_tx_by_index: Slab<mpsc::UnboundedSender<SubsCmd>>, // pool of clients/subscribers tx
    pub registry: TopicRegistry, // broker stuff
}

impl AppState {
    pub fn new() -> Self {
        let (tx, rx) = mpsc::unbounded::<AppStateCmd>();
        AppState {
            tx,
            rx,
            clients_tx_by_index: Slab::new(),
            registry: TopicRegistry::new(),
        }
    }

    pub async fn listen_once(&mut self) -> bool {
        if let Some(cmd) = self.rx.next().await {
            log::trace!("Command {:?}", cmd);
            self.execute_cmd(cmd).await;
            true
        } else {
            false
        }
    }

    pub async fn listen(mut self) {
        while self.listen_once().await {}
    }

    pub async fn execute_cmd(&mut self, cmd: AppStateCmd) {
        match cmd {
            AppStateCmd::RegisterClient { resp, sender } => {
                let client_index = self.clients_tx_by_index.insert(sender);
                let _ = resp.send(client_index);
            }
            ...
}

#[derive(Clone)]
pub struct ConnState {
    client_id: String,
    client_index: Cell<Option<usize>>,
    app_state_tx: mpsc::UnboundedSender<AppStateCmd>,
}

impl ConnState {
    pub fn spawn_recv_messages(&self, mut rx: mpsc::UnboundedReceiver<SubsCmd>, sink: ConnSink) {
        // wait cmd from app state
        let state = self.clone();
        ntex::rt::spawn(async move {
            loop {
                log::trace!("Subscriber {:?} wait message", state);
                match rx.next().await {
                    Some(cmd) => match cmd {
                        SubsCmd::Publish(message) => {
                            log::trace!("Try send {:?} to {:?}", message, state);
                            match sink {
                                ConnSink::V3(ref sink) => {
                                    if let Err(e) = sink
                                        .publish(message.topic, message.bytes)
                                        .send_at_least_once()
                                        .await
                                    {
                                        log::trace!("Error send message to {} - {}", state, e);
                                        state.unregister_as_subscriber();
                                        break;
                                    }
                                }
                                ...
                            };
                        }
        });
    }
}
and the main thread is:
#[ntex::main]
async fn main() -> std::io::Result<()> {
    env_logger::init();
    let settings = settings::Settings::new().unwrap();
    let app_state = app_state::AppState::new();
    let app_state_tx = app_state.tx.clone();
    // listen commands for AppState
    ntex::rt::spawn(async move {
        app_state.listen().await;
    });
    ntex::server::Server::build()
        .bind(
            "mqtt",
            format!("{}:{}", settings.host, settings.port),
            move || {
                let app_state_tx = app_state_tx.clone();
                MqttServer::new()
                    .v3(
                        // on handshake register new client
                        v3::MqttServer::new(move |req| handshake_v3(req, app_state_tx.clone()))
                            .publish(fn_factory_with_config(
                                move |session: v3::Session<ConnState>| {
                                    let script = script.clone();
                                    let kafka_producer = kafka_producer_clone.clone();
                                    let settings = settings_clone.clone();
                                    ok::<_, MyServerError>(fn_service(move |req| {
                                        publish_v3(
                                            settings.clone(),
                                            kafka_producer.clone(),
                                            script.clone(),
                                            session.clone(),
                                            req,
                                        )
                                    }))
                                },
                            ))
                            // on subscribe register that connection as subscriber to topics
                            // spawn task to wait topic messages to send them to client
                            .control(fn_factory_with_config(
                                move |session: v3::Session<ConnState>| {
                                    ok::<_, MyServerError>(fn_service(move |req| {
                                        control_v3(session.clone(), req)
                                    }))
                                },
                            )),
                    )
...

pub async fn control_v3(
    session: v3::Session<ConnState>,
    control: v3::ControlMessage,
) -> Result<v3::ControlResult, MyServerError> {
    let state = session.state().clone();
    // let client_index = unsafe { (*state).client_index };
    log::info!("incoming control v3 ({:?})", state);
    match control {
        v3::ControlMessage::Ping(ping) => Ok(ping.ack()),
        v3::ControlMessage::Closed(closed) => {
            log::info!("incoming closed {:?} {:?}", state, closed);
            state.unregister_as_subscriber();
            Ok(closed.ack())
        }
        v3::ControlMessage::Subscribe(mut msg) => {
            log::info!("incoming subscribe {:?} {:?}", state, msg);
            let (tx, rx) = mpsc::unbounded::<SubsCmd>();
            state.register_as_subscriber(tx).await;
            for mut sub in &mut msg {
                log::info!("subscribe ({:?}) -> {:?})", state, sub.topic());
                let topic = sub.topic();
                state.subscribe_to_topic(topic.to_owned());
                sub.subscribe(v3::codec::QoS::AtLeastOnce);
            }
            let sink = session.sink().clone();
            // spawn task to wait commands from AppState to write messages to the sink
            state.spawn_recv_messages(rx, ConnSink::V3(sink));
            Ok(msg.ack())
        }
        _ => Ok(control.disconnect()),
    }
}
P.S. Sorry for my poor English
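The command-channel pattern above (one task owns all broker state, connections talk to it through messages and get their index back over a reply channel) can be tried out without ntex. The following is a standard-library sketch under that assumption: `StateCmd`, `run_state`, `register`, and `run_demo` are hypothetical names, a `Vec<Option<_>>` stands in for `Slab`, and a plain `Sender<usize>` stands in for the oneshot reply:

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

enum StateCmd {
    Register { resp: Sender<usize>, client_tx: Sender<String> },
    Subscribe { client: usize, topic: String },
    Publish { topic: String, payload: String },
    Disconnect(usize),
}

/// Owns all broker state; runs until every command sender is dropped.
fn run_state(rx: Receiver<StateCmd>) {
    let mut clients: Vec<Option<Sender<String>>> = Vec::new(); // Slab stand-in
    let mut subs: HashMap<String, Vec<usize>> = HashMap::new();
    for cmd in rx {
        match cmd {
            StateCmd::Register { resp, client_tx } => {
                clients.push(Some(client_tx));
                let _ = resp.send(clients.len() - 1); // reply with the index
            }
            StateCmd::Subscribe { client, topic } => {
                subs.entry(topic).or_default().push(client)
            }
            StateCmd::Publish { topic, payload } => {
                for &idx in subs.get(&topic).into_iter().flatten() {
                    if let Some(Some(tx)) = clients.get(idx) {
                        let _ = tx.send(payload.clone());
                    }
                }
            }
            StateCmd::Disconnect(idx) => {
                if let Some(slot) = clients.get_mut(idx) {
                    *slot = None; // dropping the sender closes the client's inbox
                }
            }
        }
    }
}

/// Registers a client and waits for its index, like the handshake step.
fn register(state: &Sender<StateCmd>) -> (usize, Receiver<String>) {
    let (client_tx, client_rx) = mpsc::channel();
    let (resp, resp_rx) = mpsc::channel();
    state.send(StateCmd::Register { resp, client_tx }).unwrap();
    (resp_rx.recv().unwrap(), client_rx)
}

fn run_demo() -> (Vec<String>, Vec<String>) {
    let (state, rx) = mpsc::channel();
    let handle = thread::spawn(move || run_state(rx));
    let (a, a_rx) = register(&state);
    let (b, b_rx) = register(&state);
    for client in [a, b] {
        state.send(StateCmd::Subscribe { client, topic: "news".into() }).unwrap();
    }
    state.send(StateCmd::Publish { topic: "news".into(), payload: "first".into() }).unwrap();
    state.send(StateCmd::Disconnect(b)).unwrap();
    state.send(StateCmd::Publish { topic: "news".into(), payload: "second".into() }).unwrap();
    drop(state); // no more commands: the state thread exits
    handle.join().unwrap();
    (a_rx.iter().collect(), b_rx.iter().collect())
}
```

Because a single thread owns `clients` and `subs`, no locks are needed; the only shared objects are the channel endpoints.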
-
Trying to implement Threadsafe Session Store in order to serve MQTT subscribers:
Where AsyncSafe is:
But getting this:
Maybe the whole approach is incorrect.