diff --git a/metalmq/src/client/conn.rs b/metalmq/src/client/conn.rs index 6ed4bbb..30d5209 100644 --- a/metalmq/src/client/conn.rs +++ b/metalmq/src/client/conn.rs @@ -70,6 +70,9 @@ async fn incoming_loop(conn: &mut Connection, mut stream: SplitStream Result { match data { Frame::Frame(frame) => { - if conn.handle_client_frame(frame).await.is_err() { - conn.close().await?; + dbg!(&frame); + if !conn.handle_client_frame(frame).await? { return Ok(false); } @@ -206,9 +211,7 @@ pub async fn handle_in_stream_data(conn: &mut Connection, data: Frame) -> Result } Frame::Frames(frames) => { for frame in frames { - if conn.handle_client_frame(frame).await.is_err() { - conn.close().await?; - + if !conn.handle_client_frame(frame).await? { return Ok(false); } } diff --git a/metalmq/src/client/connection/open_close.rs b/metalmq/src/client/connection/open_close.rs index 0ca22dc..1244908 100644 --- a/metalmq/src/client/connection/open_close.rs +++ b/metalmq/src/client/connection/open_close.rs @@ -120,10 +120,14 @@ impl Connection { for (channel, ch_tx) in &self.channel_receivers { // drop channel channel in order to stop it - let _ = ch_tx; + let _ = *ch_tx; if let Some(jh) = self.channel_handlers.remove(&channel) { - jh.await; + //jh.abort(); + + //let x = jh.await; + + //dbg!(x); } } diff --git a/metalmq/src/client/connection/router.rs b/metalmq/src/client/connection/router.rs index 1b9568d..f883c75 100644 --- a/metalmq/src/client/connection/router.rs +++ b/metalmq/src/client/connection/router.rs @@ -64,9 +64,13 @@ impl Connection { scope: ErrorScope::Connection, .. } => { - let _r2 = self.close().await; + // TODO this should be silent close which means that it doesn't send out any + // frames + let r2 = self.close().await; - self.send_frame(rte.into()).await; + dbg!(r2); + + let _ = self.send_frame(dbg!(rte.into())).await.map_err(|e| dbg!(e)); // if it fails we just return with an error so loop will close everything return Ok(false); diff --git a/metalmq/src/error.rs b/metalmq/src/error.rs index 1f659b3..d5d2b2e 100644 --- a/metalmq/src/error.rs +++ b/metalmq/src/error.rs @@ -1,6 +1,9 @@ use std::fmt; -use metalmq_codec::{codec::Frame, frame::AMQPFrame}; +use metalmq_codec::{ + codec::Frame, + frame::{self, AMQPFrame, MethodFrameArgs}, +}; pub type Result = std::result::Result; @@ -74,6 +77,28 @@ impl From for Result { } } +impl From for RuntimeError { + fn from(value: AMQPFrame) -> Self { + match value { + AMQPFrame::Method(0, _, MethodFrameArgs::ConnectionClose(args)) => RuntimeError { + scope: ErrorScope::Connection, + channel: 0, + code: args.code, + text: args.text.to_string(), + class_method: frame::unify_class_method(args.class_id, args.method_id), + }, + AMQPFrame::Method(channel, _, MethodFrameArgs::ChannelClose(args)) => RuntimeError { + scope: ErrorScope::Channel, + channel, + code: args.code, + text: args.text.to_string(), + class_method: frame::unify_class_method(args.class_id, args.method_id), + }, + f => panic!("Unknown frame {f:?}"), + } + } +} + // TODO move all error converstion to an error mod // instead of this amqpframe implements to_frame //pub fn runtime_error_to_frame(rte: &RuntimeError) -> Frame { diff --git a/metalmq/src/tests/connect.rs b/metalmq/src/tests/connect.rs index 6e472f4..b834faa 100644 --- a/metalmq/src/tests/connect.rs +++ b/metalmq/src/tests/connect.rs @@ -1,7 +1,7 @@ use metalmq_codec::frame::{self, AMQPFrame}; -use crate::tests::TestCase; -use crate::Result; +use crate::error::{ErrorScope, Result, RuntimeError}; +use crate::tests::{recv, TestCase}; #[tokio::test] async fn connect_with_username_password() -> Result<()> { @@ -112,6 +112,8 @@ async fn connect_with_bad_password() -> Result<()> { AMQPFrame::Method(0, _, frame::MethodFrameArgs::ConnectionClose(_)) )); + assert!(client.connection.await.is_ok()); + Ok(()) } @@ -121,15 +123,19 @@ async fn channel_reopen_with_same_number() -> Result<()> { let mut client = test_case.new_client_with_channel(1).await; client.send_frame(frame::channel_open(1)).await; - client.send_frame(frame::channel_open(1)).await; - let connection_error = client.recv_single_frame().await; + let connection_error = recv::recv_error_frame(&mut client.conn_rx).await; assert!(matches!( connection_error, - AMQPFrame::Method(0, _, frame::MethodFrameArgs::ConnectionClose(_)) + RuntimeError { + scope: ErrorScope::Connection, + .. + }, )); + dbg!(client.connection.await); + Ok(()) } diff --git a/metalmq/src/tests/mod.rs b/metalmq/src/tests/mod.rs index eea70ec..3a3629e 100644 --- a/metalmq/src/tests/mod.rs +++ b/metalmq/src/tests/mod.rs @@ -61,7 +61,7 @@ impl TestCase { let (outgoing_tx, outgoing_rx) = mpsc::channel(16); let connection = Connection::new(ctx, outgoing_tx); - let jh = tokio::spawn(async move { TestCase::client_loop(connection, incoming_rx).await }); + let jh = tokio::spawn(async move { dbg!(TestCase::client_loop(connection, incoming_rx).await) }); TestClient { connection: jh, @@ -72,7 +72,9 @@ impl TestCase { async fn client_loop(mut connection: Connection, mut incoming_rx: mpsc::Receiver) -> Result<()> { while let Some(f) = incoming_rx.recv().await { - conn::handle_in_stream_data(&mut connection, f).await?; + if !conn::handle_in_stream_data(&mut connection, f).await? { + break; + } } Ok(()) diff --git a/metalmq/src/tests/recv.rs b/metalmq/src/tests/recv.rs index 0745c3a..90b7fc2 100644 --- a/metalmq/src/tests/recv.rs +++ b/metalmq/src/tests/recv.rs @@ -1,6 +1,8 @@ use metalmq_codec::{codec::Frame, frame::AMQPFrame}; use tokio::sync::mpsc; +use crate::error::RuntimeError; + /// Receiving with timeout pub async fn recv_with_timeout(rx: &mut mpsc::Receiver) -> Option { let sleep = tokio::time::sleep(tokio::time::Duration::from_millis(100)); @@ -19,6 +21,8 @@ pub async fn recv_with_timeout(rx: &mut mpsc::Receiver) -> Option { /// Receive a single frame from the channel and panics if there is a timeout or we received more /// frames. See [`Frame`] enum. pub async fn recv_single_frame(rx: &mut mpsc::Receiver) -> AMQPFrame { + debug_assert!(!rx.is_closed()); + let f = recv_with_timeout(rx).await.expect("No response is received"); match f { @@ -30,6 +34,8 @@ pub async fn recv_single_frame(rx: &mut mpsc::Receiver) -> AMQPFrame { /// Receive multiple frames from the channel and panics if there is a timeout or we received /// only a single frame. See [`Frame`] enum. pub async fn recv_multiple_frames(rx: &mut mpsc::Receiver) -> Vec { + debug_assert!(!rx.is_closed()); + let f = recv_with_timeout(rx).await.expect("No response is received"); match f { @@ -40,5 +46,15 @@ pub async fn recv_multiple_frames(rx: &mut mpsc::Receiver) -> Vec(rx: &mut mpsc::Receiver) -> bool { + debug_assert!(!rx.is_closed()); + recv_with_timeout(rx).await.is_none() } + +pub async fn recv_error_frame(rx: &mut mpsc::Receiver) -> RuntimeError { + debug_assert!(!rx.is_closed()); + + let f = recv_single_frame(rx).await; + + f.into() +}