Skip to content

Commit

Permalink
Binding tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jonasrichard committed Sep 12, 2024
1 parent 6552663 commit 690cdba
Show file tree
Hide file tree
Showing 13 changed files with 296 additions and 79 deletions.
4 changes: 2 additions & 2 deletions metalmq/src/client/channel/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ mod tests {
use tokio::sync::mpsc;

use crate::exchange;
use crate::tests::recv_timeout;
use crate::tests::recv::recv_with_timeout;

use super::*;

Expand Down Expand Up @@ -456,7 +456,7 @@ mod tests {

channel.handle_content_body(body).await?;

let cmd = recv_timeout(&mut ex_rx).await.expect("No frame received");
let cmd = recv_with_timeout(&mut ex_rx).await.expect("No frame received");

assert!(matches!(
cmd,
Expand Down
2 changes: 1 addition & 1 deletion metalmq/src/client/channel/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ impl Channel {

while let Some(m) = rx.recv().await {
match m {
MethodFrame(ch, cm, ma) => {
MethodFrame(_ch, _cm, ma) => {
let result = match ma {
metalmq_codec::frame::MethodFrameArgs::ChannelClose(args) => {
self.handle_channel_close(
Expand Down
3 changes: 3 additions & 0 deletions metalmq/src/client/connection.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
pub mod open_close;
pub mod router;
pub mod types;

#[cfg(test)]
mod tests;
102 changes: 102 additions & 0 deletions metalmq/src/client/connection/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Test cases
// - here we need to test if specific events made the necessary state changes in Connection
// struct
//
// connection tune with good and bad numbers
//
// connection open with bad virtual host
// - probably I have more tests when there will be multiple virtual hosts from config
//
// connection close closes all the channels

use metalmq_codec::{codec::Frame, frame};
use tokio::sync::mpsc;

use crate::{exchange, queue, tests::recv, Context};

use super::types::Connection;

fn new_context() -> Context {
let em = exchange::manager::start();
let qm = queue::manager::start(em.clone());

Context {
exchange_manager: em,
queue_manager: qm,
}
}

#[tokio::test]
async fn connection_close_clean_up_channels() {
use metalmq_codec::frame::{AMQPFrame, MethodFrameArgs};

let ctx = new_context();
let (tx, mut rx) = mpsc::channel(1);
let mut connection = Connection::new(ctx, tx);

connection.handle_client_frame(frame::AMQPFrame::Header).await.unwrap();

let connection_start = recv::recv_single_frame(&mut rx).await;

assert!(matches!(
connection_start,
AMQPFrame::Method(0, _, MethodFrameArgs::ConnectionStart(_))
));

connection
.handle_connection_start_ok(frame::ConnectionStartOkArgs::new("guest", "guest"))
.await
.unwrap();

let connection_tune = recv::recv_single_frame(&mut rx).await;

connection
.handle_connection_tune_ok(frame::ConnectionTuneOkArgs::default())
.await
.unwrap();

connection
.handle_connection_open(frame::ConnectionOpenArgs::default().virtual_host("/"))
.await
.unwrap();

let connection_open_ok = recv::recv_single_frame(&mut rx).await;

// start a channel
// start a consumer
//
// test should cancel consumer, close channel, stop channel spawned task, and close connection

connection.handle_channel_open(11).await.unwrap();

let channel_open_ok = recv::recv_single_frame(&mut rx).await;

assert!(connection.channel_receivers.contains_key(&11));

connection
.handle_connection_close(frame::ConnectionCloseArgs::default())
.await
.unwrap();

//assert!(!connection.channel_handlers.contains_key(&11));

// TODO this is a deadlock situation, here we need to have a blocking call - probably we don't
// need - which emits channel close and other messages, but it will get channel close ok
// message from client, then it can go and close the other channel or other resources, and so
// on.
// The problem however is that we block the incoming loop of the connection and if a channel
// close ok message arrives, we cannot route to the appropriate channel. So maybe connection
// close should be an implicit event, we need to
// - if we are asked to close the connection, we need to check if there are channels open
// - if not we can send back the connection close ok message and we are done
// - if not we need to start closing all the channels when all the channel close ok
// messages coming back we can close the connection. But what if the client doesn't send
// back all the close messages? We will leak connections... okay we have heartbeat, so it
// will close connection, but...

let channel_close = recv::recv_single_frame(&mut rx).await;

connection.handle_channel_close_ok(11).await.unwrap();

//let connection_close_ok = recv::recv_single_frame(&mut rx).await;
}
76 changes: 69 additions & 7 deletions metalmq/src/exchange/binding.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
//! Handling the exchange-queue bindings.
//!
//! To see the list of exchange bindings and their semantic, see [AMQP
//! exchanges](https://www.rabbitmq.com/tutorials/amqp-concepts#exchanges).
use crate::{
logerr,
message::Message,
Expand All @@ -11,26 +15,47 @@ use tokio::sync::oneshot;

use super::ExchangeType;

/// A direct exchange routes the message if the routing key of the message is exactly the same
/// as the routing key of the binding. It is legal to have two bindings with the same routing key.
#[derive(Debug)]
pub struct DirectBinding {
pub routing_key: String,
pub queue_name: String,
pub queue: QueueCommandSink,
}

/// A fanout exchange routes the message to all bound queues, disregarding the routing key of the
/// message.
#[derive(Debug)]
pub struct FanoutBinding {
pub queue_name: String,
pub queue: QueueCommandSink,
}

/// Topic exchange routes the message to the queues whose routing key matches to the routing key of
/// the message.
///
/// A binding routing key can be 'price.nyse.*' or 'price.nikkei.*' so the message with routing key
/// 'price.nyse.goog' will match to the first pattern. The dot has special meaning, it separates
/// the topic path, and the '*' matches a arbitrary topic path segment, and the '#' matches one or
/// more arbitrary path segments. So 'price.#' matches all price messages.
#[derive(Debug)]
pub struct TopicBinding {
pub routing_key: String,
pub queue_name: String,
pub queue: QueueCommandSink,
}

/// Headers exchange routes messages by message header matching.
///
/// Headers exchange ignores the routing key of the message, and it matches the headers of the
/// message with the headers of the binding.
///
/// If there are more header values specified one can decide if all or any of them need to be
/// matched. This described by the 'x-match' header and the value can be 'any' or 'all'. In these
/// cases the headers starting with 'x-' are not taken into account. But if the 'x-match' is
/// 'any-with-x' or 'all-with-x', all the headers including the ones starting with 'x-' are
/// considered.
#[derive(Debug)]
pub struct HeadersBinding {
pub headers: HashMap<String, AMQPFieldValue>,
Expand All @@ -51,6 +76,7 @@ pub enum Bindings {
}

impl Bindings {
/// Create a new exchange binding.
pub fn new(exchange_type: ExchangeType) -> Self {
match exchange_type {
ExchangeType::Direct => Bindings::Direct(vec![]),
Expand Down Expand Up @@ -403,9 +429,31 @@ pub fn match_header(

#[cfg(test)]
mod tests {
use crate::queue::handler::QueueCommandSource;

use super::*;

use tokio::sync::mpsc;

fn direct_bind_queue(bindings: &mut Bindings, routing_key: &str, queue_name: &str) -> QueueCommandSource {
let (tx, rx) = mpsc::channel(1);

let result = bindings.add_direct_binding(routing_key.to_string(), queue_name.to_string(), tx);

assert!(result);

rx
}

fn new_message(exchange: &str, routing_key: &str) -> Message {
let mut message = Message::default();

message.exchange = exchange.to_string();
message.routing_key = routing_key.to_string();

message
}

#[test]
fn test_match_routing_key() {
assert!(match_routing_key("stocks.nwse.goog", "stocks.nwse.goog"));
Expand All @@ -416,15 +464,12 @@ mod tests {
}

#[tokio::test]
async fn test_direct_binding() {
let (tx, mut rx) = mpsc::channel(1);

async fn direct_binding_one_routing_key() {
let mut bindings = Bindings::new(ExchangeType::Direct);
bindings.add_direct_binding("extension.png".to_string(), "png-images".to_string(), tx);

let mut message = Message::default();
message.exchange = "images".to_string();
message.routing_key = "extension.png".to_string();
let mut rx = direct_bind_queue(&mut bindings, "extension.png", "png-images");

let message = new_message("images", "extension.png");

let result = bindings.route_message(message).await.unwrap();

Expand All @@ -434,4 +479,21 @@ mod tests {
let delivered = rx.recv().await.unwrap();
assert!(matches!(delivered, QueueCommand::PublishMessage(_)));
}

#[tokio::test]
async fn direct_binding_multiple_queues_same_routing_key() {
let mut bindings = Bindings::new(ExchangeType::Direct);

let mut jpg = direct_bind_queue(&mut bindings, "jpg-images", "extension.jpg");
let mut jpeg = direct_bind_queue(&mut bindings, "jpg-images", "extension.jpeg");

let message = new_message("images", "jpg-images");

let result = bindings.route_message(message).await.unwrap();

assert!(result.is_none());

assert!(matches!(jpg.recv().await.unwrap(), QueueCommand::PublishMessage(_)));
assert!(matches!(jpeg.recv().await.unwrap(), QueueCommand::PublishMessage(_)));
}
}
1 change: 1 addition & 0 deletions metalmq/src/queue/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use tokio::sync::{mpsc, oneshot};
use self::outbox::{Outbox, OutgoingMessage};

pub type QueueCommandSink = mpsc::Sender<QueueCommand>;
pub type QueueCommandSource = mpsc::Receiver<QueueCommand>;

/// Delivery tag of a message
#[derive(Clone, Debug)]
Expand Down
16 changes: 8 additions & 8 deletions metalmq/src/queue/handler/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::*;
use crate::{
error::{to_runtime_error, ErrorScope},
message::{Message, MessageContent},
tests::recv_timeout,
tests::recv::recv_with_timeout,
};
use metalmq_codec::{codec::Frame, frame::AMQPFrame};
use std::sync::Arc;
Expand Down Expand Up @@ -344,7 +344,7 @@ async fn publish_to_queue_with_one_consumer() {
let result = tester.state.handle_command(cmd).await;
assert!(result.is_ok());

let frame = recv_timeout(&mut msg_rx).await.unwrap();
let frame = recv_with_timeout(&mut msg_rx).await.unwrap();

let message = parse_message(frame).unwrap();
assert_eq!(message.message.exchange, tester.exchange_name);
Expand Down Expand Up @@ -389,14 +389,14 @@ async fn unacked_messages_should_be_put_back_in_the_queue() {
.handle_command(tester.command_publish("1st"))
.await
.unwrap();
let _msg_res = recv_timeout(&mut frx).await.unwrap();
let _msg_res = recv_with_timeout(&mut frx).await.unwrap();

tester
.state
.handle_command(tester.command_publish("2nd"))
.await
.unwrap();
let _msg_res = recv_timeout(&mut frx).await.unwrap();
let _msg_res = recv_with_timeout(&mut frx).await.unwrap();

let (rtx, rrx) = oneshot::channel();
tester
Expand Down Expand Up @@ -430,7 +430,7 @@ async fn consume_unacked_removes_messages_from_the_queue_after_send() {
.await
.unwrap();

recv_timeout(&mut frx).await;
recv_with_timeout(&mut frx).await;

assert!(tester.state.messages.is_empty());
}
Expand All @@ -447,7 +447,7 @@ async fn basic_get_then_basic_ack_deletes_the_message_from_the_queue() {

let mut frx = tester.passive_consume().await;

let fr = recv_timeout(&mut frx).await.unwrap();
let fr = recv_with_timeout(&mut frx).await.unwrap();
let msg = parse_message(fr).unwrap();

assert_eq!(msg.consumer_tag, "");
Expand Down Expand Up @@ -481,7 +481,7 @@ async fn basic_get_and_consume_without_ack_and_get_should_redeliver() {

let mut frx = tester.passive_consume().await;

let _frame = recv_timeout(&mut frx).await.unwrap();
let _frame = recv_with_timeout(&mut frx).await.unwrap();

tester
.state
Expand All @@ -490,7 +490,7 @@ async fn basic_get_and_consume_without_ack_and_get_should_redeliver() {
.unwrap();

let mut frx = tester.passive_consume().await;
let fr = recv_timeout(&mut frx).await.unwrap();
let fr = recv_with_timeout(&mut frx).await.unwrap();

println!("{:?}", fr);

Expand Down
8 changes: 4 additions & 4 deletions metalmq/src/tests/consume.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ async fn one_consumer() {
.basic_consume(1, BasicConsumeArgs::default().queue("q-direct").consumer_tag("ctag"))
.await;

let consume_ok = test_client.recv_frames().await;
let consume_ok = test_client.recv_single_frame().await;

assert!(matches!(
dbg!(consume_ok.get(0)).unwrap(),
dbg!(consume_ok),
frame::AMQPFrame::Method(
1,
_,
Expand Down Expand Up @@ -63,7 +63,7 @@ async fn one_consumer_redeliver() {
test_client
.basic_consume(1, BasicConsumeArgs::default().queue("q-direct").consumer_tag("ctag"))
.await;
test_client.recv_frames().await;
test_client.recv_single_frame().await;

// Publish a message
test_client
Expand All @@ -82,7 +82,7 @@ async fn one_consumer_redeliver() {
test_client
.basic_consume(3, BasicConsumeArgs::default().queue("q-direct").consumer_tag("ctag2"))
.await;
test_client.recv_frames().await;
test_client.recv_single_frame().await;

// Receive the message again
let mut deliver = test_client.recv_frames().await;
Expand Down
Loading

0 comments on commit 690cdba

Please sign in to comment.