Skip to content

Commit

Permalink
New Command type for Channel input
Browse files Browse the repository at this point in the history
  • Loading branch information
jonasrichard committed Sep 4, 2024
1 parent 985886a commit 4ed2bcb
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 64 deletions.
2 changes: 1 addition & 1 deletion metalmq/src/client/channel/open_close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl Channel {
Ok(())
}

pub async fn handle_channel_close(&mut self, channel: u16) -> Result<()> {
pub async fn handle_channel_close(&mut self, code: u16, cm: u32, text: String) -> Result<()> {
// Cancel consumed queues on the channel
//if let Some(cq) = self.consumed_queues.remove(&channel) {
// qm::cancel_consume(
Expand Down
35 changes: 27 additions & 8 deletions metalmq/src/client/channel/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,20 @@ use crate::exchange;
use crate::queue;

use metalmq_codec::codec::Frame;
use metalmq_codec::frame::unify_class_method;
use metalmq_codec::frame::MethodFrameArgs;
use metalmq_codec::frame::{AMQPFrame, ContentBodyFrame, ContentHeaderFrame};

use tokio::sync::mpsc;

#[derive(Debug)]
pub enum Command {
MethodFrame(u16, u32, MethodFrameArgs),
Close(u16, u32, String),
ContentHeader(ContentHeaderFrame),
ContentBody(ContentBodyFrame),
}

/// Queues consumed by the connection with Basic.Consume
#[derive(Debug)]
pub struct ActivelyConsumedQueue {
Expand Down Expand Up @@ -57,12 +67,21 @@ pub struct Channel {
}

impl Channel {
pub async fn handle_message(&mut self, mut rx: mpsc::Receiver<AMQPFrame>) -> Result<()> {
while let Some(f) = rx.recv().await {
match f {
AMQPFrame::Method(ch, cm, ma) => {
pub async fn handle_message(&mut self, mut rx: mpsc::Receiver<Command>) -> Result<()> {
use Command::*;

while let Some(m) = rx.recv().await {
match m {
MethodFrame(ch, cm, ma) => {
let result = match ma {
metalmq_codec::frame::MethodFrameArgs::ChannelClose(_) => self.handle_channel_close(ch).await,
metalmq_codec::frame::MethodFrameArgs::ChannelClose(args) => {
self.handle_channel_close(
args.code,
unify_class_method(args.class_id, args.method_id),
args.text,
)
.await
}
metalmq_codec::frame::MethodFrameArgs::ChannelCloseOk => todo!(),
metalmq_codec::frame::MethodFrameArgs::ExchangeDeclare(args) => {
self.handle_exchange_declare(args).await
Expand Down Expand Up @@ -98,9 +117,9 @@ impl Channel {
_ => unreachable!(),
};
}
AMQPFrame::ContentHeader(_) => todo!(),
AMQPFrame::ContentBody(_) => todo!(),
_ => unreachable!(),
Close(reason, cm, text) => self.handle_channel_close(reason, cm, text).await?,
ContentHeader(header) => self.handle_content_header(header).await?,
ContentBody(body) => self.handle_content_body(body).await?,
}
}

Expand Down
4 changes: 3 additions & 1 deletion metalmq/src/client/connection/open_close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,14 @@ impl Connection {
.to_result(frame::CHANNEL_OPEN, "NOT_ALLOWED - Channel number is too large");
}

self.start_channel(channel).await?;

self.send_frame(Frame::Frame(frame::channel_open_ok(channel))).await?;

Ok(())
}

pub async fn handle_channel_close(&mut self, channel: u16, _args: frame::ChannelCloseArgs) -> Result<()> {
pub async fn handle_channel_close(&mut self, channel: u16) -> Result<()> {
// TODO delete exclusive queues

self.close_channel(channel).await?;
Expand Down
159 changes: 110 additions & 49 deletions metalmq/src/client/connection/router.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,55 @@
use std::collections::HashMap;

use log::error;
use metalmq_codec::{
codec::Frame,
frame::{AMQPFrame, ConnectionStartArgs, MethodFrameArgs},
frame::{self, unify_class_method, AMQPFrame, ConnectionStartArgs, MethodFrameArgs},
};
use tokio::sync::mpsc;

use crate::{
client::{channel::types::Channel, connection::types::Connection},
client::{
channel::types::{Channel, Command},
connection::types::Connection,
},
error::{to_runtime_error, ConnectionError, ErrorScope, Result, RuntimeError},
};

impl Connection {
pub async fn handle_client_frame(&mut self, f: AMQPFrame) -> Result<()> {
// TODO here we need to handle the error
// if it is connection error, close the conn and before sending the connection close
// message, send all the cleaning, like channel close, basic consume end, ets
// if it is a channel error, close the channel only and send out the channel close message
//
// if it is other error, log it and close the connection
/// Handles a frame coming from the client and return with `Ok(true)` if the connection should
/// keep open. In case of `Ok(false)` the connection loop should close the connection gently,
/// but all the necessary communication had already happened.
pub async fn handle_client_frame(&mut self, f: AMQPFrame) -> Result<bool> {
use AMQPFrame::*;

let result = match &f {
// During the processing of method frames can happen only that client closes the
// connection. In that case the `handle_method_frame` returns with `Ok(false)` namely that
// 'should we keep the loop running' = 'no'.
//
// So we need to return that value to the caller in order that connection loop can stop
// itself. In this use-case we don't need to close the resources because
// `handle_connection_close` already did that.
//
// If we got an error from anywhere else, we need to check what we need to clean up. If it
// is a channel error, we just need to close the channel. If it is a connection error, we
// need to iterate over the channels and close the channels, and then the connection.
//
// Is it a question if we need to send messages out to the client (probably no) during that
// internal cleanup.
let result = match f {
Header => self.send_frame(Frame::Frame(ConnectionStartArgs::new().frame())).await,
Method(_, _, _) => self.handle_method_frame(f).await,
ContentHeader(ch) => self.send_command_to_channel(ch.channel, f).await,
ContentBody(cb) => self.send_command_to_channel(cb.channel, f).await,
Method(ch, cm, ma) => match self.handle_method_frame(ch, cm, ma).await {
Ok(keep_running) => return Ok(keep_running),
Err(e) => Err(e),
},
ContentHeader(header) => {
self.send_command_to_channel(header.channel, Command::ContentHeader(header))
.await
}
ContentBody(body) => {
self.send_command_to_channel(body.channel, Command::ContentBody(body))
.await
}
Heartbeat(0) => Ok(()),
Heartbeat(_) => ConnectionError::FrameError.to_result(0, "Heartbeat must have channel 0"),
};
Expand Down Expand Up @@ -60,49 +84,82 @@ impl Connection {
};
}

Ok(())
Ok(true)
}

async fn handle_method_frame(&mut self, f: AMQPFrame) -> Result<()> {
let (channel, class_method) = match &f {
AMQPFrame::Method(channel, cm, _) => (channel, cm),
_ => unreachable!(),
};
async fn handle_method_frame(&mut self, channel: u16, class_method: u32, ma: MethodFrameArgs) -> Result<bool> {
use MethodFrameArgs::*;

let mut ch_tx = None;

// If it is not connection class frame, we need to look up the channel.
if class_method >> 16 != 0x000A && class_method != frame::CHANNEL_OPEN {
ch_tx = self.channel_receivers.get(&channel);

// We cannot unpack the Option, since we handle connection frames which obviously don't
// belong to any channel.
if ch_tx.is_none() {
return ConnectionError::ChannelError.to_result(class_method, "Channel not exist");
}
}

match ma {
ConnectionStart(_) => unreachable!(),
ConnectionStartOk(args) => {
self.handle_connection_start_ok(args).await?;

Ok(true)
}
ConnectionTune(_) => unreachable!(),
ConnectionTuneOk(args) => {
self.handle_connection_tune_ok(args).await?;

// WARN this is quite ugly and the intent is not clear
match class_method >> 16 {
0x000A => self.handle_connection_command(f).await,
_ if *class_method == metalmq_codec::frame::CHANNEL_OPEN => {
if self.channel_receivers.contains_key(channel) {
ConnectionError::ChannelError.to_result(*class_method, "CHANNEL_ERROR - Channel is already opened")
Ok(true)
}
ConnectionOpen(args) => {
self.handle_connection_open(args).await?;

Ok(true)
}
ConnectionOpenOk => unreachable!(),
ConnectionClose(args) => {
self.handle_connection_close(args).await?;

Ok(false)
}
ChannelOpen => {
if ch_tx.is_some() {
ConnectionError::ChannelError.to_result(class_method, "Channel already exist")
} else {
self.start_channel(*channel).await
self.handle_channel_open(channel).await?;
//self.start_channel(channel).await?;

Ok(true)
}
}
_ => self.send_command_to_channel(*channel, f).await,
}
}
ChannelClose(args) => {
let cmd = Command::Close(args.code, unify_class_method(args.class_id, args.method_id), args.text);

async fn handle_connection_command(&mut self, f: AMQPFrame) -> Result<()> {
use MethodFrameArgs::*;
self.send_command_to_channel(channel, cmd).await?;
self.handle_channel_close(channel).await?;

match f {
AMQPFrame::Method(_, _cm, mf) => match mf {
ConnectionStartOk(args) => self.handle_connection_start_ok(args).await,
ConnectionTuneOk(args) => self.handle_connection_tune_ok(args).await,
ConnectionOpen(args) => self.handle_connection_open(args).await,
ConnectionClose(args) => self.handle_connection_close(args).await,
_ => unreachable!(),
},
Ok(true)
}
// TODO here we need to handle channel close, because we need to remove the channel
// receiver from the hash map. It is a question though where to handle all of that.
// This should send a message to the channel, that can have a change to cancel the
// consume of queues, etc. Then this code should have a chance to execute something.
_ => {
unreachable!()
let cmd = Command::MethodFrame(channel, class_method, ma);

self.send_command_to_channel(channel, cmd).await?;

Ok(true)
}
}
}

async fn start_channel(&mut self, channel_number: u16) -> Result<()> {
self.handle_channel_open(channel_number).await?;

pub async fn start_channel(&mut self, channel_number: u16) -> Result<()> {
let mut channel = Channel {
source_connection: self.id.clone(),
number: channel_number,
Expand Down Expand Up @@ -133,13 +190,17 @@ impl Connection {
Ok(())
}

async fn send_command_to_channel(&self, ch: u16, f: AMQPFrame) -> Result<()> {
if let Some(ch_tx) = self.channel_receivers.get(&ch) {
ch_tx.send(f).await?;
pub async fn send_command_to_channel(&self, channel: u16, cmd: Command) -> Result<()> {
if let Some(ch_tx) = self.channel_receivers.get(&channel) {
if let Err(e) = ch_tx.send(cmd).await {
error!("Cannot send frame to channel handler {:?}", e);

ConnectionError::InternalError.to_result(0, "Internal error")
} else {
Ok(())
}
} else {
return ConnectionError::ChannelError.to_result(0, "No such channel");
ConnectionError::ChannelError.to_result(0, "Channel not exist")
}

Ok(())
}
}
6 changes: 3 additions & 3 deletions metalmq/src/client/connection/types.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::collections::HashMap;

use log::info;
use metalmq_codec::{codec::Frame, frame::AMQPFrame};
use metalmq_codec::codec::Frame;
use tokio::{sync::mpsc, task::JoinHandle};
use uuid::Uuid;

use crate::{exchange, queue, Context, Result};
use crate::{client::channel::types::Command, exchange, queue, Context, Result};

/// Exclusive queue declared by the connection
#[derive(Debug)]
Expand All @@ -29,7 +29,7 @@ pub struct Connection {
pub qm: queue::manager::QueueManagerSink,
pub em: exchange::manager::ExchangeManagerSink,
pub channel_handlers: HashMap<u16, JoinHandle<Result<()>>>,
pub channel_receivers: HashMap<u16, mpsc::Sender<AMQPFrame>>,
pub channel_receivers: HashMap<u16, mpsc::Sender<Command>>,
/// Sink for AMQP frames toward the client
pub outgoing: mpsc::Sender<Frame>,
}
Expand Down
4 changes: 3 additions & 1 deletion metalmq/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ impl From<RuntimeError> for AMQPFrame {

impl From<RuntimeError> for Frame {
fn from(value: RuntimeError) -> Self {
value.into()
let f: AMQPFrame = value.into();

Frame::Frame(f)
}
}

Expand Down
32 changes: 32 additions & 0 deletions metalmq/src/tests/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,35 @@ async fn connect_with_username_password() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn connect_and_open_channel() -> Result<()> {
let test_case = TestCase::new().await;
let mut client = test_case.new_connected_client(1).await;

client
.connection
.handle_client_frame(frame::channel_close(1, 200, "Normal close", frame::CHANNEL_CLOSE).into())
.await?;

let channel_close_ok = client.recv_single_frame().await;

assert!(matches!(
dbg!(channel_close_ok),
AMQPFrame::Method(1, frame::CHANNEL_CLOSE_OK, frame::MethodFrameArgs::ChannelCloseOk)
));

client
.connection
.handle_client_frame(frame::connection_close(200, "Normal close", frame::CONNECTION_CLOSE).into())
.await?;

let connection_close_ok = client.recv_single_frame().await;

assert!(matches!(
dbg!(connection_close_ok),
AMQPFrame::Method(_, frame::CONNECTION_CLOSE_OK, frame::MethodFrameArgs::ConnectionCloseOk)
));

Ok(())
}
2 changes: 1 addition & 1 deletion metalmq/src/tests/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ async fn bind_queue_with_validation() {
let exchange_declare_ok = client.send_frame_with_response(args.frame(1)).await;

assert!(matches!(
exchange_declare_ok,
dbg!(exchange_declare_ok),
frame::AMQPFrame::Method(1, _, frame::MethodFrameArgs::ExchangeDeclareOk)
));

Expand Down

0 comments on commit 4ed2bcb

Please sign in to comment.