Skip to content

Commit e4081c9

Browse files
committed
Add channel body
1 parent c58b641 commit e4081c9

File tree

3 files changed

+171
-0
lines changed

3 files changed

+171
-0
lines changed

http-body-util/Cargo.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,20 @@ Combinators and adapters for HTTP request or response bodies.
2525
keywords = ["http"]
2626
categories = ["web-programming"]
2727

28+
[features]
29+
default = []
30+
channel = ["dep:tokio"]
31+
full = ["channel"]
32+
2833
[dependencies]
2934
bytes = "1"
3035
futures-util = { version = "0.3.14", default-features = false, features = ["alloc"] }
3136
http = "1"
3237
http-body = { version = "1", path = "../http-body" }
3338
pin-project-lite = "0.2"
3439

40+
# optional dependencies
41+
tokio = { version = "1", features = ["sync"], optional = true }
42+
3543
[dev-dependencies]
3644
tokio = { version = "1", features = ["macros", "rt"] }

http-body-util/src/channel.rs

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
use std::{
2+
fmt::Display,
3+
pin::Pin,
4+
task::{Context, Poll},
5+
};
6+
7+
use bytes::Buf;
8+
use http::HeaderMap;
9+
use http_body::{Body, Frame};
10+
use tokio::sync::mpsc;
11+
12+
/// Create a body backed by a channel.
13+
pub struct Channel<D, E = std::convert::Infallible> {
14+
rx_frame: mpsc::Receiver<Frame<D>>,
15+
rx_error: mpsc::Receiver<E>,
16+
}
17+
18+
impl<D, E> Channel<D, E> {
19+
/// Create a new channel body.
20+
///
21+
/// The channel will buffer up to the provided number of messages. Once the buffer is full,
22+
/// attempts to send new messages will wait until a message is received from the channel. The
23+
/// provided buffer capacity must be at least 1.
24+
pub fn new(buffer: usize) -> (Sender<D, E>, Self) {
25+
let (tx_frame, rx_frame) = mpsc::channel(buffer);
26+
let (tx_error, rx_error) = mpsc::channel(1);
27+
(Sender { tx_frame, tx_error }, Self { rx_frame, rx_error })
28+
}
29+
}
30+
31+
impl<D, E> Body for Channel<D, E>
32+
where
33+
D: Buf,
34+
{
35+
type Data = D;
36+
type Error = E;
37+
38+
fn poll_frame(
39+
mut self: Pin<&mut Self>,
40+
cx: &mut Context<'_>,
41+
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
42+
match self.rx_frame.poll_recv(cx) {
43+
Poll::Ready(frame) => return Poll::Ready(frame.map(Ok)),
44+
Poll::Pending => {}
45+
}
46+
47+
match self.rx_error.poll_recv(cx) {
48+
Poll::Ready(err) => return Poll::Ready(err.map(Err)),
49+
Poll::Pending => {}
50+
}
51+
52+
Poll::Pending
53+
}
54+
}
55+
56+
impl<D, E> std::fmt::Debug for Channel<D, E> {
57+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58+
f.debug_struct("Channel")
59+
.field("rx_frame", &self.rx_frame)
60+
.field("rx_error", &self.rx_error)
61+
.finish()
62+
}
63+
}
64+
65+
/// A sender half created through [`Channel::new`].
66+
pub struct Sender<D, E = std::convert::Infallible> {
67+
tx_frame: mpsc::Sender<Frame<D>>,
68+
tx_error: mpsc::Sender<E>,
69+
}
70+
71+
impl<D, E> Sender<D, E> {
72+
/// Send a frame on the channel.
73+
pub async fn send(&self, frame: Frame<D>) -> Result<(), SendError> {
74+
self.tx_frame.send(frame).await.map_err(|_| SendError)
75+
}
76+
77+
/// Send data on data channel.
78+
pub async fn send_data(&self, buf: D) -> Result<(), SendError> {
79+
self.send(Frame::data(buf)).await
80+
}
81+
82+
/// Send trailers on trailers channel.
83+
pub async fn send_trailers(&self, trailers: HeaderMap) -> Result<(), SendError> {
84+
self.send(Frame::trailers(trailers)).await
85+
}
86+
87+
/// Aborts the body in an abnormal fashion.
88+
pub fn abort(self, error: E) {
89+
match self.tx_error.try_send(error) {
90+
Ok(_) => {}
91+
Err(err) => {
92+
match err {
93+
mpsc::error::TrySendError::Full(_) => {
94+
// Channel::new creates the error channel with space for 1 message and we
95+
// only send once because this method consumes `self`. So the receiver
96+
// can't be full.
97+
unreachable!("error receiver should never be full")
98+
}
99+
mpsc::error::TrySendError::Closed(_) => {}
100+
}
101+
}
102+
}
103+
}
104+
}
105+
106+
impl<D, E> std::fmt::Debug for Sender<D, E> {
107+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108+
f.debug_struct("Sender")
109+
.field("tx_frame", &self.tx_frame)
110+
.field("tx_error", &self.tx_error)
111+
.finish()
112+
}
113+
}
114+
115+
/// The error returned if [`Sender`] fails to send because the receiver is closed.
116+
#[derive(Debug)]
117+
#[non_exhaustive]
118+
pub struct SendError;
119+
120+
impl Display for SendError {
121+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122+
write!(f, "failed to send frame")
123+
}
124+
}
125+
126+
impl std::error::Error for SendError {}
127+
128+
#[cfg(test)]
129+
mod tests {
130+
use bytes::Bytes;
131+
use http::{HeaderName, HeaderValue};
132+
133+
use crate::BodyExt;
134+
135+
use super::*;
136+
137+
#[tokio::test]
138+
async fn works() {
139+
let (tx, body) = Channel::<Bytes>::new(1024);
140+
141+
tokio::spawn(async move {
142+
tx.send_data(Bytes::from("Hel")).await.unwrap();
143+
tx.send_data(Bytes::from("lo!")).await.unwrap();
144+
145+
let mut trailers = HeaderMap::new();
146+
trailers.insert(
147+
HeaderName::from_static("foo"),
148+
HeaderValue::from_static("bar"),
149+
);
150+
tx.send_trailers(trailers).await.unwrap();
151+
});
152+
153+
let collected = body.collect().await.unwrap();
154+
assert_eq!(collected.trailers().unwrap()["foo"], "bar");
155+
assert_eq!(collected.to_bytes(), "Hello!");
156+
}
157+
}

http-body-util/src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ mod full;
2020
mod limited;
2121
mod stream;
2222

23+
#[cfg(feature = "channel")]
24+
pub mod channel;
25+
2326
mod util;
2427

2528
use self::combinators::{BoxBody, MapErr, MapFrame, UnsyncBoxBody};
@@ -31,6 +34,9 @@ pub use self::full::Full;
3134
pub use self::limited::{LengthLimitError, Limited};
3235
pub use self::stream::{BodyStream, StreamBody};
3336

37+
#[cfg(feature = "channel")]
38+
pub use self::channel::Channel;
39+
3440
/// An extension trait for [`http_body::Body`] adding various combinators and adapters
3541
pub trait BodyExt: http_body::Body {
3642
/// Returns a future that resolves to the next [`Frame`], if any.

0 commit comments

Comments
 (0)