Skip to content

Commit

Permalink
Start to refactor server client channel
Browse files Browse the repository at this point in the history
  • Loading branch information
jonasrichard committed Aug 23, 2024
1 parent b96e732 commit c8425bd
Show file tree
Hide file tree
Showing 12 changed files with 292 additions and 204 deletions.
8 changes: 8 additions & 0 deletions metalmq-codec/src/frame/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,14 @@ impl BasicPublishArgs {
super::MethodFrameArgs::BasicPublish(self),
)
}

pub fn is_mandatory(&self) -> bool {
self.flags.contains(BasicPublishFlags::MANDATORY)
}

pub fn is_immediate(&self) -> bool {
self.flags.contains(BasicPublishFlags::IMMEDIATE)
}
}

#[derive(Clone, Debug, Default)]
Expand Down
83 changes: 83 additions & 0 deletions metalmq/src/client/channel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use metalmq_codec::{
codec::Frame,
frame::{self, ContentBodyFrame, ContentHeaderFrame},
};
use tokio::sync::mpsc;

use crate::{queue::handler as queue_handler, ErrorScope, Result, RuntimeError};

pub mod open_close;

#[derive(Debug)]
pub enum ChannelError {
ContentTooLarge = 311,
NoRoute = 312,
NoConsumers = 313,
AccessRefused = 403,
NotFound = 404,
ResourceLocked = 405,
PreconditionFailed = 406,
}

/// Queues consumed by the connection with Basic.Consume
#[derive(Debug)]
pub struct ActivelyConsumedQueue {
pub queue_name: String,
pub consumer_tag: String,
pub queue_sink: queue_handler::QueueCommandSink,
}

/// Queues consumed by the connection with Basic.Get
#[derive(Debug)]
pub struct PassivelyConsumedQueue {
pub queue_name: String,
pub consumer_tag: String,
pub delivery_tag: u64,
pub queue_sink: queue_handler::QueueCommandSink,
}

#[derive(Debug, Default)]
pub struct PublishedContent {
pub source_connection: String,
pub channel: u16,
pub exchange: String,
pub routing_key: String,
pub mandatory: bool,
pub immediate: bool,
/// The method frame class id which initiated the sending of the content.
pub method_frame_class_id: u32,
pub content_header: ContentHeaderFrame,
pub content_bodies: Vec<ContentBodyFrame>,
pub body_size: usize,
}

/// Represents a channel
#[derive(Debug)]
pub struct Channel {
pub number: u16,
pub consumed_queue: Option<ActivelyConsumedQueue>,
pub in_flight_content: Option<PublishedContent>,
pub confirm_mode: bool,
pub next_confirm_delivery_tag: u64,
pub outgoing: mpsc::Sender<Frame>,
}

/// Helper to create channel error frames.
pub fn channel_error<T>(channel: u16, cm: u32, code: ChannelError, text: &str) -> Result<T> {
Err(Box::new(RuntimeError {
scope: ErrorScope::Channel,
channel,
code: code as u16,
text: text.to_owned(),
class_method: cm,
}))
}

pub fn runtime_error_to_frame(rte: &RuntimeError) -> Frame {
let amqp_frame = match rte.scope {
ErrorScope::Connection => frame::connection_close(rte.code, &rte.text, rte.class_method),
ErrorScope::Channel => frame::channel_close(rte.channel, rte.code, &rte.text, rte.class_method),
};

Frame::Frame(amqp_frame)
}
File renamed without changes.
15 changes: 13 additions & 2 deletions metalmq/src/client/conn.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
client::{self, state::Connection, to_runtime_error},
client::{self, connection, state::Connection, to_runtime_error},
Context, Result,
};
use futures::{
Expand Down Expand Up @@ -210,6 +210,17 @@ async fn handle_in_stream_data(conn: &mut Connection, data: Frame) -> Result<boo
}
}

// FIXME new design is needed
//
// In the new design each channel should be a spawned thread, because the goal of the channels is
// that the client and the server can send messages parallelly.
//
// So all of these fn calls will be message sendings with an return value mpsc channel here. When
// we send out a message to a channel, we already have that return value channel cloned in the
// channel. So we can listen the messages coming back and we can react on that. The return value
// channel is only good for handling the errors because the reponse AMQP frames will be sent out
// via the cloned outgoing channel.

async fn handle_client_frame(conn: &mut Connection, f: AMQPFrame) -> Result<()> {
use AMQPFrame::*;

Expand All @@ -223,7 +234,7 @@ async fn handle_client_frame(conn: &mut Connection, f: AMQPFrame) -> Result<()>
ContentBody(cb) => conn.receive_content_body(cb).await,
Heartbeat(0) => Ok(()),
Heartbeat(_) => {
client::connection_error(0, client::ConnectionError::FrameError, "Heartbeat must have channel 0")
connection::connection_error(0, client::ConnectionError::FrameError, "Heartbeat must have channel 0")
}
}
}
Expand Down
36 changes: 36 additions & 0 deletions metalmq/src/client/connection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use metalmq_codec::codec::Frame;

use crate::{ErrorScope, Result, RuntimeError};

// TODO here goes the connection related business logic
#[derive(Debug)]
pub enum ConnectionError {
ConnectionForced = 320,
InvalidPath = 402,
AccessRefused = 403,
FrameError = 501,
SyntaxError = 502,
CommandInvalid = 503,
ChannelError = 504,
UnexpectedFrame = 505,
ResourceError = 506,
NotAllowed = 530,
NotImplemented = 540,
InternalError = 541,
}

