Killing some TODOs
jonasrichard committed Oct 7, 2024
1 parent fd1f223 commit 37284e1
Showing 8 changed files with 70 additions and 160 deletions.
6 changes: 1 addition & 5 deletions amqp-compliance/test_connect.py
@@ -1,4 +1,5 @@
import pika
import pika.exceptions
import pytest

import helper
@@ -18,11 +19,6 @@ def test_connect_fail_bad_password():
with pytest.raises(pika.exceptions.ProbableAuthenticationError) as exp:
helper.connect(password="pwd")

#breakpoint()
#assert 403 == exp.value.reply_code
# TODO how to check reply code?
#assert str(exp.value.reply_text).startswith("ACCESS_REFUSED")

def test_reopen_the_same_channel():
"""
Opening the same channel again results in a channel error.
54 changes: 37 additions & 17 deletions metalmq/src/client/channel/basic.rs
@@ -1,5 +1,3 @@
use std::time::Duration;

use log::{error, warn};
use metalmq_codec::{codec::Frame, frame};
use tokio::sync::oneshot;
@@ -14,9 +12,18 @@ use crate::{

impl Channel {
pub async fn handle_basic_publish(&mut self, args: frame::BasicPublishArgs) -> Result<()> {
if args.is_immediate() {
return ConnectionError::NotImplemented.into_result(
frame::BASIC_PUBLISH,
"NOT_IMPLEMENTED - Immediate publish is not implemented",
);
}

if self.in_flight_content.is_some() {
return ConnectionError::UnexpectedFrame
.into_result(frame::BASIC_PUBLISH, "Already publish message arrived");
return ConnectionError::CommandInvalid.into_result(
frame::BASIC_PUBLISH,
"COMMAND_INVALID - Already publish message arrived",
);
}

// Check if exchange exists, and cache it in order that `handle_content_body` can access
@@ -111,23 +118,36 @@ impl Channel {
/// A message can be acked more than once. If a non-delivered message is acked, a channel
/// exception will be raised.
pub async fn handle_basic_ack(&mut self, args: frame::BasicAckArgs) -> Result<()> {
// TODO check if only delivered messages are acked, even multiple times
if self.next_confirm_delivery_tag.is_none() {
return ChannelError::PreconditionFailed.into_result(
self.number,
frame::BASIC_ACK,
"PRECONDITION_FAILED - Acking not even used delivery tag",
);
}

if let Some(dt) = self.next_confirm_delivery_tag {
if args.delivery_tag > dt {
return ChannelError::PreconditionFailed.into_result(
self.number,
frame::BASIC_ACK,
"PRECONDITION_FAILED - Acking not even used delivery tag",
);
}
}

match &self.consumed_queue {
Some(cq) => {
let (tx, rx) = oneshot::channel();

// TODO why we need here the timeout?
cq.queue_sink
.send_timeout(
queue::handler::QueueCommand::AckMessage(queue::handler::AckCmd {
channel: self.number,
consumer_tag: cq.consumer_tag.clone(),
delivery_tag: args.delivery_tag,
multiple: args.multiple,
result: tx,
}),
Duration::from_secs(1),
)
.send(queue::handler::QueueCommand::AckMessage(queue::handler::AckCmd {
channel: self.number,
consumer_tag: cq.consumer_tag.clone(),
delivery_tag: args.delivery_tag,
multiple: args.multiple,
result: tx,
}))
.await?;

rx.await.unwrap()?
@@ -270,7 +290,7 @@ impl Channel {

if let Some(ex_tx) = self.exchanges.get(&msg.exchange) {
// In confirm mode or if message is mandatory we need to be sure that the message
// was router correctly from the exchange to the queue.
// was routed correctly from the exchange to the queue.
let (rtx, rrx) = match msg.mandatory || self.next_confirm_delivery_tag.is_some() {
false => (None, None),
true => {
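The delivery-tag validation added to handle_basic_ack can be exercised from the compliance suite: acking a tag that was never delivered should now close the channel with 406 PRECONDITION_FAILED. A hedged pytest sketch; the broker address, credentials and queue name are assumptions, not part of this commit:

import pika
import pika.exceptions
import pytest


def test_ack_unknown_delivery_tag_closes_channel():
    conn = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    ch = conn.channel()

    # Ack a delivery tag that was never delivered on this channel.
    ch.basic_ack(delivery_tag=42)

    # The broker is expected to close the channel with 406 PRECONDITION_FAILED;
    # with BlockingConnection the error typically surfaces on the next synchronous call.
    with pytest.raises(pika.exceptions.ChannelClosedByBroker) as exp:
        ch.queue_declare(queue="probe-queue")

    assert exp.value.reply_code == 406

    conn.close()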
15 changes: 7 additions & 8 deletions metalmq/src/client/channel/exchange.rs
@@ -1,4 +1,4 @@
use metalmq_codec::{codec::Frame, frame};
use metalmq_codec::frame;

use crate::{
error::Result,
@@ -32,7 +32,7 @@ impl Channel {

if !no_wait {
self.outgoing
.send(Frame::Frame(frame::exchange_declare_ok(self.number)))
.send(frame::exchange_declare_ok(self.number).into())
.await?
}

@@ -47,13 +47,12 @@ impl Channel {
exchange_name: args.exchange_name,
};

exchange::manager::delete_exchange(&self.em, cmd).await?;

// TODO what happens if the previous code returns with an error and we never removes that
// exchange?
// First remove the exchange from the local cache, so that if the next call fails
// we are not left in an inconsistent state.
self.exchanges.remove(&exchange_name);

self.send_frame(Frame::Frame(frame::exchange_delete_ok(self.number)))
.await
exchange::manager::delete_exchange(&self.em, cmd).await?;

self.send_frame(frame::exchange_delete_ok(self.number).into()).await
}
}
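A compliance-style check of the delete path above could verify that the exchange is really gone afterwards. A hedged pytest sketch; the exchange name and connection parameters are assumptions, not part of this change:

import pika
import pika.exceptions
import pytest


def test_deleted_exchange_is_gone():
    conn = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    ch = conn.channel()

    ch.exchange_declare(exchange="tmp-exchange", exchange_type="direct")
    ch.exchange_delete(exchange="tmp-exchange")

    # A passive declare of the deleted exchange should fail with 404 NOT_FOUND.
    with pytest.raises(pika.exceptions.ChannelClosedByBroker) as exp:
        ch.exchange_declare(exchange="tmp-exchange", passive=True)

    assert exp.value.reply_code == 404

    conn.close()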
31 changes: 0 additions & 31 deletions metalmq/src/client/channel/open_close.rs
@@ -1,38 +1,7 @@
use metalmq_codec::frame;

use super::types::Channel;
use crate::error::Result;

impl Channel {
pub async fn handle_connection_close(&mut self) -> Result<()> {
// TODO should be here just to close all channels, not repeating the channel close logic
// Most of the time we have all channels closed at this point, but what if the connection
// has been cut and client didn't have a chance to close everything properly?
//for (channel, cq) in self.consumed_queues.drain() {
// let cmd = qm::QueueCancelConsume {
// channel,
// queue_name: cq.queue_name.clone(),
// consumer_tag: cq.consumer_tag.clone(),
// };

// logerr!(qm::cancel_consume(&self.qm, cmd).await);
//}

//for qs in &self.exclusive_queues {
// qm::queue_deleted(
// &self.qm,
// qm::QueueDeletedEvent {
// queue: qs.queue_name.clone(),
// },
// )
// .await
// .unwrap();
//}

// TODO cleanup, like close all channels, delete temporal queues, etc
Ok(())
}

pub async fn handle_channel_close(&mut self, code: u16, cm: u32, text: String) -> Result<()> {
// Cancel consumed queues on the channel
//if let Some(cq) = self.consumed_queues.remove(&channel) {
4 changes: 2 additions & 2 deletions metalmq/src/client/connection/open_close.rs
@@ -99,8 +99,8 @@ impl Connection {
if channel > self.channel_max {
warn!("Channel number is too big: {channel}");

return ConnectionError::NotAllowed
.into_result(frame::CHANNEL_OPEN, "NOT_ALLOWED - Channel number is too large");
return ConnectionError::ResourceError
.into_result(frame::CHANNEL_OPEN, "RESOURCE_ERROR - Channel number is too large");
}

self.start_channel(channel).await?;
23 changes: 23 additions & 0 deletions metalmq/src/error.rs
@@ -11,30 +11,53 @@ pub type Result<T> = std::result::Result<T, Error>;
/// Shorthand of a boxed Send, Sync error.
pub type Error = Box<dyn std::error::Error + Send + Sync>;

/// Connection scope errors
#[derive(Debug)]
pub enum ConnectionError {
/// The server was forced to close the connection.
ConnectionForced = 320,
/// The client tried to work with an invalid virtual host.
InvalidPath = 402,
/// The client tried to access a resource it didn't have access to.
AccessRefused = 403,
/// The client sent an invalid AMQP frame.
FrameError = 501,
/// The client sent a frame which contains erroneous data.
SyntaxError = 502,
/// The client sent a frame which didn't fit in the normal order.
CommandInvalid = 503,
/// The client tried to access a non-existing or not-opened channel.
ChannelError = 504,
/// The client sent an unexpected content header or body frame.
UnexpectedFrame = 505,
/// The client tried to exceed the connection limits agreed during connection tuning.
ResourceError = 506,
/// The client tried to work with an entity in a way which is not allowed by the server.
NotAllowed = 530,
/// The client tried to use functionality that is not implemented.
NotImplemented = 540,
/// The server couldn't fulfill the request because of an intermittent error.
InternalError = 541,
}

/// Channel scope errors
#[derive(Debug)]
pub enum ChannelError {
/// Denotes successful execution, for example when a connection or channel is closed.
Success = 200,
/// The client attempted to transfer a message which exceeded the limits.
ContentTooLarge = 311,
/// A mandatory message cannot be routed to any queue.
NoRoute = 312,
/// An immediate message cannot be delivered because the queue has no consumers.
NoConsumers = 313,
/// The client tried to access a resource it didn't have access to.
AccessRefused = 403,
/// Queue or entity cannot be found.
NotFound = 404,
/// The client cannot access a resource because another client is working on it.
ResourceLocked = 405,
/// The work on resource is refused mostly because of validation errors.
PreconditionFailed = 406,
}
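On the client side these codes arrive in the reply-code field of connection.close and channel.close; pika, for example, exposes them as reply_code / reply_text on its exceptions. A hedged sketch of how a channel error surfaces (the queue name and broker address are assumptions):

import pika
import pika.exceptions

conn = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
ch = conn.channel()

try:
    # Passive declare of a missing queue maps to ChannelError::NotFound (404).
    ch.queue_declare(queue="no-such-queue", passive=True)
except pika.exceptions.ChannelClosedByBroker as exc:
    print(exc.reply_code, exc.reply_text)  # e.g. 404, "NOT_FOUND - ..."

conn.close()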

1 change: 0 additions & 1 deletion metalmq/src/exchange/handler/mod.rs
@@ -138,7 +138,6 @@ impl ExchangeState {
returned: Option<oneshot::Sender<Option<Arc<Message>>>>,
) -> Result<()> {
if let Some(failed_message) = self.bindings.route_message(message).await? {
// TODO handle immediate somewhere, too
if failed_message.mandatory {
returned.unwrap().send(Some(failed_message)).unwrap();
} else if let Some(r) = returned {
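The mandatory branch above hands an unroutable message back to the publisher, which on the wire becomes a basic.return with ChannelError::NoRoute (312). A hedged pika sketch of how a publisher observes this; the exchange name and connection parameters are illustrative, and it assumes the broker delivers returns as the handler above suggests:

import pika
import pika.exceptions

conn = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
ch = conn.channel()
ch.confirm_delivery()

ch.exchange_declare(exchange="unbound-exchange", exchange_type="direct")

try:
    # No queue is bound, so a mandatory publish cannot be routed anywhere.
    ch.basic_publish(
        exchange="unbound-exchange",
        routing_key="nowhere",
        body=b"hello",
        mandatory=True,
    )
except pika.exceptions.UnroutableError as exc:
    print(len(exc.messages), "message(s) came back via basic.return")

conn.close()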
96 changes: 0 additions & 96 deletions metalmq/tests/it/consume.rs
@@ -72,8 +72,6 @@ async fn two_consumers_exclusive_queue_error() -> Result<()> {
)
.await?;

// TODO write another test to get 405 - resource locker for consuming exclusive queue
//
ch.queue_declare(QUEUE, QueueDeclareOpts::default()).await?;

ch.queue_bind(QUEUE, EXCHANGE, Binding::Direct("".to_string())).await?;
@@ -99,97 +97,3 @@ async fn two_consumers_exclusive_queue_error() -> Result<()> {

Ok(())
}
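The removed TODO above asked for a test that produces 405 RESOURCE_LOCKED when a second connection consumes from an exclusive queue. A hedged pytest sketch of that scenario in the style of the compliance suite; the queue name and connection parameters are assumptions:

import pika
import pika.exceptions
import pytest


def test_consume_exclusive_queue_from_other_connection():
    owner = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    owner_ch = owner.channel()
    owner_ch.queue_declare(queue="exclusive-queue", exclusive=True)

    intruder = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    intruder_ch = intruder.channel()

    # Consuming an exclusive queue owned by another connection should be
    # rejected with ChannelError::ResourceLocked (405).
    with pytest.raises(pika.exceptions.ChannelClosedByBroker) as exp:
        intruder_ch.basic_consume(
            queue="exclusive-queue",
            on_message_callback=lambda *args: None,
        )

    assert exp.value.reply_code == 405

    intruder.close()
    owner.close()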

//#[tokio::test]
//async fn three_consumers_consume_roughly_the_same_number_of_messages() -> Result<()> {
// use std::collections::HashMap;
// use std::sync::{Arc, Mutex};
// use tokio::sync::broadcast;
//
// let mut producer = helper::default().connect().await?;
// let channel = producer.channel_open(12).await?;
//
// channel.queue_delete("3-queue", false, false).await?;
// channel.exchange_delete("3-exchange", false).await?;
//
// helper::declare_exchange_queue(&channel, "3-exchange", "3-queue").await?;
//
// let mut consumers = vec![];
// let mut channels = vec![];
// let total_message_count = Arc::new(Mutex::new(0u32));
// let message_count = Arc::new(Mutex::new(HashMap::<u16, u32>::new()));
// let (acked_tx, mut acked_rx) = broadcast::channel::<u32>(8);
// let max = 30u32;
//
// for i in 0..3u16 {
// let mut consumer = helper::default().connect().await?;
// let ch = consumer.channel_open(12).await?;
// let msg_count = message_count.clone();
// let total_count = total_message_count.clone();
//
// message_count.lock().unwrap().insert(i, 0);
//
// let counter = move |ci: ConsumerSignal| {
// match ci {
// ConsumerSignal::Delivered(m) => {
// // Count the messages per consumer number
// let mut mc = msg_count.lock().unwrap();
// if let Some(c) = mc.get_mut(&i) {
// *c += 1;
// }
//
// // Count the total number of messages
// let mut tc = total_count.lock().unwrap();
// *tc += 1;
//
// ConsumerResponse {
// result: None,
// ack: ConsumerAck::Ack {
// delivery_tag: m.delivery_tag,
// multiple: false,
// },
// }
// }
// _ => ConsumerResponse {
// result: Some(()),
// ack: ConsumerAck::Nothing,
// },
// }
// };
//
// ch.basic_consume("3-queue", &format!("ctag-{}", i), None, Box::new(counter))
// .await?;
//
// consumers.push(consumer);
// channels.push(ch);
// }
//
// for i in 0..max {
// channel
// .basic_publish("3-exchange", "", format!("Message #{}", i))
// .await?;
// }
//
// for i in 0..3usize {
// channels.get(i).unwrap().basic_cancel(&format!("ctag-{}", i)).await?;
// }
//
// /*while let Ok(ack) = acked_rx.recv().await {
// if ack == max {
// break;
// }
// }*/
//
// for cons in &mut consumers {
// cons.close().await.unwrap();
// }
//
// for mc in message_count.lock().unwrap().iter() {
// println!("Message count {:?}", mc);
// assert!(mc.1 > &5u32 && mc.1 < &15u32);
// }
//
// producer.close().await?;
//
// Ok(())
//}
