Skip to content

Commit

Permalink
refactor: {Async,}Sink takes item by reference
Browse files Browse the repository at this point in the history
  • Loading branch information
threadexio committed Aug 30, 2024
1 parent 2e095e5 commit 496ba03
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 120 deletions.
4 changes: 2 additions & 2 deletions channels-io/src/framed/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ use alloc::vec::Vec;
/// TODO: docs
pub trait Encoder {
/// TODO: docs
type Item;
type Item: ?Sized;

/// TODO: docs
type Error;

/// TODO: docs
fn encode(
&mut self,
item: Self::Item,
item: &Self::Item,
buf: &mut Vec<u8>,
) -> Result<(), Self::Error>;
}
21 changes: 12 additions & 9 deletions channels-io/src/framed/framed_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,10 @@ impl<W, E> FramedWrite<W, E>
where
E: Encoder,
{
fn encode_item(&mut self, item: E::Item) -> Result<(), E::Error> {
fn encode_item(
&mut self,
item: &E::Item,
) -> Result<(), E::Error> {
self.encoder.encode(item, &mut self.buf)
}

Expand Down Expand Up @@ -192,7 +195,7 @@ where

fn start_send(
mut self: Pin<&mut Self>,
item: Self::Item,
item: &Self::Item,
) -> Result<(), Self::Error> {
self.encode_item(item).map_err(FramedWriteError::Encode)
}
Expand All @@ -217,7 +220,7 @@ where

type Error = FramedWriteError<E::Error, W::Error>;

fn send(&mut self, item: Self::Item) -> Result<(), Self::Error> {
fn send(&mut self, item: &Self::Item) -> Result<(), Self::Error> {
self.encode_item(item).map_err(FramedWriteError::Encode)?;

let pinned = unsafe { Pin::new_unchecked(self) };
Expand Down Expand Up @@ -249,7 +252,7 @@ mod tests {

fn encode(
&mut self,
item: Self::Item,
item: &Self::Item,
buf: &mut Vec<u8>,
) -> Result<(), Self::Error> {
buf.extend(&item.to_be_bytes());
Expand All @@ -263,8 +266,8 @@ mod tests {
let mut framed =
FramedWrite::new(buf.by_ref().writer(), U32Encoder);

Sink::send(&mut framed, 42).expect("");
Sink::send(&mut framed, 0xff_12_34).expect("");
Sink::send(&mut framed, &42).expect("");
Sink::send(&mut framed, &0xff_12_34).expect("");

assert_eq!(buf.get(), &[0, 0, 0, 42, 0, 0xff, 0x12, 0x34, 0]);
}
Expand All @@ -275,10 +278,10 @@ mod tests {
let mut framed =
FramedWrite::new(buf.by_ref().writer(), U32Encoder);

Sink::send(&mut framed, 42).expect("");
Sink::send(&mut framed, 0xff_12_34).expect("");
Sink::send(&mut framed, &42).expect("");
Sink::send(&mut framed, &0xff_12_34).expect("");
assert!(matches!(
Sink::send(&mut framed, 1),
Sink::send(&mut framed, &1),
Err(FramedWriteError::Io(_))
));
}
Expand Down
10 changes: 5 additions & 5 deletions channels-io/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,27 @@ use self::send::Send;
/// TODO: docs
pub trait Sink {
/// TODO: docs
type Item;
type Item: ?Sized;

/// TODO: docs
type Error;

/// TODO: docs
fn send(&mut self, item: Self::Item) -> Result<(), Self::Error>;
fn send(&mut self, item: &Self::Item) -> Result<(), Self::Error>;
}

/// TODO: docs
pub trait AsyncSink {
/// TODO: docs
type Item;
type Item: ?Sized;

/// TODO: docs
type Error;

/// TODO: docs
fn start_send(
self: Pin<&mut Self>,
item: Self::Item,
item: &Self::Item,
) -> Result<(), Self::Error>;

/// TODO: docs
Expand All @@ -39,7 +39,7 @@ pub trait AsyncSink {
) -> Poll<Result<(), Self::Error>>;

/// TODO: docs
fn send(&mut self, item: Self::Item) -> Send<'_, Self>
fn send<'a>(&'a mut self, item: &'a Self::Item) -> Send<'a, Self>
where
Self: Unpin,
Self::Item: Unpin,
Expand Down
4 changes: 2 additions & 2 deletions channels-io/src/sink/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ where
S::Item: Unpin,
{
sink: &'a mut S,
item: Option<S::Item>,
item: Option<&'a S::Item>,
}

impl<'a, S> Send<'a, S>
where
S: AsyncSink + Unpin + ?Sized,
S::Item: Unpin,
{
pub fn new(sink: &'a mut S, item: S::Item) -> Self {
pub fn new(sink: &'a mut S, item: &'a S::Item) -> Self {
Self { sink, item: Some(item) }
}
}
Expand Down
110 changes: 85 additions & 25 deletions channels/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,35 @@
use core::fmt::{self, Debug, Display};

use channels_io::framed::{FramedReadError, FramedWriteError};

/// TODO: docs
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EncodeError {
/// Frame is too large.
TooLarge,
}

impl fmt::Display for EncodeError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::TooLarge => f.write_str("data too large"),
}
}
}

#[cfg(feature = "std")]
impl std::error::Error for EncodeError {}

/// The error type returned by [`Sender`].
///
/// [`Sender`]: crate::Sender
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum SendError<Ser, Io> {
/// The underlying protocol could not encode the provided data.
Encode(EncodeError),

/// The serializer has encountered an error while trying to
/// serialize/deserialize the data. This error is usually recoverable and
/// the channel might still be able to be used normally.
Expand All @@ -17,9 +40,6 @@ pub enum SendError<Ser, Io> {
/// sent/received. This error is recoverable and the channel can continue to
/// be used normally.
Serde(Ser),

/// TODO: docs
TooLarge,
}

impl<Ser, Io> Display for SendError<Ser, Io>
Expand All @@ -29,9 +49,9 @@ where
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Encode(e) => Display::fmt(e, f),
Self::Io(e) => Display::fmt(e, f),
Self::Serde(e) => Display::fmt(e, f),
Self::TooLarge => f.write_str("data too large"),
}
}
}
Expand All @@ -42,24 +62,28 @@ impl<Ser, Io> std::error::Error for SendError<Ser, Io> where
{
}

/// The error type returned by [`Receiver`].
///
/// [`Receiver`]: crate::Receiver
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum RecvError<Des, Io> {
/// The underlying transport has returned an error while the data was being
/// sent/received.
Io(Io),

/// The serializer has encountered an error while trying to
/// serialize/deserialize the data.
Serde(Des),
impl<Ser, Io> From<FramedWriteError<EncodeError, Io>>
for SendError<Ser, Io>
{
fn from(value: FramedWriteError<EncodeError, Io>) -> Self {
match value {
FramedWriteError::Encode(x) => Self::Encode(x),
FramedWriteError::Io(x) => Self::Io(x),
}
}
}

/// TODO: docs
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DecodeError {
/// The underlying transport is not reliable and the sent data has suffered
/// modification and/or corruption.
InvalidChecksum,

/// The underlying transport is not reliable and the sent data has been
/// received in the wrong order.
OutOfOrder,

/// The received data exceeded the maximum amount of data the receiver was
/// configured to receive. This error indicates either that: a) you must
/// configure the receiver to allow larger payloads with [`max_size()`], or
Expand All @@ -68,24 +92,49 @@ pub enum RecvError<Des, Io> {
/// [`max_size()`]: crate::receiver::Config::max_size()
TooLarge,

/// The underlying transport is not reliable and the sent data has been
/// received in the wrong order.
OutOfOrder,

/// The 2 peers are not using the same protocol version. This means that
/// each end is not using the same version of the crate.
VersionMismatch,
}

impl Display for DecodeError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::InvalidChecksum => f.write_str("invalid checksum"),
Self::OutOfOrder => f.write_str("data out of order"),
Self::TooLarge => f.write_str("data too large"),
Self::VersionMismatch => f.write_str("version mismatch"),
}
}
}

#[cfg(feature = "std")]
impl std::error::Error for DecodeError {}

/// The error type returned by [`Receiver`].
///
/// [`Receiver`]: crate::Receiver
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum RecvError<Des, Io> {
/// The underlying protocol could not decode the received data.
Decode(DecodeError),

/// The underlying transport has returned an error while the data was being
/// sent/received.
Io(Io),

/// The serializer has encountered an error while trying to
/// serialize/deserialize the data.
Serde(Des),
}

impl<Des: Display, Io: Display> Display for RecvError<Des, Io> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Decode(e) => Display::fmt(e, f),
Self::Io(e) => Display::fmt(e, f),
Self::Serde(e) => Display::fmt(e, f),
Self::InvalidChecksum => f.write_str("corrupted data"),
Self::OutOfOrder => f.write_str("data out of order"),
Self::TooLarge => f.write_str("payload too large"),
Self::VersionMismatch => f.write_str("version mismatch"),
}
}
}
Expand All @@ -95,3 +144,14 @@ impl<Des, Io> std::error::Error for RecvError<Des, Io> where
Self: Debug + Display
{
}

impl<Des, Io> From<FramedReadError<DecodeError, Io>>
for RecvError<Des, Io>
{
fn from(value: FramedReadError<DecodeError, Io>) -> Self {
match value {
FramedReadError::Decode(x) => Self::Decode(x),
FramedReadError::Io(x) => Self::Io(x),
}
}
}
Loading

0 comments on commit 496ba03

Please sign in to comment.