Skip to content

Commit

Permalink
Fix connection close on error bug
Browse files Browse the repository at this point in the history
  • Loading branch information
jonasrichard committed Sep 18, 2024
1 parent c0896bd commit ae49890
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 17 deletions.
13 changes: 8 additions & 5 deletions metalmq/src/client/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ async fn incoming_loop(conn: &mut Connection, mut stream: SplitStream<Framed<Tcp
}
}

debug_assert!(conn.channel_receivers.is_empty());
debug_assert!(conn.channel_handlers.is_empty());

Ok(())
}

Expand Down Expand Up @@ -193,22 +196,22 @@ async fn outgoing_loop_with_heartbeat(
Ok(())
}

/// Handles the incoming frame by the connection or connection routes to the appropriate channel.
/// If it returns `Ok(false)` that means that the loop should be closed.
pub async fn handle_in_stream_data(conn: &mut Connection, data: Frame) -> Result<bool> {
match data {
Frame::Frame(frame) => {
if conn.handle_client_frame(frame).await.is_err() {
conn.close().await?;
dbg!(&frame);

if !conn.handle_client_frame(frame).await? {
return Ok(false);
}

Ok(true)
}
Frame::Frames(frames) => {
for frame in frames {
if conn.handle_client_frame(frame).await.is_err() {
conn.close().await?;

if !conn.handle_client_frame(frame).await? {
return Ok(false);
}
}
Expand Down
8 changes: 6 additions & 2 deletions metalmq/src/client/connection/open_close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,14 @@ impl Connection {

for (channel, ch_tx) in &self.channel_receivers {
// drop channel channel in order to stop it
let _ = ch_tx;
let _ = *ch_tx;

if let Some(jh) = self.channel_handlers.remove(&channel) {
jh.await;
//jh.abort();

//let x = jh.await;

//dbg!(x);
}
}

Expand Down
8 changes: 6 additions & 2 deletions metalmq/src/client/connection/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,13 @@ impl Connection {
scope: ErrorScope::Connection,
..
} => {
let _r2 = self.close().await;
// TODO this should be silent close which means that it doesn't send out any
// frames
let r2 = self.close().await;

self.send_frame(rte.into()).await;
dbg!(r2);

let _ = self.send_frame(dbg!(rte.into())).await.map_err(|e| dbg!(e));
// if it fails we just return with an error so loop will close everything

return Ok(false);
Expand Down
27 changes: 26 additions & 1 deletion metalmq/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::fmt;

use metalmq_codec::{codec::Frame, frame::AMQPFrame};
use metalmq_codec::{
codec::Frame,
frame::{self, AMQPFrame, MethodFrameArgs},
};

pub type Result<T> = std::result::Result<T, Error>;

Expand Down Expand Up @@ -74,6 +77,28 @@ impl<T> From<RuntimeError> for Result<T> {
}
}

impl From<AMQPFrame> for RuntimeError {
fn from(value: AMQPFrame) -> Self {
match value {
AMQPFrame::Method(0, _, MethodFrameArgs::ConnectionClose(args)) => RuntimeError {
scope: ErrorScope::Connection,
channel: 0,
code: args.code,
text: args.text.to_string(),
class_method: frame::unify_class_method(args.class_id, args.method_id),
},
AMQPFrame::Method(channel, _, MethodFrameArgs::ChannelClose(args)) => RuntimeError {
scope: ErrorScope::Channel,
channel,
code: args.code,
text: args.text.to_string(),
class_method: frame::unify_class_method(args.class_id, args.method_id),
},
f => panic!("Unknown frame {f:?}"),
}
}
}

// TODO move all error converstion to an error mod
// instead of this amqpframe implements to_frame
//pub fn runtime_error_to_frame(rte: &RuntimeError) -> Frame {
Expand Down
16 changes: 11 additions & 5 deletions metalmq/src/tests/connect.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use metalmq_codec::frame::{self, AMQPFrame};

use crate::tests::TestCase;
use crate::Result;
use crate::error::{ErrorScope, Result, RuntimeError};
use crate::tests::{recv, TestCase};

#[tokio::test]
async fn connect_with_username_password() -> Result<()> {
Expand Down Expand Up @@ -112,6 +112,8 @@ async fn connect_with_bad_password() -> Result<()> {
AMQPFrame::Method(0, _, frame::MethodFrameArgs::ConnectionClose(_))
));

assert!(client.connection.await.is_ok());

Ok(())
}

Expand All @@ -121,15 +123,19 @@ async fn channel_reopen_with_same_number() -> Result<()> {
let mut client = test_case.new_client_with_channel(1).await;

client.send_frame(frame::channel_open(1)).await;

client.send_frame(frame::channel_open(1)).await;

let connection_error = client.recv_single_frame().await;
let connection_error = recv::recv_error_frame(&mut client.conn_rx).await;

assert!(matches!(
connection_error,
AMQPFrame::Method(0, _, frame::MethodFrameArgs::ConnectionClose(_))
RuntimeError {
scope: ErrorScope::Connection,
..
},
));

dbg!(client.connection.await);

Ok(())
}
6 changes: 4 additions & 2 deletions metalmq/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl TestCase {
let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
let connection = Connection::new(ctx, outgoing_tx);

let jh = tokio::spawn(async move { TestCase::client_loop(connection, incoming_rx).await });
let jh = tokio::spawn(async move { dbg!(TestCase::client_loop(connection, incoming_rx).await) });

TestClient {
connection: jh,
Expand All @@ -72,7 +72,9 @@ impl TestCase {

async fn client_loop(mut connection: Connection, mut incoming_rx: mpsc::Receiver<Frame>) -> Result<()> {
while let Some(f) = incoming_rx.recv().await {
conn::handle_in_stream_data(&mut connection, f).await?;
if !conn::handle_in_stream_data(&mut connection, f).await? {
break;
}
}

Ok(())
Expand Down
16 changes: 16 additions & 0 deletions metalmq/src/tests/recv.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use metalmq_codec::{codec::Frame, frame::AMQPFrame};
use tokio::sync::mpsc;

use crate::error::RuntimeError;

/// Receiving with timeout
pub async fn recv_with_timeout<T>(rx: &mut mpsc::Receiver<T>) -> Option<T> {
let sleep = tokio::time::sleep(tokio::time::Duration::from_millis(100));
Expand All @@ -19,6 +21,8 @@ pub async fn recv_with_timeout<T>(rx: &mut mpsc::Receiver<T>) -> Option<T> {
/// Receive a single frame from the channel and panics if there is a timeout or we received more
/// frames. See [`Frame`] enum.
pub async fn recv_single_frame(rx: &mut mpsc::Receiver<Frame>) -> AMQPFrame {
debug_assert!(!rx.is_closed());

let f = recv_with_timeout(rx).await.expect("No response is received");

match f {
Expand All @@ -30,6 +34,8 @@ pub async fn recv_single_frame(rx: &mut mpsc::Receiver<Frame>) -> AMQPFrame {
/// Receive multiple frames from the channel and panics if there is a timeout or we received
/// only a single frame. See [`Frame`] enum.
pub async fn recv_multiple_frames(rx: &mut mpsc::Receiver<Frame>) -> Vec<AMQPFrame> {
debug_assert!(!rx.is_closed());

let f = recv_with_timeout(rx).await.expect("No response is received");

match f {
Expand All @@ -40,5 +46,15 @@ pub async fn recv_multiple_frames(rx: &mut mpsc::Receiver<Frame>) -> Vec<AMQPFra

/// Listens for messages in the channel and if it doesn't get any, returns true.
pub async fn recv_nothing<T>(rx: &mut mpsc::Receiver<T>) -> bool {
debug_assert!(!rx.is_closed());

recv_with_timeout(rx).await.is_none()
}

pub async fn recv_error_frame(rx: &mut mpsc::Receiver<Frame>) -> RuntimeError {
debug_assert!(!rx.is_closed());

let f = recv_single_frame(rx).await;

f.into()
}

0 comments on commit ae49890

Please sign in to comment.