From 3bf138f9b4687e59146c44838f9fc25e0f339fa8 Mon Sep 17 00:00:00 2001 From: Richard Jonas Date: Thu, 5 Sep 2024 16:38:19 +0200 Subject: [PATCH] Refactor channel commands --- metalmq/src/client/channel.rs | 1 + metalmq/src/client/channel/basic.rs | 172 +++++++++++++++++++++++- metalmq/src/client/channel/exchange.rs | 21 ++- metalmq/src/client/channel/queue.rs | 122 ++++++++++++++++- metalmq/src/client/channel/router.rs | 92 +++++++++++++ metalmq/src/client/channel/types.rs | 69 +--------- metalmq/src/client/connection/router.rs | 21 ++- 7 files changed, 414 insertions(+), 84 deletions(-) create mode 100644 metalmq/src/client/channel/router.rs diff --git a/metalmq/src/client/channel.rs b/metalmq/src/client/channel.rs index 53e4535..aa455ed 100644 --- a/metalmq/src/client/channel.rs +++ b/metalmq/src/client/channel.rs @@ -3,4 +3,5 @@ pub mod content; pub mod exchange; pub mod open_close; pub mod queue; +pub mod router; pub mod types; diff --git a/metalmq/src/client/channel/basic.rs b/metalmq/src/client/channel/basic.rs index f4caf1d..46535bb 100644 --- a/metalmq/src/client/channel/basic.rs +++ b/metalmq/src/client/channel/basic.rs @@ -1,13 +1,17 @@ -use log::error; -use metalmq_codec::frame; +use std::time::Duration; + +use log::{error, warn}; +use metalmq_codec::{codec::Frame, frame}; +use tokio::sync::oneshot; use crate::{ error::{ChannelError, ConnectionError, Result}, exchange::handler::ExchangeCommand, message::Message, + queue, }; -use super::types::{Channel, PublishedContent}; +use super::types::{ActivelyConsumedQueue, Channel, PassivelyConsumedQueue, PublishedContent}; impl Channel { pub async fn handle_basic_publish(&mut self, args: frame::BasicPublishArgs) -> Result<()> { @@ -39,11 +43,58 @@ impl Channel { } pub async fn handle_basic_consume(&mut self, args: frame::BasicConsumeArgs) -> Result<()> { + let queue_clone = args.queue.clone(); + let consumer_tag_clone = args.consumer_tag.clone(); + let cmd = queue::manager::QueueConsumeCommand { + conn_id: self.source_connection.clone(), + channel: self.number, + queue_name: queue_clone, + consumer_tag: consumer_tag_clone, + no_ack: args.flags.contains(frame::BasicConsumeFlags::NO_ACK), + exclusive: args.flags.contains(frame::BasicConsumeFlags::EXCLUSIVE), + outgoing: self.outgoing.clone(), + frame_size: self.frame_size, + }; + + let queue_sink = queue::manager::consume(&self.qm, cmd).await?; + + let consumer_tag_clone = args.consumer_tag.clone(); + let queue_sink_clone = queue_sink.clone(); + + self.consumed_queue = Some(ActivelyConsumedQueue { + consumer_tag: args.consumer_tag.clone(), + queue_name: args.queue.clone(), + queue_sink, + }); + + self.send_frame(Frame::Frame( + frame::BasicConsumeOkArgs::new(&args.consumer_tag).frame(self.number), + )) + .await?; + + let start_deliver_cmd = queue::handler::QueueCommand::StartDelivering { + consumer_tag: consumer_tag_clone, + }; + + queue_sink_clone.send(start_deliver_cmd).await?; + Ok(()) } pub async fn handle_basic_cancel(&mut self, args: frame::BasicCancelArgs) -> Result<()> { - Ok(()) + if let Some(cq) = self.consumed_queue.take() { + let cmd = queue::manager::QueueCancelConsume { + channel: self.number, + queue_name: cq.queue_name.clone(), + consumer_tag: cq.consumer_tag.clone(), + }; + queue::manager::cancel_consume(&self.qm, cmd).await?; + } + + self.send_frame(Frame::Frame( + frame::BasicCancelOkArgs::new(&args.consumer_tag).frame(self.number), + )) + .await } /// Handles Ack coming from client. @@ -51,11 +102,117 @@ impl Channel { /// A message can be acked more than once. If a non-delivered message is acked, a channel /// exception will be raised. pub async fn handle_basic_ack(&mut self, args: frame::BasicAckArgs) -> Result<()> { + // TODO check if only delivered messages are acked, even multiple times + match &self.consumed_queue { + Some(cq) => { + let (tx, rx) = oneshot::channel(); + + // TODO why we need here the timeout? + cq.queue_sink + .send_timeout( + queue::handler::QueueCommand::AckMessage(queue::handler::AckCmd { + channel: self.number, + consumer_tag: cq.consumer_tag.clone(), + delivery_tag: args.delivery_tag, + multiple: args.multiple, + result: tx, + }), + Duration::from_secs(1), + ) + .await?; + + rx.await.unwrap()? + } + None => match &self.passively_consumed_queue { + Some(pq) => { + let (tx, rx) = oneshot::channel(); + + pq.queue_sink + .send(queue::handler::QueueCommand::AckMessage(queue::handler::AckCmd { + channel: self.number, + consumer_tag: pq.consumer_tag.clone(), + delivery_tag: args.delivery_tag, + multiple: args.multiple, + result: tx, + })) + .await?; + + rx.await.unwrap().unwrap(); + } + None => { + warn!("Basic.Ack arrived without consuming the queue"); + } + }, + } + Ok(()) } pub async fn handle_basic_get(&mut self, args: frame::BasicGetArgs) -> Result<()> { - Ok(()) + // Cache the queue the client consumes passively with Basic.Get + if self.passively_consumed_queue.is_none() { + let sink = queue::manager::get_command_sink( + &self.qm, + queue::manager::GetQueueSinkQuery { + channel: self.number, + queue_name: args.queue.clone(), + }, + ) + .await; + + if sink.is_err() { + return ChannelError::NotFound.to_result( + self.number, + frame::BASIC_GET, + &format!("Queue {} not found", args.queue), + ); + } + + let sink = sink.unwrap(); + let (tx, rx) = oneshot::channel(); + + sink.send(queue::handler::QueueCommand::PassiveConsume( + queue::handler::PassiveConsumeCmd { + conn_id: self.source_connection.clone(), + channel: self.number, + sink: self.outgoing.clone(), + frame_size: self.frame_size, + result: tx, + }, + )) + .await + .unwrap(); + + rx.await.unwrap().unwrap(); + + let pq = PassivelyConsumedQueue { + queue_name: args.queue, + consumer_tag: format!("{}-{}", self.source_connection, self.number), + delivery_tag: 1u64, + queue_sink: sink, + }; + + let _ = self.passively_consumed_queue.insert(pq); + } + + if let Some(pq) = &self.passively_consumed_queue { + let (tx, rx) = oneshot::channel(); + + let _ = pq + .queue_sink + .send(queue::handler::QueueCommand::Get(queue::handler::GetCmd { + conn_id: self.source_connection.clone(), + channel: self.number, + no_ack: args.no_ack, + result: tx, + })) + .await + .unwrap(); + + rx.await.unwrap() + } else { + ConnectionError::InternalError.to_result(frame::BASIC_GET, "Queue not exist") + } } pub async fn handle_basic_reject(&mut self, __args: frame::BasicRejectArgs) -> Result<()> { @@ -63,6 +220,11 @@ impl Channel { } pub async fn handle_confirm_select(&mut self, _args: frame::ConfirmSelectArgs) -> Result<()> { + self.next_confirm_delivery_tag = 1u64; + + self.send_frame(Frame::Frame(frame::confirm_select_ok(self.number))) + .await?; + Ok(()) } diff --git a/metalmq/src/client/channel/exchange.rs b/metalmq/src/client/channel/exchange.rs index 047edbc..b44c04f 100644 --- a/metalmq/src/client/channel/exchange.rs +++ b/metalmq/src/client/channel/exchange.rs @@ -2,7 +2,10 @@ use metalmq_codec::{codec::Frame, frame}; use crate::{ error::Result, - exchange::{self, manager::DeclareExchangeCommand}, + exchange::{ + self, + manager::{DeclareExchangeCommand, DeleteExchangeCommand}, + }, }; use super::types::Channel; @@ -37,6 +40,20 @@ impl Channel { } pub async fn handle_exchange_delete(&mut self, args: frame::ExchangeDeleteArgs) -> Result<()> { - Ok(()) + let exchange_name = args.exchange_name.clone(); + let cmd = DeleteExchangeCommand { + channel: self.number, + if_unused: args.flags.contains(frame::ExchangeDeleteFlags::IF_UNUSED), + exchange_name: args.exchange_name, + }; + + exchange::manager::delete_exchange(&self.em, cmd).await?; + + // TODO what happens if the previous code returns with an error and we never removes that + // exchange? + self.exchanges.remove(&exchange_name); + + self.send_frame(Frame::Frame(frame::exchange_delete_ok(self.number))) + .await } } diff --git a/metalmq/src/client/channel/queue.rs b/metalmq/src/client/channel/queue.rs index c93530e..92c5d28 100644 --- a/metalmq/src/client/channel/queue.rs +++ b/metalmq/src/client/channel/queue.rs @@ -1,26 +1,134 @@ +use log::warn; +use metalmq_codec::codec::Frame; use metalmq_codec::frame; +use uuid::Uuid; use crate::client::channel::types::Channel; -use crate::error::Result; +use crate::error::{ChannelError, Result}; +use crate::exchange::manager::{BindQueueCommand, UnbindQueueCommand}; +use crate::{exchange, queue}; impl Channel { - pub async fn queue_declare(&mut self, channel: Channel, args: frame::QueueDeclareArgs) -> Result<()> { + pub async fn handle_queue_declare(&mut self, args: frame::QueueDeclareArgs) -> Result<()> { + let mut queue_name = args.name.clone(); + if queue_name.is_empty() { + queue_name = Uuid::new_v4().hyphenated().to_string(); + } + + let passive = args.flags.contains(frame::QueueDeclareFlags::PASSIVE); + + let cmd = queue::manager::QueueDeclareCommand { + conn_id: self.source_connection.clone(), + channel: self.number, + queue: args.into(), + passive, + }; + let (message_count, consumer_count) = queue::manager::declare_queue(&self.qm, cmd).await?; + + self.send_frame(Frame::Frame( + frame::QueueDeclareOkArgs::default() + .name(&queue_name) + .message_count(message_count) + .consumer_count(consumer_count) + .frame(self.number), + )) + .await?; + Ok(()) } - pub async fn queue_bind(&mut self, channel: Channel, args: frame::QueueBindArgs) -> Result<()> { - Ok(()) + pub async fn handle_queue_bind(&mut self, args: frame::QueueBindArgs) -> Result<()> { + let cmd = queue::manager::GetQueueSinkQuery { + channel: self.number, + queue_name: args.queue_name.clone(), + }; + + match queue::manager::get_command_sink(&self.qm, cmd).await { + Ok(sink) => { + let cmd = BindQueueCommand { + conn_id: self.source_connection.clone(), + channel: self.number, + exchange_name: args.exchange_name, + queue_name: args.queue_name, + routing_key: args.routing_key, + args: args.args, + queue_sink: sink, + }; + + // TODO now we can use let patter = x else {} so we can drop this macro + exchange::manager::bind_queue(&self.em, cmd).await?; + + self.send_frame(Frame::Frame(frame::queue_bind_ok(self.number))).await + } + Err(e) => { + warn!("{:?}", e); + + ChannelError::NotFound.to_result(self.number, frame::QUEUE_BIND, "Queue not found") + } + } } - pub async fn queue_delete(&mut self, channel: Channel, args: frame::QueueDeleteArgs) -> Result<()> { + pub async fn handle_queue_delete(&mut self, args: frame::QueueDeleteArgs) -> Result<()> { + let cmd = queue::manager::QueueDeleteCommand { + conn_id: self.source_connection.clone(), + channel: self.number, + queue_name: args.queue_name, + if_unused: args.flags.contains(frame::QueueDeleteFlags::IF_UNUSED), + if_empty: args.flags.contains(frame::QueueDeleteFlags::IF_EMPTY), + }; + + let message_count = queue::manager::delete_queue(&self.qm, cmd).await?; + + self.send_frame(Frame::Frame( + frame::QueueDeleteOkArgs::default() + .message_count(message_count) + .frame(self.number), + )) + .await?; + Ok(()) } - pub async fn queue_unbind(&mut self, channel: Channel, args: frame::QueueUnbindArgs) -> Result<()> { + pub async fn handle_queue_unbind(&mut self, args: frame::QueueUnbindArgs) -> Result<()> { + let cmd = UnbindQueueCommand { + conn_id: self.source_connection.clone(), + channel: self.number, + exchange_name: args.exchange_name, + queue_name: args.queue_name, + routing_key: args.routing_key, + }; + + exchange::manager::unbind_queue(&self.em, cmd).await?; + + self.send_frame(Frame::Frame(frame::AMQPFrame::Method( + self.number, + frame::QUEUE_UNBIND_OK, + frame::MethodFrameArgs::QueueUnbindOk, + ))) + .await?; + Ok(()) } - pub async fn queue_purge(&mut self, channel: Channel, args: frame::QueuePurgeArgs) -> Result<()> { + pub async fn handle_queue_purge(&mut self, args: frame::QueuePurgeArgs) -> Result<()> { + // Purge the not-yet sent messages from the queue. Queue gives back the number of purged + // messages, so this operation is sync. + + let cmd = queue::manager::GetQueueSinkQuery { + channel: self.number, + queue_name: args.queue_name, + }; + + let queue_sink = queue::manager::get_command_sink(&self.qm, cmd).await?; + let message_count = queue::handler::purge(self.source_connection.clone(), self.number, &queue_sink).await?; + + self.send_frame(Frame::Frame( + frame::QueuePurgeOkArgs::default() + .message_count(message_count) + .frame(self.number), + )) + .await?; + Ok(()) } } diff --git a/metalmq/src/client/channel/router.rs b/metalmq/src/client/channel/router.rs new file mode 100644 index 0000000..1ef2ab0 --- /dev/null +++ b/metalmq/src/client/channel/router.rs @@ -0,0 +1,92 @@ +use metalmq_codec::codec::Frame; +use metalmq_codec::frame::unify_class_method; +use tokio::sync::mpsc; + +use crate::client::channel::types::{Channel, Command}; +use crate::error::Result; + +impl Channel { + pub async fn handle_message(&mut self, mut rx: mpsc::Receiver) -> Result<()> { + use Command::*; + + while let Some(m) = rx.recv().await { + match m { + MethodFrame(ch, cm, ma) => { + let result = match ma { + metalmq_codec::frame::MethodFrameArgs::ChannelClose(args) => { + self.handle_channel_close( + args.code, + unify_class_method(args.class_id, args.method_id), + args.text, + ) + .await + } + metalmq_codec::frame::MethodFrameArgs::ChannelCloseOk => unreachable!(), + metalmq_codec::frame::MethodFrameArgs::ExchangeDeclare(args) => { + self.handle_exchange_declare(args).await + } + metalmq_codec::frame::MethodFrameArgs::ExchangeDeclareOk => unreachable!(), + metalmq_codec::frame::MethodFrameArgs::ExchangeDelete(args) => { + self.handle_exchange_delete(args).await + } + metalmq_codec::frame::MethodFrameArgs::ExchangeDeleteOk => unreachable!(), + metalmq_codec::frame::MethodFrameArgs::QueueDeclare(args) => { + self.handle_queue_declare(args).await + } + metalmq_codec::frame::MethodFrameArgs::QueueDeclareOk(_) => unreachable!(), + metalmq_codec::frame::MethodFrameArgs::QueueBind(args) => self.handle_queue_bind(args).await, + metalmq_codec::frame::MethodFrameArgs::QueueBindOk => unreachable!(), + metalmq_codec::frame::MethodFrameArgs::QueuePurge(args) => self.handle_queue_purge(args).await, + metalmq_codec::frame::MethodFrameArgs::QueuePurgeOk(_) => unreachable!(), + metalmq_codec::frame::MethodFrameArgs::QueueDelete(args) => { + self.handle_queue_delete(args).await + } + metalmq_codec::frame::MethodFrameArgs::QueueDeleteOk(_) => unreachable!(), + metalmq_codec::frame::MethodFrameArgs::QueueUnbind(args) => { + self.handle_queue_unbind(args).await + } + metalmq_codec::frame::MethodFrameArgs::QueueUnbindOk => unreachable!(), + metalmq_codec::frame::MethodFrameArgs::BasicConsume(args) => { + self.handle_basic_consume(args).await + } + metalmq_codec::frame::MethodFrameArgs::BasicConsumeOk(_) => unreachable!(), + metalmq_codec::frame::MethodFrameArgs::BasicCancel(args) => { + self.handle_basic_cancel(args).await + } + metalmq_codec::frame::MethodFrameArgs::BasicCancelOk(_) => unreachable!(), + metalmq_codec::frame::MethodFrameArgs::BasicGet(args) => self.handle_basic_get(args).await, + metalmq_codec::frame::MethodFrameArgs::BasicGetOk(_) => unreachable!(), + metalmq_codec::frame::MethodFrameArgs::BasicGetEmpty => todo!(), + metalmq_codec::frame::MethodFrameArgs::BasicPublish(args) => { + self.handle_basic_publish(args).await + } + metalmq_codec::frame::MethodFrameArgs::BasicReturn(_) => unreachable!(), + metalmq_codec::frame::MethodFrameArgs::BasicDeliver(_) => unreachable!(), + metalmq_codec::frame::MethodFrameArgs::BasicAck(args) => self.handle_basic_ack(args).await, + metalmq_codec::frame::MethodFrameArgs::BasicReject(args) => { + self.handle_basic_reject(args).await + } + metalmq_codec::frame::MethodFrameArgs::BasicNack(_) => unimplemented!(), + metalmq_codec::frame::MethodFrameArgs::ConfirmSelect(args) => { + self.handle_confirm_select(args).await + } + metalmq_codec::frame::MethodFrameArgs::ConfirmSelectOk => unreachable!(), + _ => unreachable!(), + }; + } + Close(reason, cm, text) => self.handle_channel_close(reason, cm, text).await?, + ContentHeader(header) => self.handle_content_header(header).await?, + ContentBody(body) => self.handle_content_body(body).await?, + } + } + + Ok(()) + } + + /// Send frame out to client asynchronously. + pub async fn send_frame(&self, f: Frame) -> Result<()> { + self.outgoing.send(f).await?; + + Ok(()) + } +} diff --git a/metalmq/src/client/channel/types.rs b/metalmq/src/client/channel/types.rs index 0eae306..4531c02 100644 --- a/metalmq/src/client/channel/types.rs +++ b/metalmq/src/client/channel/types.rs @@ -1,13 +1,11 @@ use std::collections::HashMap; -use crate::error::Result; use crate::exchange; use crate::queue; use metalmq_codec::codec::Frame; -use metalmq_codec::frame::unify_class_method; use metalmq_codec::frame::MethodFrameArgs; -use metalmq_codec::frame::{AMQPFrame, ContentBodyFrame, ContentHeaderFrame}; +use metalmq_codec::frame::{ContentBodyFrame, ContentHeaderFrame}; use tokio::sync::mpsc; @@ -28,7 +26,7 @@ pub struct ActivelyConsumedQueue { } /// Queues consumed by the connection with Basic.Get -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct PassivelyConsumedQueue { pub queue_name: String, pub consumer_tag: String, @@ -57,72 +55,13 @@ pub struct Channel { pub source_connection: String, pub number: u16, pub consumed_queue: Option, + pub passively_consumed_queue: Option, pub in_flight_content: Option, pub confirm_mode: bool, pub next_confirm_delivery_tag: u64, + pub frame_size: usize, pub outgoing: mpsc::Sender, pub exchanges: HashMap, pub em: exchange::manager::ExchangeManagerSink, pub qm: queue::manager::QueueManagerSink, } - -impl Channel { - pub async fn handle_message(&mut self, mut rx: mpsc::Receiver) -> Result<()> { - use Command::*; - - while let Some(m) = rx.recv().await { - match m { - MethodFrame(ch, cm, ma) => { - let result = match ma { - metalmq_codec::frame::MethodFrameArgs::ChannelClose(args) => { - self.handle_channel_close( - args.code, - unify_class_method(args.class_id, args.method_id), - args.text, - ) - .await - } - metalmq_codec::frame::MethodFrameArgs::ChannelCloseOk => todo!(), - metalmq_codec::frame::MethodFrameArgs::ExchangeDeclare(args) => { - self.handle_exchange_declare(args).await - } - metalmq_codec::frame::MethodFrameArgs::ExchangeDeclareOk => todo!(), - metalmq_codec::frame::MethodFrameArgs::ExchangeDelete(_) => todo!(), - metalmq_codec::frame::MethodFrameArgs::ExchangeDeleteOk => todo!(), - metalmq_codec::frame::MethodFrameArgs::QueueDeclare(_) => todo!(), - metalmq_codec::frame::MethodFrameArgs::QueueDeclareOk(_) => todo!(), - metalmq_codec::frame::MethodFrameArgs::QueueBind(_) => todo!(), - metalmq_codec::frame::MethodFrameArgs::QueueBindOk => todo!(), - metalmq_codec::frame::MethodFrameArgs::QueuePurge(_) => todo!(), - metalmq_codec::frame::MethodFrameArgs::QueuePurgeOk(_) => todo!(), - metalmq_codec::frame::MethodFrameArgs::QueueDelete(_) => todo!(), - metalmq_codec::frame::MethodFrameArgs::QueueDeleteOk(_) => todo!(), - metalmq_codec::frame::MethodFrameArgs::QueueUnbind(_) => todo!(), - metalmq_codec::frame::MethodFrameArgs::QueueUnbindOk => todo!(), - metalmq_codec::frame::MethodFrameArgs::BasicConsume(_) => todo!(), - metalmq_codec::frame::MethodFrameArgs::BasicConsumeOk(_) => todo!(), - metalmq_codec::frame::MethodFrameArgs::BasicCancel(_) => todo!(), - metalmq_codec::frame::MethodFrameArgs::BasicCancelOk(_) => todo!(), - metalmq_codec::frame::MethodFrameArgs::BasicGet(_) => todo!(), - metalmq_codec::frame::MethodFrameArgs::BasicGetOk(_) => todo!(), - metalmq_codec::frame::MethodFrameArgs::BasicGetEmpty => todo!(), - metalmq_codec::frame::MethodFrameArgs::BasicPublish(_) => todo!(), - metalmq_codec::frame::MethodFrameArgs::BasicReturn(_) => todo!(), - metalmq_codec::frame::MethodFrameArgs::BasicDeliver(_) => todo!(), - metalmq_codec::frame::MethodFrameArgs::BasicAck(_) => todo!(), - metalmq_codec::frame::MethodFrameArgs::BasicReject(_) => todo!(), - metalmq_codec::frame::MethodFrameArgs::BasicNack(_) => todo!(), - metalmq_codec::frame::MethodFrameArgs::ConfirmSelect(_) => todo!(), - metalmq_codec::frame::MethodFrameArgs::ConfirmSelectOk => todo!(), - _ => unreachable!(), - }; - } - Close(reason, cm, text) => self.handle_channel_close(reason, cm, text).await?, - ContentHeader(header) => self.handle_content_header(header).await?, - ContentBody(body) => self.handle_content_body(body).await?, - } - } - - Ok(()) - } -} diff --git a/metalmq/src/client/connection/router.rs b/metalmq/src/client/connection/router.rs index e9041f6..d75384d 100644 --- a/metalmq/src/client/connection/router.rs +++ b/metalmq/src/client/connection/router.rs @@ -10,7 +10,7 @@ use tokio::sync::mpsc; use crate::{ client::{ channel::types::{Channel, Command}, - connection::types::Connection, + connection::types::{Connection, ExclusiveQueue}, }, error::{to_runtime_error, ConnectionError, ErrorScope, Result, RuntimeError}, }; @@ -145,10 +145,19 @@ impl Connection { Ok(true) } - // TODO here we need to handle channel close, because we need to remove the channel - // receiver from the hash map. It is a question though where to handle all of that. - // This should send a message to the channel, that can have a change to cancel the - // consume of queues, etc. Then this code should have a chance to execute something. + QueueDeclare(args) => { + let exclusive = args.flags.contains(frame::QueueDeclareFlags::EXCLUSIVE); + let queue_name = args.name.clone(); + let cmd = Command::MethodFrame(channel, class_method, MethodFrameArgs::QueueDeclare(args)); + + self.send_command_to_channel(channel, cmd).await?; + + if exclusive { + self.exclusive_queues.push(ExclusiveQueue { queue_name }); + } + + Ok(true) + } _ => { let cmd = Command::MethodFrame(channel, class_method, ma); @@ -164,9 +173,11 @@ impl Connection { source_connection: self.id.clone(), number: channel_number, consumed_queue: None, + passively_consumed_queue: None, in_flight_content: None, confirm_mode: false, next_confirm_delivery_tag: 1u64, + frame_size: self.frame_max, outgoing: self.outgoing.clone(), exchanges: HashMap::new(), em: self.em.clone(),