Skip to content

Commit

Permalink
Minor improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
jonasrichard committed Aug 19, 2024
1 parent 93a1d6e commit b96e732
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 18 deletions.
7 changes: 2 additions & 5 deletions metalmq-client/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,9 +367,7 @@ pub(crate) async fn call(sink: &mpsc::Sender<ClientRequest>, f: frame::AMQPFrame
response: Some(WaitFor::FrameResponse(tx)),
},
)
.await
.unwrap();
// TODO ^^^ here we need to apply ? operator to bubble up the send error, not to panic
.await?;

rx.await.unwrap()?;

Expand All @@ -384,8 +382,7 @@ pub(crate) async fn sync_send(sink: &mpsc::Sender<ClientRequest>, f: frame::AMQP
param: Param::Frame(Box::new(f)),
response: Some(WaitFor::SentOut(tx)),
})
.await
.unwrap();
.await?;

rx.await.unwrap()?;

Expand Down
4 changes: 3 additions & 1 deletion metalmq-client/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ impl ClientState {
.await
.unwrap();

self.event_sink.send(EventSignal::ChannelClose).unwrap();
self.event_sink.send(EventSignal::ChannelClose)?;

Ok(())
}
Expand Down Expand Up @@ -330,6 +330,8 @@ impl ClientState {
channel: ChannelNumber,
args: frame::QueueDeleteOkArgs,
) -> Result<()> {
// TODO we may maintain the queues which are declared by this client and here we can
// remove if the deletion was successful
Ok(())
}

Expand Down
4 changes: 2 additions & 2 deletions metalmq-client/tests/it/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ async fn delete_not_existing_exchange_error_404() -> Result<()> {
assert_eq!(err.channel, Some(9));
assert_eq!(err.code, 404);

ch.close().await?;
c.close().await?;
assert!(matches!(ch.close().await, Ok(())));
assert!(matches!(c.close().await, Ok(())));

Ok(())
}
Expand Down
9 changes: 9 additions & 0 deletions metalmq/src/client/state/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,15 @@ impl Connection {
//
// We can even have two types of message command as ExchangeCommand: one which is async
// and one which waits for confirmation.

// FIXME we need to separate the exchange lookup logic
// A client can send a message to the default exchange which is empty string, so
// in that case the exchange won't be in the 'cache'. Also a client can send
// message without declaring an exchange, so we need to ask exchange manager if
// this exchange exists. Of course we can cache the exchanges, but in that case if
// anyone deletes the exchange the client state needs to be notified.
//
// FIXME also message sending should be somewhere else in order to be testable
match self.exchanges.get(&pc.exchange) {
Some(ch) => {
// If message is mandatory or the channel is in confirm mode we can expect
Expand Down
19 changes: 9 additions & 10 deletions metalmq/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,7 @@ mod restapi;
#[cfg(test)]
pub mod tests;

use hyper::{
body::{self, Incoming},
server::conn::http1,
service::service_fn,
Request,
};
use hyper::{server::conn::http1, service::service_fn};
use hyper_util::rt::TokioIo;
use log::{error, info};
use std::{fmt, io::Write, net::SocketAddr, sync::Arc};
Expand Down Expand Up @@ -106,7 +101,7 @@ fn setup_logger() {
.format_timestamp_millis()
.format(|buf, record| {
let lvl = buf.default_level_style(record.level());
lvl.effects(Effects::BOLD);
let _ = lvl.effects(Effects::BOLD);

match record.level() {
log::Level::Error => lvl.fg_color(Some(AnsiColor::Red.into())),
Expand Down Expand Up @@ -187,17 +182,21 @@ pub async fn main() -> Result<()> {

let cli_config = config::cli();

let config = config::parse_config(&cli_config.config_file_path)?;
let config = config::parse_config(&cli_config.config_file_path).expect("Cannot parse config file");

let exchange_manager = exchange::manager::start();
let context = Context {
exchange_manager: exchange_manager.clone(),
queue_manager: queue::manager::start(exchange_manager),
};

start_http(context.clone(), &config.network.http_listen).await?;
//start_http(context.clone(), &config.network.http_listen)
// .await
// .expect("Cannot start http server");

start_amqp(context, &config.network.amqp_listen).await?;
start_amqp(context, &config.network.amqp_listen)
.await
.expect("Cannon start AMQP server");

signal::ctrl_c().await?;

Expand Down
33 changes: 33 additions & 0 deletions metalmq/tests/it/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,36 @@ async fn unrouted_mandatory_messages_gives_basic_return() -> Result<()> {

Ok(())
}

// basic publish without exchange name but routing key = queue name should direct the
// message to that queue
// aka default exchange

#[tokio::test]
async fn message_to_default_exchange_go_to_queue_by_routing_key() -> Result<()> {
let (mut c, _) = helper::default().connect().await?;
let mut ch = c.channel_open(15).await?;

ch.queue_declare("q-def-exchange", QueueDeclareOpts::default().durable(true))
.await?;

let mut ch_consume = c.channel_open(16).await?;

let result = helper::consume_messages(&ch_consume, "q-def-exchange", Exclusive(false), 1).await?;

ch.basic_publish("", "q-def-exchange", "Message to default exchange")
.await?;

let msgs = result.await.unwrap();

let msg = msgs.first().unwrap();

assert_eq!("", msg.exchange);
assert_eq!(b"Message to default exchange", msg.message.body.as_slice());

ch_consume.close().await?;
ch.close().await?;
c.close().await?;

Ok(())
}

0 comments on commit b96e732

Please sign in to comment.