Skip to content

Commit

Permalink
Refactor channel commands
Browse files Browse the repository at this point in the history
  • Loading branch information
jonasrichard committed Sep 5, 2024
1 parent 4ed2bcb commit 3bf138f
Show file tree
Hide file tree
Showing 7 changed files with 414 additions and 84 deletions.
1 change: 1 addition & 0 deletions metalmq/src/client/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ pub mod content;
pub mod exchange;
pub mod open_close;
pub mod queue;
pub mod router;
pub mod types;
172 changes: 167 additions & 5 deletions metalmq/src/client/channel/basic.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use log::error;
use metalmq_codec::frame;
use std::time::Duration;

use log::{error, warn};
use metalmq_codec::{codec::Frame, frame};
use tokio::sync::oneshot;

use crate::{
error::{ChannelError, ConnectionError, Result},
exchange::handler::ExchangeCommand,
message::Message,
queue,
};

use super::types::{Channel, PublishedContent};
use super::types::{ActivelyConsumedQueue, Channel, PassivelyConsumedQueue, PublishedContent};

impl Channel {
pub async fn handle_basic_publish(&mut self, args: frame::BasicPublishArgs) -> Result<()> {
Expand Down Expand Up @@ -39,30 +43,188 @@ impl Channel {
}

pub async fn handle_basic_consume(&mut self, args: frame::BasicConsumeArgs) -> Result<()> {
let queue_clone = args.queue.clone();
let consumer_tag_clone = args.consumer_tag.clone();
let cmd = queue::manager::QueueConsumeCommand {
conn_id: self.source_connection.clone(),
channel: self.number,
queue_name: queue_clone,
consumer_tag: consumer_tag_clone,
no_ack: args.flags.contains(frame::BasicConsumeFlags::NO_ACK),
exclusive: args.flags.contains(frame::BasicConsumeFlags::EXCLUSIVE),
outgoing: self.outgoing.clone(),
frame_size: self.frame_size,
};

let queue_sink = queue::manager::consume(&self.qm, cmd).await?;

let consumer_tag_clone = args.consumer_tag.clone();
let queue_sink_clone = queue_sink.clone();

self.consumed_queue = Some(ActivelyConsumedQueue {
consumer_tag: args.consumer_tag.clone(),
queue_name: args.queue.clone(),
queue_sink,
});

self.send_frame(Frame::Frame(
frame::BasicConsumeOkArgs::new(&args.consumer_tag).frame(self.number),
))
.await?;

let start_deliver_cmd = queue::handler::QueueCommand::StartDelivering {
consumer_tag: consumer_tag_clone,
};

queue_sink_clone.send(start_deliver_cmd).await?;

Ok(())
}

pub async fn handle_basic_cancel(&mut self, args: frame::BasicCancelArgs) -> Result<()> {
Ok(())
if let Some(cq) = self.consumed_queue.take() {
let cmd = queue::manager::QueueCancelConsume {
channel: self.number,
queue_name: cq.queue_name.clone(),
consumer_tag: cq.consumer_tag.clone(),
};
queue::manager::cancel_consume(&self.qm, cmd).await?;
}

self.send_frame(Frame::Frame(
frame::BasicCancelOkArgs::new(&args.consumer_tag).frame(self.number),
))
.await
}

/// Handles Ack coming from client.
///
/// A message can be acked more than once. If a non-delivered message is acked, a channel
/// exception will be raised.
pub async fn handle_basic_ack(&mut self, args: frame::BasicAckArgs) -> Result<()> {
// TODO check if only delivered messages are acked, even multiple times
match &self.consumed_queue {
Some(cq) => {
let (tx, rx) = oneshot::channel();

// TODO why we need here the timeout?
cq.queue_sink
.send_timeout(
queue::handler::QueueCommand::AckMessage(queue::handler::AckCmd {
channel: self.number,
consumer_tag: cq.consumer_tag.clone(),
delivery_tag: args.delivery_tag,
multiple: args.multiple,
result: tx,
}),
Duration::from_secs(1),
)
.await?;

rx.await.unwrap()?
}
None => match &self.passively_consumed_queue {
Some(pq) => {
let (tx, rx) = oneshot::channel();

pq.queue_sink
.send(queue::handler::QueueCommand::AckMessage(queue::handler::AckCmd {
channel: self.number,
consumer_tag: pq.consumer_tag.clone(),
delivery_tag: args.delivery_tag,
multiple: args.multiple,
result: tx,
}))
.await?;

rx.await.unwrap().unwrap();
}
None => {
warn!("Basic.Ack arrived without consuming the queue");
}
},
}

Ok(())
}

pub async fn handle_basic_get(&mut self, args: frame::BasicGetArgs) -> Result<()> {
Ok(())
// Cache the queue the client consumes passively with Basic.Get
if self.passively_consumed_queue.is_none() {
let sink = queue::manager::get_command_sink(
&self.qm,
queue::manager::GetQueueSinkQuery {
channel: self.number,
queue_name: args.queue.clone(),
},
)
.await;

if sink.is_err() {
return ChannelError::NotFound.to_result(
self.number,
frame::BASIC_GET,
&format!("Queue {} not found", args.queue),
);
}

let sink = sink.unwrap();
let (tx, rx) = oneshot::channel();

sink.send(queue::handler::QueueCommand::PassiveConsume(
queue::handler::PassiveConsumeCmd {
conn_id: self.source_connection.clone(),
channel: self.number,
sink: self.outgoing.clone(),
frame_size: self.frame_size,
result: tx,
},
))
.await
.unwrap();

rx.await.unwrap().unwrap();

let pq = PassivelyConsumedQueue {
queue_name: args.queue,
consumer_tag: format!("{}-{}", self.source_connection, self.number),
delivery_tag: 1u64,
queue_sink: sink,
};

let _ = self.passively_consumed_queue.insert(pq);
}

if let Some(pq) = &self.passively_consumed_queue {
let (tx, rx) = oneshot::channel();

let _ = pq
.queue_sink
.send(queue::handler::QueueCommand::Get(queue::handler::GetCmd {
conn_id: self.source_connection.clone(),
channel: self.number,
no_ack: args.no_ack,
result: tx,
}))
.await
.unwrap();

rx.await.unwrap()
} else {
ConnectionError::InternalError.to_result(frame::BASIC_GET, "Queue not exist")
}
}

pub async fn handle_basic_reject(&mut self, __args: frame::BasicRejectArgs) -> Result<()> {
Ok(())
}

pub async fn handle_confirm_select(&mut self, _args: frame::ConfirmSelectArgs) -> Result<()> {
self.next_confirm_delivery_tag = 1u64;

self.send_frame(Frame::Frame(frame::confirm_select_ok(self.number)))
.await?;

Ok(())
}

Expand Down
21 changes: 19 additions & 2 deletions metalmq/src/client/channel/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use metalmq_codec::{codec::Frame, frame};

use crate::{
error::Result,
exchange::{self, manager::DeclareExchangeCommand},
exchange::{
self,
manager::{DeclareExchangeCommand, DeleteExchangeCommand},
},
};

use super::types::Channel;
Expand Down Expand Up @@ -37,6 +40,20 @@ impl Channel {
}

pub async fn handle_exchange_delete(&mut self, args: frame::ExchangeDeleteArgs) -> Result<()> {
Ok(())
let exchange_name = args.exchange_name.clone();
let cmd = DeleteExchangeCommand {
channel: self.number,
if_unused: args.flags.contains(frame::ExchangeDeleteFlags::IF_UNUSED),
exchange_name: args.exchange_name,
};

exchange::manager::delete_exchange(&self.em, cmd).await?;

// TODO what happens if the previous code returns with an error and we never removes that
// exchange?
self.exchanges.remove(&exchange_name);

self.send_frame(Frame::Frame(frame::exchange_delete_ok(self.number)))
.await
}
}
122 changes: 115 additions & 7 deletions metalmq/src/client/channel/queue.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,134 @@
use log::warn;
use metalmq_codec::codec::Frame;
use metalmq_codec::frame;
use uuid::Uuid;

use crate::client::channel::types::Channel;
use crate::error::Result;
use crate::error::{ChannelError, Result};
use crate::exchange::manager::{BindQueueCommand, UnbindQueueCommand};
use crate::{exchange, queue};

impl Channel {
pub async fn queue_declare(&mut self, channel: Channel, args: frame::QueueDeclareArgs) -> Result<()> {
pub async fn handle_queue_declare(&mut self, args: frame::QueueDeclareArgs) -> Result<()> {
let mut queue_name = args.name.clone();
if queue_name.is_empty() {
queue_name = Uuid::new_v4().hyphenated().to_string();
}

let passive = args.flags.contains(frame::QueueDeclareFlags::PASSIVE);

let cmd = queue::manager::QueueDeclareCommand {
conn_id: self.source_connection.clone(),
channel: self.number,
queue: args.into(),
passive,
};
let (message_count, consumer_count) = queue::manager::declare_queue(&self.qm, cmd).await?;

self.send_frame(Frame::Frame(
frame::QueueDeclareOkArgs::default()
.name(&queue_name)
.message_count(message_count)
.consumer_count(consumer_count)
.frame(self.number),
))
.await?;

Ok(())
}

pub async fn queue_bind(&mut self, channel: Channel, args: frame::QueueBindArgs) -> Result<()> {
Ok(())
pub async fn handle_queue_bind(&mut self, args: frame::QueueBindArgs) -> Result<()> {
let cmd = queue::manager::GetQueueSinkQuery {
channel: self.number,
queue_name: args.queue_name.clone(),
};

match queue::manager::get_command_sink(&self.qm, cmd).await {
Ok(sink) => {
let cmd = BindQueueCommand {
conn_id: self.source_connection.clone(),
channel: self.number,
exchange_name: args.exchange_name,
queue_name: args.queue_name,
routing_key: args.routing_key,
args: args.args,
queue_sink: sink,
};

// TODO now we can use let patter = x else {} so we can drop this macro
exchange::manager::bind_queue(&self.em, cmd).await?;

self.send_frame(Frame::Frame(frame::queue_bind_ok(self.number))).await
}
Err(e) => {
warn!("{:?}", e);

ChannelError::NotFound.to_result(self.number, frame::QUEUE_BIND, "Queue not found")
}
}
}

pub async fn queue_delete(&mut self, channel: Channel, args: frame::QueueDeleteArgs) -> Result<()> {
pub async fn handle_queue_delete(&mut self, args: frame::QueueDeleteArgs) -> Result<()> {
let cmd = queue::manager::QueueDeleteCommand {
conn_id: self.source_connection.clone(),
channel: self.number,
queue_name: args.queue_name,
if_unused: args.flags.contains(frame::QueueDeleteFlags::IF_UNUSED),
if_empty: args.flags.contains(frame::QueueDeleteFlags::IF_EMPTY),
};

let message_count = queue::manager::delete_queue(&self.qm, cmd).await?;

self.send_frame(Frame::Frame(
frame::QueueDeleteOkArgs::default()
.message_count(message_count)
.frame(self.number),
))
.await?;

Ok(())
}

pub async fn queue_unbind(&mut self, channel: Channel, args: frame::QueueUnbindArgs) -> Result<()> {
pub async fn handle_queue_unbind(&mut self, args: frame::QueueUnbindArgs) -> Result<()> {
let cmd = UnbindQueueCommand {
conn_id: self.source_connection.clone(),
channel: self.number,
exchange_name: args.exchange_name,
queue_name: args.queue_name,
routing_key: args.routing_key,
};

exchange::manager::unbind_queue(&self.em, cmd).await?;

self.send_frame(Frame::Frame(frame::AMQPFrame::Method(
self.number,
frame::QUEUE_UNBIND_OK,
frame::MethodFrameArgs::QueueUnbindOk,
)))
.await?;

Ok(())
}

pub async fn queue_purge(&mut self, channel: Channel, args: frame::QueuePurgeArgs) -> Result<()> {
pub async fn handle_queue_purge(&mut self, args: frame::QueuePurgeArgs) -> Result<()> {
// Purge the not-yet sent messages from the queue. Queue gives back the number of purged
// messages, so this operation is sync.

let cmd = queue::manager::GetQueueSinkQuery {
channel: self.number,
queue_name: args.queue_name,
};

let queue_sink = queue::manager::get_command_sink(&self.qm, cmd).await?;
let message_count = queue::handler::purge(self.source_connection.clone(), self.number, &queue_sink).await?;

self.send_frame(Frame::Frame(
frame::QueuePurgeOkArgs::default()
.message_count(message_count)
.frame(self.number),
))
.await?;

Ok(())
}
}
Loading

0 comments on commit 3bf138f

Please sign in to comment.