/// Helper to create connection error frames.
pub fn connection_error<T>(cm: u32, code: ConnectionError, text: &str) -> Result<T> {
Err(Box::new(RuntimeError {
scope: ErrorScope::Connection,
channel: 0,
code: code as u16,
text: text.to_owned(),
class_method: cm,
}))
}

/// Convert ConnectionError to connection close frame.
pub fn connection_error_frame(cm: u32, code: ConnectionError, text: &str) -> Frame {
metalmq_codec::codec::Frame::Frame(metalmq_codec::frame::connection_close(code as u16, text, cm))
}
69 changes: 4 additions & 65 deletions metalmq/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,52 +1,11 @@
pub mod channel;
pub mod conn;
pub mod connection;
pub mod state;

use crate::{ErrorScope, Result, RuntimeError};
use metalmq_codec::codec::Frame;
use metalmq_codec::frame::{self, Channel};
use connection::ConnectionError;

#[derive(Debug)]
pub enum ConnectionError {
ConnectionForced = 320,
InvalidPath = 402,
AccessRefused = 403,
FrameError = 501,
SyntaxError = 502,
CommandInvalid = 503,
ChannelError = 504,
UnexpectedFrame = 505,
ResourceError = 506,
NotAllowed = 530,
NotImplemented = 540,
InternalError = 541,
}

#[derive(Debug)]
pub enum ChannelError {
ContentTooLarge = 311,
NoRoute = 312,
NoConsumers = 313,
AccessRefused = 403,
NotFound = 404,
ResourceLocked = 405,
PreconditionFailed = 406,
}

/// Helper to create connection error frames.
pub fn connection_error<T>(cm: u32, code: ConnectionError, text: &str) -> Result<T> {
Err(Box::new(RuntimeError {
scope: ErrorScope::Connection,
channel: 0,
code: code as u16,
text: text.to_owned(),
class_method: cm,
}))
}

/// Convert ConnectionError to connection close frame.
pub fn connection_error_frame(cm: u32, code: ConnectionError, text: &str) -> Frame {
Frame::Frame(frame::connection_close(code as u16, text, cm))
}
use crate::{ErrorScope, RuntimeError};

//pub fn connection_error_frame(err: RuntimeError) -> Option<Frame> {
// if err.scope == ErrorScope::Channel {
Expand All @@ -62,26 +21,6 @@ pub fn connection_error_frame(cm: u32, code: ConnectionError, text: &str) -> Fra
// )))
//}

/// Helper to create channel error frames.
pub fn channel_error<T>(channel: Channel, cm: u32, code: ChannelError, text: &str) -> Result<T> {
Err(Box::new(RuntimeError {
scope: ErrorScope::Channel,
channel,
code: code as u16,
text: text.to_owned(),
class_method: cm,
}))
}

pub fn runtime_error_to_frame(rte: &RuntimeError) -> Frame {
let amqp_frame = match rte.scope {
ErrorScope::Connection => frame::connection_close(rte.code, &rte.text, rte.class_method),
ErrorScope::Channel => frame::channel_close(rte.channel, rte.code, &rte.text, rte.class_method),
};

Frame::Frame(amqp_frame)
}

/// Converts all errors as `RuntimeError`. Unknown errors are wrapped as internal connection
/// errors.
pub fn to_runtime_error(err: Box<dyn std::error::Error + Send + Sync>) -> RuntimeError {
Expand Down
44 changes: 2 additions & 42 deletions metalmq/src/client/state.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Do we need to expose the messages of a 'process' or hide it in an erlang-style?
use crate::{
client,
client::{channel, ChannelError},
exchange::{handler::ExchangeCommandSink, manager as em},
logerr,
queue::{handler as queue_handler, manager as qm},
Expand All @@ -15,36 +16,18 @@ use std::collections::HashMap;
use tokio::sync::mpsc;
use uuid::Uuid;

use super::{channel_error, ChannelError};

pub mod basic;
pub mod channel;
pub mod connect;
pub mod exchange;
pub mod queue;
pub mod receive_message;

#[cfg(test)]
mod tests;

// TODO all field of all struct are pub which is not good and it is because of tests

/// Queues consumed by the connection with Basic.Consume
#[derive(Debug)]
pub struct ActivelyConsumedQueue {
pub queue_name: String,
pub consumer_tag: String,
pub queue_sink: queue_handler::QueueCommandSink,
}

/// Queues consumed by the connection with Basic.Get
#[derive(Debug)]
pub struct PassivelyConsumedQueue {
pub queue_name: String,
pub consumer_tag: String,
pub delivery_tag: u64,
pub queue_sink: queue_handler::QueueCommandSink,
}

/// Exclusive queue declared by the connection
#[derive(Debug)]
pub struct ExclusiveQueue {
Expand Down Expand Up @@ -86,29 +69,6 @@ pub struct Connection {
pub outgoing: mpsc::Sender<Frame>,
}

/// Represents a channel
#[derive(Debug)]
pub struct ChannelState {
/// The channel number
pub channel: Channel,
/// The outgoing frame channel.
pub frame_sink: mpsc::Sender<Frame>,
}

#[derive(Debug, Default)]
pub struct PublishedContent {
pub channel: Channel,
pub exchange: String,
pub routing_key: String,
pub mandatory: bool,
pub immediate: bool,
/// The method frame class id which initiated the sending of the content.
pub method_frame_class_id: u32,
pub content_header: ContentHeaderFrame,
pub content_bodies: Vec<ContentBodyFrame>,
pub body_size: usize,
}

#[macro_export]
macro_rules! handle_error {
($self:expr, $val:expr) => {
Expand Down
Loading

0 comments on commit c8425bd

Please sign in to comment.