Skip to content

Commit

Permalink
Convert header frame to message props
Browse files Browse the repository at this point in the history
  • Loading branch information
jonasrichard committed Oct 18, 2024
1 parent 35e4d12 commit 5dd9d8f
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 16 deletions.
20 changes: 18 additions & 2 deletions metalmq-client/src/message.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::collections::HashMap;

use metalmq_codec::frame::{AMQPFieldValue, ContentBodyFrame, ContentHeaderFrame, HeaderPropertyFlags};
use metalmq_codec::frame::{
fieldtable_to_hashmap, AMQPFieldValue, ContentBodyFrame, ContentHeaderFrame, HeaderPropertyFlags,
};

use crate::ChannelNumber;

Expand Down Expand Up @@ -126,7 +128,21 @@ pub(crate) fn to_content_frames(message: Content) -> (ContentHeaderFrame, Conten

impl From<ContentHeaderFrame> for MessageProperties {
fn from(value: ContentHeaderFrame) -> Self {
MessageProperties::default()
MessageProperties {
content_type: value.content_type,
content_encoding: value.content_encoding,
headers: fieldtable_to_hashmap(value.headers.unwrap()),
delivery_mode: value.delivery_mode,
priority: value.priority,
correlation_id: value.correlation_id,
reply_to: value.reply_to,
expiration: value.expiration,
message_id: value.message_id,
timestamp: value.timestamp,
message_type: value.message_type,
user_id: value.user_id,
app_id: value.app_id,
}
}
}

Expand Down
19 changes: 19 additions & 0 deletions metalmq-codec/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,25 @@ impl From<ContentBodyFrame> for AMQPFrame {
}
}

pub fn fieldtable_to_hashmap(value: FieldTable) -> HashMap<String, String> {
let mut m = HashMap::new();

for (k, v) in value {
match v {
AMQPFieldValue::Bool(b) => {
m.insert(k, b.to_string());
}
AMQPFieldValue::LongString(s) => {
m.insert(k, s);
}
AMQPFieldValue::EmptyFieldTable => {}
AMQPFieldValue::FieldTable(_) => panic!("Embedded field table is not supported"),
}
}

m
}

/// Split class id and method id from `u32` combined code.
pub fn split_class_method(cm: u32) -> (u16, u16) {
let method_id = (cm & 0x0000FFFF) as u16;
Expand Down
6 changes: 3 additions & 3 deletions metalmq/src/client/channel/open_close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::types::Channel;
use crate::error::Result;

impl Channel {
pub async fn handle_channel_close(&mut self, code: u16, cm: u32, text: String) -> Result<()> {
pub async fn handle_channel_close(&mut self, _code: u16, _cm: u32, _text: String) -> Result<()> {
// Cancel consumed queues on the channel
//if let Some(cq) = self.consumed_queues.remove(&channel) {
// qm::cancel_consume(
Expand Down Expand Up @@ -31,9 +31,9 @@ impl Channel {
Ok(())
}

pub async fn handle_channel_close_ok(&mut self, channel: u16) -> Result<()> {
pub async fn handle_channel_close_ok(&mut self, _channel: u16) -> Result<()> {
todo!();

Ok(())
//Ok(())
}
}
10 changes: 5 additions & 5 deletions metalmq/src/client/connection/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
//
// connection close closes all the channels

use metalmq_codec::{codec::Frame, frame};
use metalmq_codec::frame;
use tokio::sync::mpsc;

use crate::{exchange, queue, tests::recv, Context};
Expand Down Expand Up @@ -48,7 +48,7 @@ async fn connection_close_clean_up_channels() {
.await
.unwrap();

let connection_tune = recv::recv_single_frame(&mut rx).await;
let _connection_tune = recv::recv_single_frame(&mut rx).await;

connection
.handle_connection_tune_ok(frame::ConnectionTuneOkArgs::default())
Expand All @@ -60,7 +60,7 @@ async fn connection_close_clean_up_channels() {
.await
.unwrap();

let connection_open_ok = recv::recv_single_frame(&mut rx).await;
let _connection_open_ok = recv::recv_single_frame(&mut rx).await;

// start a channel
// start a consumer
Expand All @@ -69,7 +69,7 @@ async fn connection_close_clean_up_channels() {

connection.handle_channel_open(11).await.unwrap();

let channel_open_ok = recv::recv_single_frame(&mut rx).await;
let _channel_open_ok = recv::recv_single_frame(&mut rx).await;

assert!(connection.channel_receivers.contains_key(&11));

Expand All @@ -94,7 +94,7 @@ async fn connection_close_clean_up_channels() {
// back all the close messages? We will leak connections... okay we have heartbeat, so it
// will close connection, but...

let channel_close = recv::recv_single_frame(&mut rx).await;
let _channel_close = recv::recv_single_frame(&mut rx).await;

connection.handle_channel_close_ok(11).await.unwrap();

Expand Down
43 changes: 37 additions & 6 deletions metalmq/src/client/connection/types.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,23 @@
use std::collections::HashMap;

use log::info;
use log::{debug, info};
use metalmq_codec::codec::Frame;
use tokio::{sync::mpsc, task::JoinHandle};
use tokio::{
sync::{mpsc, oneshot},
task::JoinHandle,
};
use uuid::Uuid;

use crate::{client::channel::types::Command, exchange, queue, Context, Result};
use crate::{
client::channel::types::Command,
config::{MAX_CHANNELS_PER_CONNECTION, MAX_FRAME_SIZE},
exchange,
queue::{
self,
manager::{QueueDeleteCommand, QueueManagerCommand},
},
Context, Result,
};

/// Status of the connection.
#[derive(Debug)]
Expand Down Expand Up @@ -79,8 +91,8 @@ impl Connection {
status: ConnectionState::Initiated,
qm: context.queue_manager,
em: context.exchange_manager,
channel_max: 2047,
frame_max: 131_072,
channel_max: MAX_CHANNELS_PER_CONNECTION,
frame_max: MAX_FRAME_SIZE,
heartbeat_interval: None,
auto_delete_exchanges: vec![],
exclusive_queues: vec![],
Expand All @@ -98,6 +110,7 @@ impl Connection {
let _ = *ch_tx;

if let Some(jh) = self.channel_handlers.remove(channel) {
debug!("Channel handler status {channel} = {}", jh.is_finished());
//jh.abort();

//let x = jh.await;
Expand Down Expand Up @@ -139,6 +152,24 @@ impl Connection {
//jh.abort();
}

for ex in &self.exclusive_queues {
let (tx, rx) = oneshot::channel();
let cmd = QueueManagerCommand::Delete(
QueueDeleteCommand {
conn_id: self.id.clone(),
channel: 1u16,
queue_name: ex.queue_name.clone(),
if_unused: false,
if_empty: false,
},
tx,
);

self.qm.send(cmd).await?;

rx.await??;
}

Ok(())
}

Expand All @@ -148,7 +179,7 @@ impl Connection {
}

if let Some(jh) = self.channel_handlers.remove(&channel) {
jh.await;
jh.await??;
}

Ok(())
Expand Down

0 comments on commit 5dd9d8f

Please sign in to comment.