From 5dd9d8f10c2b1c76e13e2930df1646ec9324ed43 Mon Sep 17 00:00:00 2001 From: Richard Jonas Date: Fri, 18 Oct 2024 17:11:20 +0200 Subject: [PATCH] Convert header frame to message props --- metalmq-client/src/message.rs | 20 +++++++++-- metalmq-codec/src/frame/mod.rs | 19 +++++++++++ metalmq/src/client/channel/open_close.rs | 6 ++-- metalmq/src/client/connection/tests.rs | 10 +++--- metalmq/src/client/connection/types.rs | 43 ++++++++++++++++++++---- 5 files changed, 82 insertions(+), 16 deletions(-) diff --git a/metalmq-client/src/message.rs b/metalmq-client/src/message.rs index 89c9ee6..34cf050 100644 --- a/metalmq-client/src/message.rs +++ b/metalmq-client/src/message.rs @@ -1,6 +1,8 @@ use std::collections::HashMap; -use metalmq_codec::frame::{AMQPFieldValue, ContentBodyFrame, ContentHeaderFrame, HeaderPropertyFlags}; +use metalmq_codec::frame::{ + fieldtable_to_hashmap, AMQPFieldValue, ContentBodyFrame, ContentHeaderFrame, HeaderPropertyFlags, +}; use crate::ChannelNumber; @@ -126,7 +128,21 @@ pub(crate) fn to_content_frames(message: Content) -> (ContentHeaderFrame, Conten impl From for MessageProperties { fn from(value: ContentHeaderFrame) -> Self { - MessageProperties::default() + MessageProperties { + content_type: value.content_type, + content_encoding: value.content_encoding, + headers: fieldtable_to_hashmap(value.headers.unwrap()), + delivery_mode: value.delivery_mode, + priority: value.priority, + correlation_id: value.correlation_id, + reply_to: value.reply_to, + expiration: value.expiration, + message_id: value.message_id, + timestamp: value.timestamp, + message_type: value.message_type, + user_id: value.user_id, + app_id: value.app_id, + } } } diff --git a/metalmq-codec/src/frame/mod.rs b/metalmq-codec/src/frame/mod.rs index d81eb6e..773e6c7 100644 --- a/metalmq-codec/src/frame/mod.rs +++ b/metalmq-codec/src/frame/mod.rs @@ -307,6 +307,25 @@ impl From for AMQPFrame { } } +pub fn fieldtable_to_hashmap(value: FieldTable) -> HashMap { + let mut m = HashMap::new(); + + for (k, v) in value { + match v { + AMQPFieldValue::Bool(b) => { + m.insert(k, b.to_string()); + } + AMQPFieldValue::LongString(s) => { + m.insert(k, s); + } + AMQPFieldValue::EmptyFieldTable => {} + AMQPFieldValue::FieldTable(_) => panic!("Embedded field table is not supported"), + } + } + + m +} + /// Split class id and method id from `u32` combined code. pub fn split_class_method(cm: u32) -> (u16, u16) { let method_id = (cm & 0x0000FFFF) as u16; diff --git a/metalmq/src/client/channel/open_close.rs b/metalmq/src/client/channel/open_close.rs index 6427f9d..7b3f249 100644 --- a/metalmq/src/client/channel/open_close.rs +++ b/metalmq/src/client/channel/open_close.rs @@ -2,7 +2,7 @@ use super::types::Channel; use crate::error::Result; impl Channel { - pub async fn handle_channel_close(&mut self, code: u16, cm: u32, text: String) -> Result<()> { + pub async fn handle_channel_close(&mut self, _code: u16, _cm: u32, _text: String) -> Result<()> { // Cancel consumed queues on the channel //if let Some(cq) = self.consumed_queues.remove(&channel) { // qm::cancel_consume( @@ -31,9 +31,9 @@ impl Channel { Ok(()) } - pub async fn handle_channel_close_ok(&mut self, channel: u16) -> Result<()> { + pub async fn handle_channel_close_ok(&mut self, _channel: u16) -> Result<()> { todo!(); - Ok(()) + //Ok(()) } } diff --git a/metalmq/src/client/connection/tests.rs b/metalmq/src/client/connection/tests.rs index a4c8a0e..0406e77 100644 --- a/metalmq/src/client/connection/tests.rs +++ b/metalmq/src/client/connection/tests.rs @@ -9,7 +9,7 @@ // // connection close closes all the channels -use metalmq_codec::{codec::Frame, frame}; +use metalmq_codec::frame; use tokio::sync::mpsc; use crate::{exchange, queue, tests::recv, Context}; @@ -48,7 +48,7 @@ async fn connection_close_clean_up_channels() { .await .unwrap(); - let connection_tune = recv::recv_single_frame(&mut rx).await; + let _connection_tune = recv::recv_single_frame(&mut rx).await; connection .handle_connection_tune_ok(frame::ConnectionTuneOkArgs::default()) @@ -60,7 +60,7 @@ async fn connection_close_clean_up_channels() { .await .unwrap(); - let connection_open_ok = recv::recv_single_frame(&mut rx).await; + let _connection_open_ok = recv::recv_single_frame(&mut rx).await; // start a channel // start a consumer @@ -69,7 +69,7 @@ async fn connection_close_clean_up_channels() { connection.handle_channel_open(11).await.unwrap(); - let channel_open_ok = recv::recv_single_frame(&mut rx).await; + let _channel_open_ok = recv::recv_single_frame(&mut rx).await; assert!(connection.channel_receivers.contains_key(&11)); @@ -94,7 +94,7 @@ async fn connection_close_clean_up_channels() { // back all the close messages? We will leak connections... okay we have heartbeat, so it // will close connection, but... - let channel_close = recv::recv_single_frame(&mut rx).await; + let _channel_close = recv::recv_single_frame(&mut rx).await; connection.handle_channel_close_ok(11).await.unwrap(); diff --git a/metalmq/src/client/connection/types.rs b/metalmq/src/client/connection/types.rs index ef47016..b37124a 100644 --- a/metalmq/src/client/connection/types.rs +++ b/metalmq/src/client/connection/types.rs @@ -1,11 +1,23 @@ use std::collections::HashMap; -use log::info; +use log::{debug, info}; use metalmq_codec::codec::Frame; -use tokio::{sync::mpsc, task::JoinHandle}; +use tokio::{ + sync::{mpsc, oneshot}, + task::JoinHandle, +}; use uuid::Uuid; -use crate::{client::channel::types::Command, exchange, queue, Context, Result}; +use crate::{ + client::channel::types::Command, + config::{MAX_CHANNELS_PER_CONNECTION, MAX_FRAME_SIZE}, + exchange, + queue::{ + self, + manager::{QueueDeleteCommand, QueueManagerCommand}, + }, + Context, Result, +}; /// Status of the connection. #[derive(Debug)] @@ -79,8 +91,8 @@ impl Connection { status: ConnectionState::Initiated, qm: context.queue_manager, em: context.exchange_manager, - channel_max: 2047, - frame_max: 131_072, + channel_max: MAX_CHANNELS_PER_CONNECTION, + frame_max: MAX_FRAME_SIZE, heartbeat_interval: None, auto_delete_exchanges: vec![], exclusive_queues: vec![], @@ -98,6 +110,7 @@ impl Connection { let _ = *ch_tx; if let Some(jh) = self.channel_handlers.remove(channel) { + debug!("Channel handler status {channel} = {}", jh.is_finished()); //jh.abort(); //let x = jh.await; @@ -139,6 +152,24 @@ impl Connection { //jh.abort(); } + for ex in &self.exclusive_queues { + let (tx, rx) = oneshot::channel(); + let cmd = QueueManagerCommand::Delete( + QueueDeleteCommand { + conn_id: self.id.clone(), + channel: 1u16, + queue_name: ex.queue_name.clone(), + if_unused: false, + if_empty: false, + }, + tx, + ); + + self.qm.send(cmd).await?; + + rx.await??; + } + Ok(()) } @@ -148,7 +179,7 @@ impl Connection { } if let Some(jh) = self.channel_handlers.remove(&channel) { - jh.await; + jh.await??; } Ok(())