Skip to content

Commit

Permalink
wasi-http: make the buffer and budget capacity of the OutgoingBody wr…
Browse files Browse the repository at this point in the history
…iter configurable
  • Loading branch information
iawia002 committed Nov 28, 2024
1 parent 6691006 commit edd134d
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 26 deletions.
11 changes: 7 additions & 4 deletions crates/test-programs/src/bin/http_outbound_request_large_post.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ use std::io::{self, Read};
use test_programs::wasi::http::types::{Method, Scheme};

fn main() {
// TODO: ensure more than 700 bytes is allowed without error
const LEN: usize = 700;
// Make sure the final body is larger than 1024*1024, but we cannot allocate
// so much memory directly in the wasm program, so we use the `repeat`
// method to increase the body size.
const LEN: usize = 1024;
const REPEAT: usize = 1025;
let mut buffer = [0; LEN];
let addr = std::env::var("HTTP_SERVER").unwrap();
io::repeat(0b001).read_exact(&mut buffer).unwrap();
Expand All @@ -13,7 +16,7 @@ fn main() {
Scheme::Http,
&addr,
"/post",
Some(&buffer),
Some(&buffer.repeat(REPEAT)),
None,
None,
None,
Expand All @@ -26,5 +29,5 @@ fn main() {
assert_eq!(res.status, 200);
let method = res.header("x-wasmtime-test-method").unwrap();
assert_eq!(std::str::from_utf8(method).unwrap(), "POST");
assert_eq!(res.body.len(), LEN);
assert_eq!(res.body.len(), LEN * REPEAT);
}
29 changes: 14 additions & 15 deletions crates/test-programs/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,20 @@ pub fn request(
.body()
.map_err(|_| anyhow!("outgoing request write failed"))?;

let options = http_types::RequestOptions::new();
options
.set_connect_timeout(connect_timeout)
.map_err(|()| anyhow!("failed to set connect_timeout"))?;
options
.set_first_byte_timeout(first_by_timeout)
.map_err(|()| anyhow!("failed to set first_byte_timeout"))?;
options
.set_between_bytes_timeout(between_bytes_timeout)
.map_err(|()| anyhow!("failed to set between_bytes_timeout"))?;
let options = Some(options);

let future_response = outgoing_handler::handle(request, options)?;

if let Some(mut buf) = body {
let request_body = outgoing_body
.write()
Expand Down Expand Up @@ -110,21 +124,6 @@ pub fn request(
Err(_) => anyhow::bail!("output stream error"),
};
}

let options = http_types::RequestOptions::new();
options
.set_connect_timeout(connect_timeout)
.map_err(|()| anyhow!("failed to set connect_timeout"))?;
options
.set_first_byte_timeout(first_by_timeout)
.map_err(|()| anyhow!("failed to set first_byte_timeout"))?;
options
.set_between_bytes_timeout(between_bytes_timeout)
.map_err(|()| anyhow!("failed to set between_bytes_timeout"))?;
let options = Some(options);

let future_response = outgoing_handler::handle(request, options)?;

http_types::OutgoingBody::finish(outgoing_body, None)?;

let incoming_response = match future_response.get() {
Expand Down
14 changes: 9 additions & 5 deletions crates/wasi-http/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,12 @@ pub struct HostOutgoingBody {

impl HostOutgoingBody {
/// Create a new `HostOutgoingBody`
pub fn new(context: StreamContext, size: Option<u64>) -> (Self, HyperOutgoingBody) {
pub fn new(
context: StreamContext,
size: Option<u64>,
buffer_chunks: usize,
chunk_size: usize,
) -> (Self, HyperOutgoingBody) {
let written = size.map(WrittenState::new);

use tokio::sync::oneshot::error::RecvError;
Expand Down Expand Up @@ -469,17 +474,16 @@ impl HostOutgoingBody {
}
}

let (body_sender, body_receiver) = mpsc::channel(2);
// always add 1 buffer here because one empty slot is required
let (body_sender, body_receiver) = mpsc::channel(buffer_chunks + 1);
let (finish_sender, finish_receiver) = oneshot::channel();
let body_impl = BodyImpl {
body_receiver,
finish_receiver: Some(finish_receiver),
}
.boxed();

// TODO: this capacity constant is arbitrary, and should be configurable
let output_stream =
BodyWriteStream::new(context, 1024 * 1024, body_sender, written.clone());
let output_stream = BodyWriteStream::new(context, chunk_size, body_sender, written.clone());

(
Self {
Expand Down
37 changes: 37 additions & 0 deletions crates/wasi-http/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,19 @@ pub trait WasiHttpView: Send {
fn is_forbidden_header(&mut self, _name: &HeaderName) -> bool {
false
}

/// Number of distinct write calls to the outgoing body's output-stream
/// that the implementation will buffer.
/// Default: 1.
fn outgoing_body_buffer_chunks(&mut self) -> usize {
1
}

/// Maximum size allowed in a write call to the outgoing body's output-stream.
/// Default: 1024 * 1024.
fn outgoing_body_chunk_size(&mut self) -> usize {
1024 * 1024
}
}

impl<T: ?Sized + WasiHttpView> WasiHttpView for &mut T {
Expand Down Expand Up @@ -156,6 +169,14 @@ impl<T: ?Sized + WasiHttpView> WasiHttpView for &mut T {
fn is_forbidden_header(&mut self, name: &HeaderName) -> bool {
T::is_forbidden_header(self, name)
}

fn outgoing_body_buffer_chunks(&mut self) -> usize {
T::outgoing_body_buffer_chunks(self)
}

fn outgoing_body_chunk_size(&mut self) -> usize {
T::outgoing_body_chunk_size(self)
}
}

impl<T: ?Sized + WasiHttpView> WasiHttpView for Box<T> {
Expand Down Expand Up @@ -187,6 +208,14 @@ impl<T: ?Sized + WasiHttpView> WasiHttpView for Box<T> {
fn is_forbidden_header(&mut self, name: &HeaderName) -> bool {
T::is_forbidden_header(self, name)
}

fn outgoing_body_buffer_chunks(&mut self) -> usize {
T::outgoing_body_buffer_chunks(self)
}

fn outgoing_body_chunk_size(&mut self) -> usize {
T::outgoing_body_chunk_size(self)
}
}

/// A concrete structure that all generated `Host` traits are implemented for.
Expand Down Expand Up @@ -233,6 +262,14 @@ impl<T: WasiHttpView> WasiHttpView for WasiHttpImpl<T> {
fn is_forbidden_header(&mut self, name: &HeaderName) -> bool {
self.0.is_forbidden_header(name)
}

fn outgoing_body_buffer_chunks(&mut self) -> usize {
self.0.outgoing_body_buffer_chunks()
}

fn outgoing_body_chunk_size(&mut self) -> usize {
self.0.outgoing_body_chunk_size()
}
}

/// Returns `true` when the header is forbidden according to this [`WasiHttpView`] implementation.
Expand Down
10 changes: 8 additions & 2 deletions crates/wasi-http/src/types_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,8 @@ where
&mut self,
request: Resource<HostOutgoingRequest>,
) -> wasmtime::Result<Result<Resource<HostOutgoingBody>, ()>> {
let buffer_chunks = self.outgoing_body_buffer_chunks();
let chunk_size = self.outgoing_body_chunk_size();
let req = self
.table()
.get_mut(&request)
Expand All @@ -405,7 +407,8 @@ where
Err(e) => return Ok(Err(e)),
};

let (host_body, hyper_body) = HostOutgoingBody::new(StreamContext::Request, size);
let (host_body, hyper_body) =
HostOutgoingBody::new(StreamContext::Request, size, buffer_chunks, chunk_size);

req.body = Some(hyper_body);

Expand Down Expand Up @@ -751,6 +754,8 @@ where
&mut self,
id: Resource<HostOutgoingResponse>,
) -> wasmtime::Result<Result<Resource<HostOutgoingBody>, ()>> {
let buffer_chunks = self.outgoing_body_buffer_chunks();
let chunk_size = self.outgoing_body_chunk_size();
let resp = self.table().get_mut(&id)?;

if resp.body.is_some() {
Expand All @@ -762,7 +767,8 @@ where
Err(e) => return Ok(Err(e)),
};

let (host, body) = HostOutgoingBody::new(StreamContext::Response, size);
let (host, body) =
HostOutgoingBody::new(StreamContext::Response, size, buffer_chunks, chunk_size);

resp.body.replace(body);

Expand Down

0 comments on commit edd134d

Please sign in to comment.