From ca9557646aac2d625a6d1d040e6caf065d0340f6 Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Wed, 22 Jan 2025 11:27:15 -0800 Subject: [PATCH] Split off wasmtime-wasi-io crate from wasmtime-wasi (#10036) * stub: wasmtime-wasi-io crate * wasmtime: component::ResourceTableError now impls core::error::Error for compatibility without std * relocate much of the wasi-io impl into wasmtime-wasi-io * stump of poll that uses in_tokio * finish moving instances over to wasmtime_wasi_io * redirect wasmtime_wasi's bindgen properly over to wasmtime_wasi_io * wasmtime-wasi-http: point directly at wasmtime_wasi_io in sources it worked without these changes because all the re-exports are in the right places, but this is nice to do * comment work * fix streams rename, migrate bindings to its own file * move wasi-io impls into their own mod with appropriate name. check in CI. * change ResourceTable::iter_entries from taking a HashMap to BTreeMap so it works without std * crate-level docs for wasmtime-wasi-io * more docs * more docs, wasi-io gives an add_to_linker function for async only * wasi-io: inline view into lib.rs. improve docs. * more streams vs stream fixes... * wasi-http stream->streams fixes * fix adding wasmtime-wasi-io to public crates * wasmtime-cli: drop overzealous `=` version constraint on wasmtime-wasi-http wasmtime-wasi-http is part of the public API where we guarantee semver is obeyed * fix doctest * mechanically rename the wasi-io pub traits, and resource types resource type Pollable -> DynPollable resource type InputStream -> DynInputStream resource type OutputStream -> DynOutputStream trait Subscribe -> Pollable trait HostInputStream -> InputStream trait HostOutputStream -> OutputStream type alias PollableFuture -> DynFuture (little-used) * delete unused ClosureFuture alias * doc fixes * wasmtime-wasi-http: use all of wasmtime-wasi-io through wasmtime-wasi re-exports * fix nostd build * missing separator. i love yml * make wasmtime-wasi-io #![no_std] --- .github/workflows/main.yml | 3 +- Cargo.lock | 12 + Cargo.toml | 5 +- ci/vendor-wit.sh | 4 + crates/wasi-http/src/bindings.rs | 8 +- crates/wasi-http/src/body.rs | 16 +- crates/wasi-http/src/error.rs | 2 +- crates/wasi-http/src/lib.rs | 17 +- crates/wasi-http/src/types.rs | 4 +- crates/wasi-http/src/types_impl.rs | 17 +- crates/wasi-io/Cargo.toml | 30 ++ crates/wasi-io/src/bindings.rs | 29 ++ crates/wasi-io/src/impls.rs | 297 +++++++++++++++ crates/wasi-io/src/lib.rs | 183 ++++++++++ crates/wasi-io/src/poll.rs | 120 ++++++ .../src/stream.rs => wasi-io/src/streams.rs} | 23 +- crates/wasi-io/wit/deps/io/error.wit | 34 ++ crates/wasi-io/wit/deps/io/poll.wit | 47 +++ crates/wasi-io/wit/deps/io/streams.wit | 290 +++++++++++++++ crates/wasi-io/wit/deps/io/world.wit | 10 + crates/wasi-io/wit/world.wit | 6 + crates/wasi/Cargo.toml | 1 + crates/wasi/src/bindings.rs | 38 +- crates/wasi/src/ctx.rs | 25 +- crates/wasi/src/filesystem.rs | 12 +- crates/wasi/src/host/clocks.rs | 15 +- crates/wasi/src/host/filesystem.rs | 14 +- crates/wasi/src/host/io.rs | 344 +++--------------- crates/wasi/src/host/tcp.rs | 21 +- crates/wasi/src/host/udp.rs | 24 +- crates/wasi/src/ip_name_lookup.rs | 6 +- crates/wasi/src/lib.rs | 126 +++++-- crates/wasi/src/pipe.rs | 34 +- crates/wasi/src/poll.rs | 242 +----------- crates/wasi/src/preview1.rs | 13 +- crates/wasi/src/stdio.rs | 94 +++-- crates/wasi/src/stdio/worker_thread_stdin.rs | 14 +- crates/wasi/src/tcp.rs | 26 +- crates/wasi/src/udp.rs | 4 +- crates/wasi/src/view.rs | 79 ++-- crates/wasi/src/write_stream.rs | 10 +- crates/wasi/tests/all/api.rs | 2 +- crates/wasi/tests/process_stdin.rs | 2 +- .../src/runtime/component/resource_table.rs | 10 +- scripts/publish.rs | 3 + src/commands/serve.rs | 6 +- 46 files changed, 1505 insertions(+), 817 deletions(-) create mode 100644 crates/wasi-io/Cargo.toml create mode 100644 crates/wasi-io/src/bindings.rs create mode 100644 crates/wasi-io/src/impls.rs create mode 100644 crates/wasi-io/src/lib.rs create mode 100644 crates/wasi-io/src/poll.rs rename crates/{wasi/src/stream.rs => wasi-io/src/streams.rs} (94%) create mode 100644 crates/wasi-io/wit/deps/io/error.wit create mode 100644 crates/wasi-io/wit/deps/io/poll.wit create mode 100644 crates/wasi-io/wit/deps/io/streams.wit create mode 100644 crates/wasi-io/wit/deps/io/world.wit create mode 100644 crates/wasi-io/wit/world.wit diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index f1e67d490f70..1010c644c60a 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -541,7 +541,8 @@ jobs: cargo check -p wasmtime --no-default-features --features runtime,component-model && cargo check -p wasmtime --no-default-features --features runtime,gc,component-model && cargo check -p cranelift-control --no-default-features && - cargo check -p pulley-interpreter --features encode,decode,disas,interp + cargo check -p pulley-interpreter --features encode,decode,disas,interp && + cargo check -p wasmtime-wasi-io --no-default-features # Use `cross` for illumos to have a C compiler/linker available. - target: x86_64-unknown-illumos os: ubuntu-latest diff --git a/Cargo.lock b/Cargo.lock index 8dfad6436a54..4aec396b2f42 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4592,6 +4592,7 @@ dependencies = [ "tracing-subscriber", "url", "wasmtime", + "wasmtime-wasi-io", "wiggle", "windows-sys 0.59.0", ] @@ -4633,6 +4634,17 @@ dependencies = [ "webpki-roots", ] +[[package]] +name = "wasmtime-wasi-io" +version = "30.0.0" +dependencies = [ + "anyhow", + "async-trait", + "bytes", + "futures", + "wasmtime", +] + [[package]] name = "wasmtime-wasi-keyvalue" version = "30.0.0" diff --git a/Cargo.toml b/Cargo.toml index 5b59df6b36ed..1f778f93828a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -230,7 +230,8 @@ wasmtime-fiber = { path = "crates/fiber", version = "=30.0.0" } wasmtime-jit-debug = { path = "crates/jit-debug", version = "=30.0.0" } wasmtime-wast = { path = "crates/wast", version = "=30.0.0" } wasmtime-wasi = { path = "crates/wasi", version = "30.0.0", default-features = false } -wasmtime-wasi-http = { path = "crates/wasi-http", version = "=30.0.0", default-features = false } +wasmtime-wasi-io = { path = "crates/wasi-io", version = "30.0.0", default-features = false } +wasmtime-wasi-http = { path = "crates/wasi-http", version = "30.0.0", default-features = false } wasmtime-wasi-nn = { path = "crates/wasi-nn", version = "30.0.0" } wasmtime-wasi-config = { path = "crates/wasi-config", version = "30.0.0" } wasmtime-wasi-keyvalue = { path = "crates/wasi-keyvalue", version = "30.0.0" } @@ -355,7 +356,7 @@ hyper = "1.0.1" http = "1.0.0" http-body = "1.0.0" http-body-util = "0.1.0" -bytes = "1.4" +bytes = { version = "1.4", default-features = false } futures = { version = "0.3.27", default-features = false } indexmap = { version = "2.0.0", default-features = false } pretty_env_logger = "0.5.0" diff --git a/ci/vendor-wit.sh b/ci/vendor-wit.sh index 252fb9a863e5..b71ca579a70a 100755 --- a/ci/vendor-wit.sh +++ b/ci/vendor-wit.sh @@ -36,6 +36,10 @@ make_vendor() { cache_dir=$(mktemp -d) +make_vendor "wasi-io" " + io@v0.2.3 +" + make_vendor "wasi" " cli@v0.2.3 clocks@v0.2.3 diff --git a/crates/wasi-http/src/bindings.rs b/crates/wasi-http/src/bindings.rs index 00054e5a644b..f88fd1216f7f 100644 --- a/crates/wasi-http/src/bindings.rs +++ b/crates/wasi-http/src/bindings.rs @@ -59,9 +59,11 @@ pub mod sync { tracing: true, async: false, with: { - "wasi:http": crate::bindings::http, // http is in this crate - "wasi:io": wasmtime_wasi::bindings::sync::io, // io is sync - "wasi": wasmtime_wasi::bindings, // everything else + // http is in this crate + "wasi:http": crate::bindings::http, + // sync requires the wrapper in the wasmtime_wasi crate, in + // order to have in_tokio + "wasi:io": wasmtime_wasi::bindings::sync::io, }, require_store_data_send: true, }); diff --git a/crates/wasi-http/src/body.rs b/crates/wasi-http/src/body.rs index 96c783964746..8adab1352a22 100644 --- a/crates/wasi-http/src/body.rs +++ b/crates/wasi-http/src/body.rs @@ -13,7 +13,7 @@ use std::{pin::Pin, sync::Arc, time::Duration}; use tokio::sync::{mpsc, oneshot}; use wasmtime_wasi::{ runtime::{poll_noop, AbortOnDropJoinHandle}, - HostInputStream, HostOutputStream, StreamError, Subscribe, + InputStream, OutputStream, Pollable, StreamError, }; /// Common type for incoming bodies. @@ -234,7 +234,7 @@ enum IncomingBodyStreamState { } #[async_trait::async_trait] -impl HostInputStream for HostIncomingBodyStream { +impl InputStream for HostIncomingBodyStream { fn read(&mut self, size: usize) -> Result { loop { // Handle buffered data/errors if any @@ -271,7 +271,7 @@ impl HostInputStream for HostIncomingBodyStream { } #[async_trait::async_trait] -impl Subscribe for HostIncomingBodyStream { +impl Pollable for HostIncomingBodyStream { async fn ready(&mut self) { if !self.buffer.is_empty() || self.error.is_some() { return; @@ -327,7 +327,7 @@ pub enum HostFutureTrailers { } #[async_trait::async_trait] -impl Subscribe for HostFutureTrailers { +impl Pollable for HostFutureTrailers { async fn ready(&mut self) { let body = match self { HostFutureTrailers::Waiting(body) => body, @@ -415,7 +415,7 @@ impl WrittenState { /// The concrete type behind a `wasi:http/types/outgoing-body` resource. pub struct HostOutgoingBody { /// The output stream that the body is written to. - body_output_stream: Option>, + body_output_stream: Option>, context: StreamContext, written: Option, finish_sender: Option>, @@ -499,7 +499,7 @@ impl HostOutgoingBody { } /// Take the output stream, if it's available. - pub fn take_output_stream(&mut self) -> Option> { + pub fn take_output_stream(&mut self) -> Option> { self.body_output_stream.take() } @@ -605,7 +605,7 @@ impl BodyWriteStream { } #[async_trait::async_trait] -impl HostOutputStream for BodyWriteStream { +impl OutputStream for BodyWriteStream { fn write(&mut self, bytes: Bytes) -> Result<(), StreamError> { let len = bytes.len(); match self.writer.try_send(bytes) { @@ -665,7 +665,7 @@ impl HostOutputStream for BodyWriteStream { } #[async_trait::async_trait] -impl Subscribe for BodyWriteStream { +impl Pollable for BodyWriteStream { async fn ready(&mut self) { // Attempt to perform a reservation for a send. If there's capacity in // the channel or it's already closed then this will return immediately. diff --git a/crates/wasi-http/src/error.rs b/crates/wasi-http/src/error.rs index 1c60a4b0771e..c2f440975d6d 100644 --- a/crates/wasi-http/src/error.rs +++ b/crates/wasi-http/src/error.rs @@ -1,7 +1,7 @@ use crate::bindings::http::types::ErrorCode; use std::error::Error; use std::fmt; -use wasmtime_wasi::ResourceTableError; +use wasmtime::component::ResourceTableError; /// A [`Result`] type where the error type defaults to [`HttpError`]. pub type HttpResult = Result; diff --git a/crates/wasi-http/src/lib.rs b/crates/wasi-http/src/lib.rs index 9c57a6b6a974..0a9544f7c4f8 100644 --- a/crates/wasi-http/src/lib.rs +++ b/crates/wasi-http/src/lib.rs @@ -285,12 +285,13 @@ where T: WasiHttpView + wasmtime_wasi::WasiView, { let io_closure = type_annotate_io::(|t| wasmtime_wasi::IoImpl(t)); - let closure = type_annotate_wasi::(|t| wasmtime_wasi::WasiImpl(wasmtime_wasi::IoImpl(t))); - wasmtime_wasi::bindings::clocks::wall_clock::add_to_linker_get_host(l, closure)?; - wasmtime_wasi::bindings::clocks::monotonic_clock::add_to_linker_get_host(l, closure)?; wasmtime_wasi::bindings::io::poll::add_to_linker_get_host(l, io_closure)?; wasmtime_wasi::bindings::io::error::add_to_linker_get_host(l, io_closure)?; wasmtime_wasi::bindings::io::streams::add_to_linker_get_host(l, io_closure)?; + + let closure = type_annotate_wasi::(|t| wasmtime_wasi::WasiImpl(wasmtime_wasi::IoImpl(t))); + wasmtime_wasi::bindings::clocks::wall_clock::add_to_linker_get_host(l, closure)?; + wasmtime_wasi::bindings::clocks::monotonic_clock::add_to_linker_get_host(l, closure)?; wasmtime_wasi::bindings::cli::stdin::add_to_linker_get_host(l, closure)?; wasmtime_wasi::bindings::cli::stdout::add_to_linker_get_host(l, closure)?; wasmtime_wasi::bindings::cli::stderr::add_to_linker_get_host(l, closure)?; @@ -383,13 +384,17 @@ where T: WasiHttpView + wasmtime_wasi::WasiView, { let io_closure = type_annotate_io::(|t| wasmtime_wasi::IoImpl(t)); + // For the sync linker, use the definitions of poll and streams from the + // wasmtime_wasi::bindings::sync space because those are defined using in_tokio. + wasmtime_wasi::bindings::sync::io::poll::add_to_linker_get_host(l, io_closure)?; + wasmtime_wasi::bindings::sync::io::streams::add_to_linker_get_host(l, io_closure)?; + // The error interface in the wasmtime_wasi is synchronous + wasmtime_wasi::bindings::io::error::add_to_linker_get_host(l, io_closure)?; + let closure = type_annotate_wasi::(|t| wasmtime_wasi::WasiImpl(wasmtime_wasi::IoImpl(t))); wasmtime_wasi::bindings::clocks::wall_clock::add_to_linker_get_host(l, closure)?; wasmtime_wasi::bindings::clocks::monotonic_clock::add_to_linker_get_host(l, closure)?; - wasmtime_wasi::bindings::sync::io::poll::add_to_linker_get_host(l, io_closure)?; - wasmtime_wasi::bindings::sync::io::streams::add_to_linker_get_host(l, io_closure)?; - wasmtime_wasi::bindings::io::error::add_to_linker_get_host(l, io_closure)?; wasmtime_wasi::bindings::cli::stdin::add_to_linker_get_host(l, closure)?; wasmtime_wasi::bindings::cli::stdout::add_to_linker_get_host(l, closure)?; wasmtime_wasi::bindings::cli::stderr::add_to_linker_get_host(l, closure)?; diff --git a/crates/wasi-http/src/types.rs b/crates/wasi-http/src/types.rs index 35bda46f3917..7a7b518db5ee 100644 --- a/crates/wasi-http/src/types.rs +++ b/crates/wasi-http/src/types.rs @@ -18,7 +18,7 @@ use std::time::Duration; use tokio::net::TcpStream; use tokio::time::timeout; use wasmtime::component::{Resource, ResourceTable}; -use wasmtime_wasi::{runtime::AbortOnDropJoinHandle, IoImpl, IoView, Subscribe}; +use wasmtime_wasi::{runtime::AbortOnDropJoinHandle, IoImpl, IoView, Pollable}; /// Capture the state necessary for use in the wasi-http API implementation. #[derive(Debug)] @@ -715,7 +715,7 @@ impl HostFutureIncomingResponse { } #[async_trait::async_trait] -impl Subscribe for HostFutureIncomingResponse { +impl Pollable for HostFutureIncomingResponse { async fn ready(&mut self) { if let Self::Pending(handle) = self { *self = Self::Ready(handle.await); diff --git a/crates/wasi-http/src/types_impl.rs b/crates/wasi-http/src/types_impl.rs index f3a83f85d6d7..8f27143467a2 100644 --- a/crates/wasi-http/src/types_impl.rs +++ b/crates/wasi-http/src/types_impl.rs @@ -13,11 +13,8 @@ use crate::{ use anyhow::Context; use std::any::Any; use std::str::FromStr; -use wasmtime::component::{Resource, ResourceTable}; -use wasmtime_wasi::{ - bindings::io::streams::{InputStream, OutputStream}, - IoView, Pollable, ResourceTableError, -}; +use wasmtime::component::{Resource, ResourceTable, ResourceTableError}; +use wasmtime_wasi::{DynInputStream, DynOutputStream, DynPollable, IoView}; impl crate::bindings::http::types::Host for WasiHttpImpl where @@ -662,7 +659,7 @@ where fn subscribe( &mut self, index: Resource, - ) -> wasmtime::Result> { + ) -> wasmtime::Result> { wasmtime_wasi::subscribe(self.table(), index) } @@ -704,11 +701,11 @@ where fn stream( &mut self, id: Resource, - ) -> wasmtime::Result, ()>> { + ) -> wasmtime::Result, ()>> { let body = self.table().get_mut(&id)?; if let Some(stream) = body.take_stream() { - let stream: InputStream = Box::new(stream); + let stream: DynInputStream = Box::new(stream); let stream = self.table().push_child(stream, &id)?; return Ok(Ok(stream)); } @@ -883,7 +880,7 @@ where fn subscribe( &mut self, id: Resource, - ) -> wasmtime::Result> { + ) -> wasmtime::Result> { wasmtime_wasi::subscribe(self.table(), id) } } @@ -895,7 +892,7 @@ where fn write( &mut self, id: Resource, - ) -> wasmtime::Result, ()>> { + ) -> wasmtime::Result, ()>> { let body = self.table().get_mut(&id)?; if let Some(stream) = body.take_output_stream() { let id = self.table().push_child(stream, &id)?; diff --git a/crates/wasi-io/Cargo.toml b/crates/wasi-io/Cargo.toml new file mode 100644 index 000000000000..70e379ea6a8f --- /dev/null +++ b/crates/wasi-io/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "wasmtime-wasi-io" +version.workspace = true +authors.workspace = true +description = "wasi-io common traits to be shared among other wasi implementations" +license = "Apache-2.0 WITH LLVM-exception" +categories = ["wasm"] +keywords = ["webassembly", "wasm"] +repository = "https://github.com/bytecodealliance/wasmtime" +edition.workspace = true +rust-version.workspace = true + +[lints] +workspace = true + +[dependencies] +wasmtime = { workspace = true, features = ["component-model", "async", "runtime"] } +anyhow = { workspace = true } +bytes = { workspace = true } +async-trait = { workspace = true } +futures = { workspace = true } + +[features] +default = [ "std" ] +std = [ + "bytes/std", + "anyhow/std", + "wasmtime/std", +] + diff --git a/crates/wasi-io/src/bindings.rs b/crates/wasi-io/src/bindings.rs new file mode 100644 index 000000000000..b90c5a75f0db --- /dev/null +++ b/crates/wasi-io/src/bindings.rs @@ -0,0 +1,29 @@ +wasmtime::component::bindgen!({ + path: "wit", + trappable_imports: true, + with: { + "wasi:io/poll/pollable": crate::poll::DynPollable, + "wasi:io/streams/input-stream": crate::streams::DynInputStream, + "wasi:io/streams/output-stream": crate::streams::DynOutputStream, + "wasi:io/error/error": crate::streams::Error, + }, + async: { + only_imports: [ + "poll", + "[method]pollable.block", + "[method]pollable.ready", + "[method]input-stream.blocking-read", + "[method]input-stream.blocking-skip", + "[drop]input-stream", + "[method]output-stream.blocking-splice", + "[method]output-stream.blocking-flush", + "[method]output-stream.blocking-write", + "[method]output-stream.blocking-write-and-flush", + "[method]output-stream.blocking-write-zeroes-and-flush", + "[drop]output-stream", + ] + }, + trappable_error_type: { + "wasi:io/streams/stream-error" => crate::streams::StreamError, + } +}); diff --git a/crates/wasi-io/src/impls.rs b/crates/wasi-io/src/impls.rs new file mode 100644 index 000000000000..374f0f11f816 --- /dev/null +++ b/crates/wasi-io/src/impls.rs @@ -0,0 +1,297 @@ +use crate::bindings::wasi::io::{error, poll, streams}; +use crate::poll::{subscribe, DynFuture, DynPollable, MakeFuture}; +use crate::streams::{DynInputStream, DynOutputStream, StreamError, StreamResult}; +use crate::{IoImpl, IoView}; +use alloc::collections::BTreeMap; +use alloc::string::String; +use alloc::vec::Vec; +use anyhow::{anyhow, Result}; +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; +use wasmtime::component::Resource; + +impl poll::Host for IoImpl { + async fn poll(&mut self, pollables: Vec>) -> Result> { + type ReadylistIndex = u32; + + if pollables.is_empty() { + return Err(anyhow!("empty poll list")); + } + + let table = self.table(); + + let mut table_futures: BTreeMap)> = BTreeMap::new(); + + for (ix, p) in pollables.iter().enumerate() { + let ix: u32 = ix.try_into()?; + + let pollable = table.get(p)?; + let (_, list) = table_futures + .entry(pollable.index) + .or_insert((pollable.make_future, Vec::new())); + list.push(ix); + } + + let mut futures: Vec<(DynFuture<'_>, Vec)> = Vec::new(); + for (entry, (make_future, readylist_indices)) in table.iter_entries(table_futures) { + let entry = entry?; + futures.push((make_future(entry), readylist_indices)); + } + + struct PollList<'a> { + futures: Vec<(DynFuture<'a>, Vec)>, + } + impl<'a> Future for PollList<'a> { + type Output = Vec; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut any_ready = false; + let mut results = Vec::new(); + for (fut, readylist_indicies) in self.futures.iter_mut() { + match fut.as_mut().poll(cx) { + Poll::Ready(()) => { + results.extend_from_slice(readylist_indicies); + any_ready = true; + } + Poll::Pending => {} + } + } + if any_ready { + Poll::Ready(results) + } else { + Poll::Pending + } + } + } + + Ok(PollList { futures }.await) + } +} + +impl crate::bindings::wasi::io::poll::HostPollable for IoImpl { + async fn block(&mut self, pollable: Resource) -> Result<()> { + let table = self.table(); + let pollable = table.get(&pollable)?; + let ready = (pollable.make_future)(table.get_any_mut(pollable.index)?); + ready.await; + Ok(()) + } + async fn ready(&mut self, pollable: Resource) -> Result { + let table = self.table(); + let pollable = table.get(&pollable)?; + let ready = (pollable.make_future)(table.get_any_mut(pollable.index)?); + futures::pin_mut!(ready); + Ok(matches!( + futures::future::poll_immediate(ready).await, + Some(()) + )) + } + fn drop(&mut self, pollable: Resource) -> Result<()> { + let pollable = self.table().delete(pollable)?; + if let Some(delete) = pollable.remove_index_on_delete { + delete(self.table(), pollable.index)?; + } + Ok(()) + } +} + +impl error::Host for IoImpl {} + +impl streams::Host for IoImpl { + fn convert_stream_error(&mut self, err: StreamError) -> Result { + match err { + StreamError::Closed => Ok(streams::StreamError::Closed), + StreamError::LastOperationFailed(e) => Ok(streams::StreamError::LastOperationFailed( + self.table().push(e)?, + )), + StreamError::Trap(e) => Err(e), + } + } +} + +impl error::HostError for IoImpl { + fn drop(&mut self, err: Resource) -> Result<()> { + self.table().delete(err)?; + Ok(()) + } + + fn to_debug_string(&mut self, err: Resource) -> Result { + Ok(alloc::format!("{:?}", self.table().get(&err)?)) + } +} + +impl streams::HostOutputStream for IoImpl { + async fn drop(&mut self, stream: Resource) -> Result<()> { + self.table().delete(stream)?.cancel().await; + Ok(()) + } + + fn check_write(&mut self, stream: Resource) -> StreamResult { + let bytes = self.table().get_mut(&stream)?.check_write()?; + Ok(bytes as u64) + } + + fn write(&mut self, stream: Resource, bytes: Vec) -> StreamResult<()> { + self.table().get_mut(&stream)?.write(bytes.into())?; + Ok(()) + } + + fn subscribe(&mut self, stream: Resource) -> Result> { + subscribe(self.table(), stream) + } + + async fn blocking_write_and_flush( + &mut self, + stream: Resource, + bytes: Vec, + ) -> StreamResult<()> { + if bytes.len() > 4096 { + return Err(StreamError::trap( + "Buffer too large for blocking-write-and-flush (expected at most 4096)", + )); + } + + self.table() + .get_mut(&stream)? + .blocking_write_and_flush(bytes.into()) + .await + } + + async fn blocking_write_zeroes_and_flush( + &mut self, + stream: Resource, + len: u64, + ) -> StreamResult<()> { + if len > 4096 { + return Err(StreamError::trap( + "Buffer too large for blocking-write-zeroes-and-flush (expected at most 4096)", + )); + } + + self.table() + .get_mut(&stream)? + .blocking_write_zeroes_and_flush(len as usize) + .await + } + + fn write_zeroes(&mut self, stream: Resource, len: u64) -> StreamResult<()> { + self.table().get_mut(&stream)?.write_zeroes(len as usize)?; + Ok(()) + } + + fn flush(&mut self, stream: Resource) -> StreamResult<()> { + self.table().get_mut(&stream)?.flush()?; + Ok(()) + } + + async fn blocking_flush(&mut self, stream: Resource) -> StreamResult<()> { + let s = self.table().get_mut(&stream)?; + s.flush()?; + s.write_ready().await?; + Ok(()) + } + + fn splice( + &mut self, + dest: Resource, + src: Resource, + len: u64, + ) -> StreamResult { + let len = len.try_into().unwrap_or(usize::MAX); + + let permit = { + let output = self.table().get_mut(&dest)?; + output.check_write()? + }; + let len = len.min(permit); + if len == 0 { + return Ok(0); + } + + let contents = self.table().get_mut(&src)?.read(len)?; + + let len = contents.len(); + if len == 0 { + return Ok(0); + } + + let output = self.table().get_mut(&dest)?; + output.write(contents)?; + Ok(len.try_into().expect("usize can fit in u64")) + } + + async fn blocking_splice( + &mut self, + dest: Resource, + src: Resource, + len: u64, + ) -> StreamResult { + let len = len.try_into().unwrap_or(usize::MAX); + + let permit = { + let output = self.table().get_mut(&dest)?; + output.write_ready().await? + }; + let len = len.min(permit); + if len == 0 { + return Ok(0); + } + + let contents = self.table().get_mut(&src)?.blocking_read(len).await?; + + let len = contents.len(); + if len == 0 { + return Ok(0); + } + + let output = self.table().get_mut(&dest)?; + output.blocking_write_and_flush(contents).await?; + Ok(len.try_into().expect("usize can fit in u64")) + } +} + +impl streams::HostInputStream for IoImpl { + async fn drop(&mut self, stream: Resource) -> Result<()> { + self.table().delete(stream)?.cancel().await; + Ok(()) + } + + fn read(&mut self, stream: Resource, len: u64) -> StreamResult> { + let len = len.try_into().unwrap_or(usize::MAX); + let bytes = self.table().get_mut(&stream)?.read(len)?; + debug_assert!(bytes.len() <= len); + Ok(bytes.into()) + } + + async fn blocking_read( + &mut self, + stream: Resource, + len: u64, + ) -> StreamResult> { + let len = len.try_into().unwrap_or(usize::MAX); + let bytes = self.table().get_mut(&stream)?.blocking_read(len).await?; + debug_assert!(bytes.len() <= len); + Ok(bytes.into()) + } + + fn skip(&mut self, stream: Resource, len: u64) -> StreamResult { + let len = len.try_into().unwrap_or(usize::MAX); + let written = self.table().get_mut(&stream)?.skip(len)?; + Ok(written.try_into().expect("usize always fits in u64")) + } + + async fn blocking_skip( + &mut self, + stream: Resource, + len: u64, + ) -> StreamResult { + let len = len.try_into().unwrap_or(usize::MAX); + let written = self.table().get_mut(&stream)?.blocking_skip(len).await?; + Ok(written.try_into().expect("usize always fits in u64")) + } + + fn subscribe(&mut self, stream: Resource) -> Result> { + crate::poll::subscribe(self.table(), stream) + } +} diff --git a/crates/wasi-io/src/lib.rs b/crates/wasi-io/src/lib.rs new file mode 100644 index 000000000000..5ffae0025727 --- /dev/null +++ b/crates/wasi-io/src/lib.rs @@ -0,0 +1,183 @@ +//! # Wasmtime's wasi-io Implementation +//! +//! This crate provides a Wasmtime host implementation of the WASI 0.2 (aka +//! WASIp2 aka Preview 2) wasi-io package. The host implementation is +//! abstract: it is exposed as a set of traits which other crates provide +//! impls of. +//! +//! The wasi-io package is the foundation which defines how WASI programs +//! interact with the scheduler. It provides the `pollable`, `input-stream`, +//! and `output-stream` Component Model resources, which other packages +//! (including wasi-filesystem, wasi-sockets, wasi-cli, and wasi-http) +//! expose as the standard way to wait for readiness, and asynchronously read +//! and write to streams. +//! +//! This crate is designed to have no unnecessary dependencies and, in +//! particular, to be #![no_std]. + +#![no_std] + +extern crate alloc; +#[cfg(feature = "std")] +#[macro_use] +extern crate std; + +pub mod bindings; +mod impls; +pub mod poll; +pub mod streams; + +#[doc(no_inline)] +pub use async_trait::async_trait; + +use alloc::boxed::Box; +use wasmtime::component::ResourceTable; + +/// A trait which provides access to the [`ResourceTable`] inside the +/// embedder's `T` of [`Store`][`Store`]. +/// +/// This crate's WASI Host implementations depend on the contents of +/// [`ResourceTable`]. The `T` type [`Store`][`Store`] is defined in each +/// embedding of Wasmtime. These implementations is connected to the +/// [`Linker`][`Linker`] by the +/// [`add_to_linker_async`] function. +/// +/// # Example +/// +/// ``` +/// use wasmtime::{Config, Engine}; +/// use wasmtime::component::{ResourceTable, Linker}; +/// use wasmtime_wasi_io::{IoView, add_to_linker_async}; +/// +/// struct MyState { +/// table: ResourceTable, +/// } +/// +/// impl IoView for MyState { +/// fn table(&mut self) -> &mut ResourceTable { &mut self.table } +/// } +/// let mut config = Config::new(); +/// config.async_support(true); +/// let engine = Engine::new(&config).unwrap(); +/// let mut linker: Linker = Linker::new(&engine); +/// add_to_linker_async(&mut linker).unwrap(); +/// ``` +/// [`Store`]: wasmtime::Store +/// [`Linker`]: wasmtime::component::Linker +/// [`ResourceTable`]: wasmtime::component::ResourceTable +/// +pub trait IoView: Send { + /// Yields mutable access to the internal resource management that this + /// context contains. + /// + /// Embedders can add custom resources to this table as well to give + /// resources to wasm as well. + fn table(&mut self) -> &mut ResourceTable; +} +impl IoView for &mut T { + fn table(&mut self) -> &mut ResourceTable { + T::table(self) + } +} +impl IoView for Box { + fn table(&mut self) -> &mut ResourceTable { + T::table(self) + } +} + +/// A small newtype wrapper which serves as the basis for implementations of +/// `Host` WASI traits in this crate. +/// +/// This type is used as the basis for the implementation of all `Host` traits +/// generated by `bindgen!` for WASI interfaces. +/// +/// You don't need to use this type if you are using the root +/// [`add_to_linker_async`] in this crate. +/// +/// If you're calling the `add_to_linker` functions generated by `bindgen!` +/// from the [`bindings` module](crate::bindings), you'll want to create a +/// value of this type in the closures added to a [`Linker`]. +/// +/// [`Linker`]: wasmtime::component::Linker +#[repr(transparent)] +pub struct IoImpl(pub T); +impl IoView for IoImpl { + fn table(&mut self) -> &mut ResourceTable { + T::table(&mut self.0) + } +} + +/// Add the wasi-io host implementation from this crate into the `linker` +/// provided. +/// +/// This function will add the `async` variant of all interfaces into the +/// [`Linker`] provided. By `async` this means that this function is only +/// compatible with [`Config::async_support(true)`][async]. For embeddings +/// with async support disabled, you'll need to use other crates, such as the +/// [`wasmtime-wasi`] crate, which provides an [`add_to_linker_sync`] that +/// includes an appropriate wasi-io implementation based on this crate's. +/// +/// This function will add all interfaces implemented by this crate to the +/// [`Linker`], which corresponds to the `wasi:io/imports` world supported by +/// this crate. +/// +/// [async]: wasmtime::Config::async_support +/// [`Linker`]: wasmtime::component::Linker +/// [`wasmtime-wasi`]: https://crates.io/crates/wasmtime-wasi +/// [`add_to_linker_sync`]: https://docs.rs/wasmtime-wasi/latest/wasmtime_wasi/fn.add_to_linker_sync.html +/// +/// +/// # Example +/// +/// ``` +/// use wasmtime::{Engine, Result, Store, Config}; +/// use wasmtime::component::{ResourceTable, Linker}; +/// use wasmtime_wasi_io::IoView; +/// +/// fn main() -> Result<()> { +/// let mut config = Config::new(); +/// config.async_support(true); +/// let engine = Engine::new(&config)?; +/// +/// let mut linker = Linker::::new(&engine); +/// wasmtime_wasi_io::add_to_linker_async(&mut linker)?; +/// // ... add any further functionality to `linker` if desired ... +/// +/// let mut store = Store::new( +/// &engine, +/// MyState { +/// table: ResourceTable::new(), +/// }, +/// ); +/// +/// // ... use `linker` to instantiate within `store` ... +/// +/// Ok(()) +/// } +/// +/// struct MyState { +/// table: ResourceTable, +/// } +/// +/// impl IoView for MyState { +/// fn table(&mut self) -> &mut ResourceTable { &mut self.table } +/// } +/// ``` +pub fn add_to_linker_async( + l: &mut wasmtime::component::Linker, +) -> wasmtime::Result<()> { + let closure = io_type_annotate::(|t| IoImpl(t)); + crate::bindings::wasi::io::error::add_to_linker_get_host(l, closure)?; + crate::bindings::wasi::io::poll::add_to_linker_get_host(l, closure)?; + crate::bindings::wasi::io::streams::add_to_linker_get_host(l, closure)?; + Ok(()) +} + +// NB: workaround some rustc inference - a future refactoring may make this +// obsolete. +fn io_type_annotate(val: F) -> F +where + F: Fn(&mut T) -> IoImpl<&mut T>, +{ + val +} diff --git a/crates/wasi-io/src/poll.rs b/crates/wasi-io/src/poll.rs new file mode 100644 index 000000000000..6322038bbf02 --- /dev/null +++ b/crates/wasi-io/src/poll.rs @@ -0,0 +1,120 @@ +use alloc::boxed::Box; +use anyhow::Result; +use core::any::Any; +use core::future::Future; +use core::pin::Pin; +use wasmtime::component::{Resource, ResourceTable}; + +pub type DynFuture<'a> = Pin + Send + 'a>>; +pub type MakeFuture = for<'a> fn(&'a mut dyn Any) -> DynFuture<'a>; + +/// The host representation of the `wasi:io/poll.pollable` resource. +/// +/// A pollable is not the same thing as a Rust Future: the same pollable may be used to +/// repeatedly check for readiness of a given condition, e.g. if a stream is readable +/// or writable. So, rather than containing a Future, which can only become Ready once, a +/// `DynPollable` contains a way to create a Future in each call to `poll`. +pub struct DynPollable { + pub(crate) index: u32, + pub(crate) make_future: MakeFuture, + pub(crate) remove_index_on_delete: Option Result<()>>, +} + +/// The trait used to implement [`DynPollable`] to create a `pollable` +/// resource in `wasi:io/poll`. +/// +/// This trait is the internal implementation detail of any pollable resource in +/// this crate's implementation of WASI. The `ready` function is an `async fn` +/// which resolves when the implementation is ready. Using native `async` Rust +/// enables this type's readiness to compose with other types' readiness +/// throughout the WASI implementation. +/// +/// This trait is used in conjunction with [`subscribe`] to create a `pollable` +/// resource. +/// +/// # Example +/// +/// This is a simple example of creating a `Pollable` resource from a few +/// parameters. +/// +/// ``` +/// # // stub out so we don't need a dep to build the doctests: +/// # mod tokio { pub mod time { pub use std::time::{Duration, Instant}; pub async fn sleep_until(_: +/// Instant) {} } } +/// use tokio::time::{self, Duration, Instant}; +/// use wasmtime_wasi_io::{IoView, poll::{Pollable, subscribe, DynPollable}, async_trait}; +/// use wasmtime::component::Resource; +/// use wasmtime::Result; +/// +/// fn sleep(cx: &mut dyn IoView, dur: Duration) -> Result> { +/// let end = Instant::now() + dur; +/// let sleep = MySleep { end }; +/// let sleep_resource = cx.table().push(sleep)?; +/// subscribe(cx.table(), sleep_resource) +/// } +/// +/// struct MySleep { +/// end: Instant, +/// } +/// +/// #[async_trait] +/// impl Pollable for MySleep { +/// async fn ready(&mut self) { +/// tokio::time::sleep_until(self.end).await; +/// } +/// } +/// ``` +#[async_trait::async_trait] +pub trait Pollable: Send + 'static { + /// An asynchronous function which resolves when this object's readiness + /// operation is ready. + /// + /// This function is invoked as part of `poll` in `wasi:io/poll`. The + /// meaning of when this function Returns depends on what object this + /// [`Pollable`] is attached to. When the returned future resolves then the + /// corresponding call to `wasi:io/poll` will return. + /// + /// Note that this method does not return an error. Returning an error + /// should be done through accessors on the object that this `pollable` is + /// connected to. The call to `wasi:io/poll` itself does not return errors, + /// only a list of ready objects. + async fn ready(&mut self); +} + +/// Creates a `wasi:io/poll/pollable` resource which is subscribed to the provided +/// `resource`. +/// +/// If `resource` is an owned resource then it will be deleted when the returned +/// resource is deleted. Otherwise the returned resource is considered a "child" +/// of the given `resource` which means that the given resource cannot be +/// deleted while the `pollable` is still alive. +pub fn subscribe( + table: &mut ResourceTable, + resource: Resource, +) -> Result> +where + T: Pollable, +{ + fn make_future<'a, T>(stream: &'a mut dyn Any) -> DynFuture<'a> + where + T: Pollable, + { + stream.downcast_mut::().unwrap().ready() + } + + let pollable = DynPollable { + index: resource.rep(), + remove_index_on_delete: if resource.owned() { + Some(|table, idx| { + let resource = Resource::::new_own(idx); + table.delete(resource)?; + Ok(()) + }) + } else { + None + }, + make_future: make_future::, + }; + + Ok(table.push_child(pollable, &resource)?) +} diff --git a/crates/wasi/src/stream.rs b/crates/wasi-io/src/streams.rs similarity index 94% rename from crates/wasi/src/stream.rs rename to crates/wasi-io/src/streams.rs index 251133cdac66..8f2392697c7e 100644 --- a/crates/wasi/src/stream.rs +++ b/crates/wasi-io/src/streams.rs @@ -1,11 +1,12 @@ -use crate::poll::Subscribe; +use crate::poll::Pollable; +use alloc::boxed::Box; use anyhow::Result; use bytes::Bytes; /// Host trait for implementing the `wasi:io/streams.input-stream` resource: A /// bytestream which can be read from. #[async_trait::async_trait] -pub trait HostInputStream: Subscribe { +pub trait InputStream: Pollable { /// Reads up to `size` bytes, returning a buffer holding these bytes on /// success. /// @@ -69,8 +70,8 @@ impl StreamError { } } -impl std::fmt::Display for StreamError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl alloc::fmt::Display for StreamError { + fn fmt(&self, f: &mut alloc::fmt::Formatter<'_>) -> alloc::fmt::Result { match self { StreamError::Closed => write!(f, "closed"), StreamError::LastOperationFailed(e) => write!(f, "last operation failed: {e}"), @@ -79,8 +80,8 @@ impl std::fmt::Display for StreamError { } } -impl std::error::Error for StreamError { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { +impl core::error::Error for StreamError { + fn source(&self) -> Option<&(dyn core::error::Error + 'static)> { match self { StreamError::Closed => None, StreamError::LastOperationFailed(e) | StreamError::Trap(e) => e.source(), @@ -97,7 +98,7 @@ impl From for StreamError { /// Host trait for implementing the `wasi:io/streams.output-stream` resource: /// A bytestream which can be written to. #[async_trait::async_trait] -pub trait HostOutputStream: Subscribe { +pub trait OutputStream: Pollable { /// Write bytes after obtaining a permit to write those bytes /// /// Prior to calling [`write`](Self::write) the caller must call @@ -247,19 +248,19 @@ pub trait HostOutputStream: Subscribe { } #[async_trait::async_trait] -impl Subscribe for Box { +impl Pollable for Box { async fn ready(&mut self) { (**self).ready().await } } #[async_trait::async_trait] -impl Subscribe for Box { +impl Pollable for Box { async fn ready(&mut self) { (**self).ready().await } } -pub type InputStream = Box; +pub type DynInputStream = Box; -pub type OutputStream = Box; +pub type DynOutputStream = Box; diff --git a/crates/wasi-io/wit/deps/io/error.wit b/crates/wasi-io/wit/deps/io/error.wit new file mode 100644 index 000000000000..97c6068779ac --- /dev/null +++ b/crates/wasi-io/wit/deps/io/error.wit @@ -0,0 +1,34 @@ +package wasi:io@0.2.3; + +@since(version = 0.2.0) +interface error { + /// A resource which represents some error information. + /// + /// The only method provided by this resource is `to-debug-string`, + /// which provides some human-readable information about the error. + /// + /// In the `wasi:io` package, this resource is returned through the + /// `wasi:io/streams/stream-error` type. + /// + /// To provide more specific error information, other interfaces may + /// offer functions to "downcast" this error into more specific types. For example, + /// errors returned from streams derived from filesystem types can be described using + /// the filesystem's own error-code type. This is done using the function + /// `wasi:filesystem/types/filesystem-error-code`, which takes a `borrow` + /// parameter and returns an `option`. + /// + /// The set of functions which can "downcast" an `error` into a more + /// concrete type is open. + @since(version = 0.2.0) + resource error { + /// Returns a string that is suitable to assist humans in debugging + /// this error. + /// + /// WARNING: The returned string should not be consumed mechanically! + /// It may change across platforms, hosts, or other implementation + /// details. Parsing this string is a major platform-compatibility + /// hazard. + @since(version = 0.2.0) + to-debug-string: func() -> string; + } +} diff --git a/crates/wasi-io/wit/deps/io/poll.wit b/crates/wasi-io/wit/deps/io/poll.wit new file mode 100644 index 000000000000..9bcbe8e03692 --- /dev/null +++ b/crates/wasi-io/wit/deps/io/poll.wit @@ -0,0 +1,47 @@ +package wasi:io@0.2.3; + +/// A poll API intended to let users wait for I/O events on multiple handles +/// at once. +@since(version = 0.2.0) +interface poll { + /// `pollable` represents a single I/O event which may be ready, or not. + @since(version = 0.2.0) + resource pollable { + + /// Return the readiness of a pollable. This function never blocks. + /// + /// Returns `true` when the pollable is ready, and `false` otherwise. + @since(version = 0.2.0) + ready: func() -> bool; + + /// `block` returns immediately if the pollable is ready, and otherwise + /// blocks until ready. + /// + /// This function is equivalent to calling `poll.poll` on a list + /// containing only this pollable. + @since(version = 0.2.0) + block: func(); + } + + /// Poll for completion on a set of pollables. + /// + /// This function takes a list of pollables, which identify I/O sources of + /// interest, and waits until one or more of the events is ready for I/O. + /// + /// The result `list` contains one or more indices of handles in the + /// argument list that is ready for I/O. + /// + /// This function traps if either: + /// - the list is empty, or: + /// - the list contains more elements than can be indexed with a `u32` value. + /// + /// A timeout can be implemented by adding a pollable from the + /// wasi-clocks API to the list. + /// + /// This function does not return a `result`; polling in itself does not + /// do any I/O so it doesn't fail. If any of the I/O sources identified by + /// the pollables has an error, it is indicated by marking the source as + /// being ready for I/O. + @since(version = 0.2.0) + poll: func(in: list>) -> list; +} diff --git a/crates/wasi-io/wit/deps/io/streams.wit b/crates/wasi-io/wit/deps/io/streams.wit new file mode 100644 index 000000000000..0de0846293ff --- /dev/null +++ b/crates/wasi-io/wit/deps/io/streams.wit @@ -0,0 +1,290 @@ +package wasi:io@0.2.3; + +/// WASI I/O is an I/O abstraction API which is currently focused on providing +/// stream types. +/// +/// In the future, the component model is expected to add built-in stream types; +/// when it does, they are expected to subsume this API. +@since(version = 0.2.0) +interface streams { + @since(version = 0.2.0) + use error.{error}; + @since(version = 0.2.0) + use poll.{pollable}; + + /// An error for input-stream and output-stream operations. + @since(version = 0.2.0) + variant stream-error { + /// The last operation (a write or flush) failed before completion. + /// + /// More information is available in the `error` payload. + /// + /// After this, the stream will be closed. All future operations return + /// `stream-error::closed`. + last-operation-failed(error), + /// The stream is closed: no more input will be accepted by the + /// stream. A closed output-stream will return this error on all + /// future operations. + closed + } + + /// An input bytestream. + /// + /// `input-stream`s are *non-blocking* to the extent practical on underlying + /// platforms. I/O operations always return promptly; if fewer bytes are + /// promptly available than requested, they return the number of bytes promptly + /// available, which could even be zero. To wait for data to be available, + /// use the `subscribe` function to obtain a `pollable` which can be polled + /// for using `wasi:io/poll`. + @since(version = 0.2.0) + resource input-stream { + /// Perform a non-blocking read from the stream. + /// + /// When the source of a `read` is binary data, the bytes from the source + /// are returned verbatim. When the source of a `read` is known to the + /// implementation to be text, bytes containing the UTF-8 encoding of the + /// text are returned. + /// + /// This function returns a list of bytes containing the read data, + /// when successful. The returned list will contain up to `len` bytes; + /// it may return fewer than requested, but not more. The list is + /// empty when no bytes are available for reading at this time. The + /// pollable given by `subscribe` will be ready when more bytes are + /// available. + /// + /// This function fails with a `stream-error` when the operation + /// encounters an error, giving `last-operation-failed`, or when the + /// stream is closed, giving `closed`. + /// + /// When the caller gives a `len` of 0, it represents a request to + /// read 0 bytes. If the stream is still open, this call should + /// succeed and return an empty list, or otherwise fail with `closed`. + /// + /// The `len` parameter is a `u64`, which could represent a list of u8 which + /// is not possible to allocate in wasm32, or not desirable to allocate as + /// as a return value by the callee. The callee may return a list of bytes + /// less than `len` in size while more bytes are available for reading. + @since(version = 0.2.0) + read: func( + /// The maximum number of bytes to read + len: u64 + ) -> result, stream-error>; + + /// Read bytes from a stream, after blocking until at least one byte can + /// be read. Except for blocking, behavior is identical to `read`. + @since(version = 0.2.0) + blocking-read: func( + /// The maximum number of bytes to read + len: u64 + ) -> result, stream-error>; + + /// Skip bytes from a stream. Returns number of bytes skipped. + /// + /// Behaves identical to `read`, except instead of returning a list + /// of bytes, returns the number of bytes consumed from the stream. + @since(version = 0.2.0) + skip: func( + /// The maximum number of bytes to skip. + len: u64, + ) -> result; + + /// Skip bytes from a stream, after blocking until at least one byte + /// can be skipped. Except for blocking behavior, identical to `skip`. + @since(version = 0.2.0) + blocking-skip: func( + /// The maximum number of bytes to skip. + len: u64, + ) -> result; + + /// Create a `pollable` which will resolve once either the specified stream + /// has bytes available to read or the other end of the stream has been + /// closed. + /// The created `pollable` is a child resource of the `input-stream`. + /// Implementations may trap if the `input-stream` is dropped before + /// all derived `pollable`s created with this function are dropped. + @since(version = 0.2.0) + subscribe: func() -> pollable; + } + + + /// An output bytestream. + /// + /// `output-stream`s are *non-blocking* to the extent practical on + /// underlying platforms. Except where specified otherwise, I/O operations also + /// always return promptly, after the number of bytes that can be written + /// promptly, which could even be zero. To wait for the stream to be ready to + /// accept data, the `subscribe` function to obtain a `pollable` which can be + /// polled for using `wasi:io/poll`. + /// + /// Dropping an `output-stream` while there's still an active write in + /// progress may result in the data being lost. Before dropping the stream, + /// be sure to fully flush your writes. + @since(version = 0.2.0) + resource output-stream { + /// Check readiness for writing. This function never blocks. + /// + /// Returns the number of bytes permitted for the next call to `write`, + /// or an error. Calling `write` with more bytes than this function has + /// permitted will trap. + /// + /// When this function returns 0 bytes, the `subscribe` pollable will + /// become ready when this function will report at least 1 byte, or an + /// error. + @since(version = 0.2.0) + check-write: func() -> result; + + /// Perform a write. This function never blocks. + /// + /// When the destination of a `write` is binary data, the bytes from + /// `contents` are written verbatim. When the destination of a `write` is + /// known to the implementation to be text, the bytes of `contents` are + /// transcoded from UTF-8 into the encoding of the destination and then + /// written. + /// + /// Precondition: check-write gave permit of Ok(n) and contents has a + /// length of less than or equal to n. Otherwise, this function will trap. + /// + /// returns Err(closed) without writing if the stream has closed since + /// the last call to check-write provided a permit. + @since(version = 0.2.0) + write: func( + contents: list + ) -> result<_, stream-error>; + + /// Perform a write of up to 4096 bytes, and then flush the stream. Block + /// until all of these operations are complete, or an error occurs. + /// + /// This is a convenience wrapper around the use of `check-write`, + /// `subscribe`, `write`, and `flush`, and is implemented with the + /// following pseudo-code: + /// + /// ```text + /// let pollable = this.subscribe(); + /// while !contents.is_empty() { + /// // Wait for the stream to become writable + /// pollable.block(); + /// let Ok(n) = this.check-write(); // eliding error handling + /// let len = min(n, contents.len()); + /// let (chunk, rest) = contents.split_at(len); + /// this.write(chunk ); // eliding error handling + /// contents = rest; + /// } + /// this.flush(); + /// // Wait for completion of `flush` + /// pollable.block(); + /// // Check for any errors that arose during `flush` + /// let _ = this.check-write(); // eliding error handling + /// ``` + @since(version = 0.2.0) + blocking-write-and-flush: func( + contents: list + ) -> result<_, stream-error>; + + /// Request to flush buffered output. This function never blocks. + /// + /// This tells the output-stream that the caller intends any buffered + /// output to be flushed. the output which is expected to be flushed + /// is all that has been passed to `write` prior to this call. + /// + /// Upon calling this function, the `output-stream` will not accept any + /// writes (`check-write` will return `ok(0)`) until the flush has + /// completed. The `subscribe` pollable will become ready when the + /// flush has completed and the stream can accept more writes. + @since(version = 0.2.0) + flush: func() -> result<_, stream-error>; + + /// Request to flush buffered output, and block until flush completes + /// and stream is ready for writing again. + @since(version = 0.2.0) + blocking-flush: func() -> result<_, stream-error>; + + /// Create a `pollable` which will resolve once the output-stream + /// is ready for more writing, or an error has occurred. When this + /// pollable is ready, `check-write` will return `ok(n)` with n>0, or an + /// error. + /// + /// If the stream is closed, this pollable is always ready immediately. + /// + /// The created `pollable` is a child resource of the `output-stream`. + /// Implementations may trap if the `output-stream` is dropped before + /// all derived `pollable`s created with this function are dropped. + @since(version = 0.2.0) + subscribe: func() -> pollable; + + /// Write zeroes to a stream. + /// + /// This should be used precisely like `write` with the exact same + /// preconditions (must use check-write first), but instead of + /// passing a list of bytes, you simply pass the number of zero-bytes + /// that should be written. + @since(version = 0.2.0) + write-zeroes: func( + /// The number of zero-bytes to write + len: u64 + ) -> result<_, stream-error>; + + /// Perform a write of up to 4096 zeroes, and then flush the stream. + /// Block until all of these operations are complete, or an error + /// occurs. + /// + /// This is a convenience wrapper around the use of `check-write`, + /// `subscribe`, `write-zeroes`, and `flush`, and is implemented with + /// the following pseudo-code: + /// + /// ```text + /// let pollable = this.subscribe(); + /// while num_zeroes != 0 { + /// // Wait for the stream to become writable + /// pollable.block(); + /// let Ok(n) = this.check-write(); // eliding error handling + /// let len = min(n, num_zeroes); + /// this.write-zeroes(len); // eliding error handling + /// num_zeroes -= len; + /// } + /// this.flush(); + /// // Wait for completion of `flush` + /// pollable.block(); + /// // Check for any errors that arose during `flush` + /// let _ = this.check-write(); // eliding error handling + /// ``` + @since(version = 0.2.0) + blocking-write-zeroes-and-flush: func( + /// The number of zero-bytes to write + len: u64 + ) -> result<_, stream-error>; + + /// Read from one stream and write to another. + /// + /// The behavior of splice is equivalent to: + /// 1. calling `check-write` on the `output-stream` + /// 2. calling `read` on the `input-stream` with the smaller of the + /// `check-write` permitted length and the `len` provided to `splice` + /// 3. calling `write` on the `output-stream` with that read data. + /// + /// Any error reported by the call to `check-write`, `read`, or + /// `write` ends the splice and reports that error. + /// + /// This function returns the number of bytes transferred; it may be less + /// than `len`. + @since(version = 0.2.0) + splice: func( + /// The stream to read from + src: borrow, + /// The number of bytes to splice + len: u64, + ) -> result; + + /// Read from one stream and write to another, with blocking. + /// + /// This is similar to `splice`, except that it blocks until the + /// `output-stream` is ready for writing, and the `input-stream` + /// is ready for reading, before performing the `splice`. + @since(version = 0.2.0) + blocking-splice: func( + /// The stream to read from + src: borrow, + /// The number of bytes to splice + len: u64, + ) -> result; + } +} diff --git a/crates/wasi-io/wit/deps/io/world.wit b/crates/wasi-io/wit/deps/io/world.wit new file mode 100644 index 000000000000..f1d2102dca1d --- /dev/null +++ b/crates/wasi-io/wit/deps/io/world.wit @@ -0,0 +1,10 @@ +package wasi:io@0.2.3; + +@since(version = 0.2.0) +world imports { + @since(version = 0.2.0) + import streams; + + @since(version = 0.2.0) + import poll; +} diff --git a/crates/wasi-io/wit/world.wit b/crates/wasi-io/wit/world.wit new file mode 100644 index 000000000000..258bac64c4fa --- /dev/null +++ b/crates/wasi-io/wit/world.wit @@ -0,0 +1,6 @@ +// We actually don't use this; it's just to let bindgen! find the corresponding world in wit/deps. +package wasmtime:wasi-io; + +world bindings { + include wasi:io/imports@0.2.3; +} diff --git a/crates/wasi/Cargo.toml b/crates/wasi/Cargo.toml index db02061ce315..e18afd81c2c0 100644 --- a/crates/wasi/Cargo.toml +++ b/crates/wasi/Cargo.toml @@ -17,6 +17,7 @@ workspace = true [dependencies] wasmtime = { workspace = true, features = ["component-model", "async", "runtime", "std"] } +wasmtime-wasi-io = { workspace = true, features = ["std"] } anyhow = { workspace = true } wiggle = { workspace = true, optional = true, features = ["wasmtime"] } tokio = { workspace = true, features = ["time", "sync", "io-std", "io-util", "rt", "rt-multi-thread", "net"] } diff --git a/crates/wasi/src/bindings.rs b/crates/wasi/src/bindings.rs index 7449b3a3f202..5f117cddf3e2 100644 --- a/crates/wasi/src/bindings.rs +++ b/crates/wasi/src/bindings.rs @@ -146,7 +146,8 @@ /// ``` pub mod sync { mod generated { - use crate::{FsError, SocketError, StreamError}; + use crate::{FsError, SocketError}; + use wasmtime_wasi_io::streams::StreamError; wasmtime::component::bindgen!({ path: "wit", @@ -159,12 +160,11 @@ pub mod sync { }, trappable_imports: true, with: { - // These interfaces come from the outer module, as it's - // sync/async agnostic. + // These interfaces contain only synchronous methods, so they + // can be aliased directly "wasi:clocks": crate::bindings::clocks, "wasi:random": crate::bindings::random, "wasi:cli": crate::bindings::cli, - "wasi:io/error": crate::bindings::io::error, "wasi:filesystem/preopens": crate::bindings::filesystem::preopens, "wasi:sockets/network": crate::bindings::sockets::network, @@ -173,13 +173,19 @@ pub mod sync { // way everything has the same type. "wasi:filesystem/types/descriptor": super::super::filesystem::types::Descriptor, "wasi:filesystem/types/directory-entry-stream": super::super::filesystem::types::DirectoryEntryStream, - "wasi:io/poll/pollable": super::super::io::poll::Pollable, - "wasi:io/streams/input-stream": super::super::io::streams::InputStream, - "wasi:io/streams/output-stream": super::super::io::streams::OutputStream, "wasi:sockets/tcp/tcp-socket": super::super::sockets::tcp::TcpSocket, "wasi:sockets/udp/incoming-datagram-stream": super::super::sockets::udp::IncomingDatagramStream, "wasi:sockets/udp/outgoing-datagram-stream": super::super::sockets::udp::OutgoingDatagramStream, "wasi:sockets/udp/udp-socket": super::super::sockets::udp::UdpSocket, + + // Error host trait from wasmtime-wasi-io is synchronous, so we can alias it + "wasi:io/error": wasmtime_wasi_io::bindings::wasi::io::error, + // Configure the resource types from wasmtime-wasi-io, though + // this bindgen will make a new synchronous Host traits + "wasi:io/poll/pollable": wasmtime_wasi_io::poll::DynPollable, + "wasi:io/streams/input-stream": wasmtime_wasi_io::streams::DynInputStream, + "wasi:io/streams/output-stream": wasmtime_wasi_io::streams::DynOutputStream, + }, require_store_data_send: true, }); @@ -399,14 +405,20 @@ mod async_io { ], }, trappable_error_type: { - "wasi:io/streams/stream-error" => crate::StreamError, + "wasi:io/streams/stream-error" => wasmtime_wasi_io::streams::StreamError, "wasi:filesystem/types/error-code" => crate::FsError, "wasi:sockets/network/error-code" => crate::SocketError, }, with: { - // Configure all resources to be concrete types defined in this crate, - // so that way we get to use nice typed helper methods with - // `ResourceTable`. + // All interfaces in the wasi:io package should be aliased to + // the wasmtime-wasi-io generated code. Note that this will also + // map the resource types to those defined in that crate as well. + "wasi:io/poll": wasmtime_wasi_io::bindings::wasi::io::poll, + "wasi:io/streams": wasmtime_wasi_io::bindings::wasi::io::streams, + "wasi:io/error": wasmtime_wasi_io::bindings::wasi::io::error, + + // Configure all other resources to be concrete types defined in + // this crate "wasi:sockets/network/network": crate::network::Network, "wasi:sockets/tcp/tcp-socket": crate::tcp::TcpSocket, "wasi:sockets/udp/udp-socket": crate::udp::UdpSocket, @@ -415,10 +427,6 @@ mod async_io { "wasi:sockets/ip-name-lookup/resolve-address-stream": crate::ip_name_lookup::ResolveAddressStream, "wasi:filesystem/types/directory-entry-stream": crate::filesystem::ReaddirIterator, "wasi:filesystem/types/descriptor": crate::filesystem::Descriptor, - "wasi:io/streams/input-stream": crate::stream::InputStream, - "wasi:io/streams/output-stream": crate::stream::OutputStream, - "wasi:io/error/error": crate::stream::Error, - "wasi:io/poll/pollable": crate::poll::Pollable, "wasi:cli/terminal-input/terminal-input": crate::stdio::TerminalInput, "wasi:cli/terminal-output/terminal-output": crate::stdio::TerminalOutput, }, diff --git a/crates/wasi/src/ctx.rs b/crates/wasi/src/ctx.rs index b86895712a9e..a64edd1dcf50 100644 --- a/crates/wasi/src/ctx.rs +++ b/crates/wasi/src/ctx.rs @@ -524,7 +524,18 @@ impl WasiCtxBuilder { } } -/// A struct which provides access to internal WASI state. +/// Per-[`Store`] state which holds state necessary to implement WASI from this +/// crate. +/// +/// This structure is created through [`WasiCtxBuilder`] and is stored within +/// the `T` of [`Store`][`Store`]. Access to the structure is provided +/// through the [`WasiView`](crate::WasiView) trait as an implementation on `T`. +/// +/// Note that this structure itself does not have any accessors, it's here for +/// internal use within the `wasmtime-wasi` crate's implementation of +/// bindgen-generated traits. +/// +/// [`Store`]: wasmtime::Store /// /// # Example /// @@ -557,18 +568,6 @@ impl WasiCtxBuilder { /// } /// } /// ``` -/// Per-[`Store`] state which holds state necessary to implement WASI from this -/// crate. -/// -/// This structure is created through [`WasiCtxBuilder`] and is stored within -/// the `T` of [`Store`][`Store`]. Access to the structure is provided -/// through the [`WasiView`] trait as an implementation on `T`. -/// -/// Note that this structure itself does not have any accessors, it's here for -/// internal use within the `wasmtime-wasi` crate's implementation of -/// bindgen-generated traits. -/// -/// [`Store`]: wasmtime::Store pub struct WasiCtx { pub(crate) random: Box, pub(crate) insecure_random: Box, diff --git a/crates/wasi/src/filesystem.rs b/crates/wasi/src/filesystem.rs index 2c6f039de4f0..14f70892068a 100644 --- a/crates/wasi/src/filesystem.rs +++ b/crates/wasi/src/filesystem.rs @@ -1,8 +1,6 @@ use crate::bindings::filesystem::types; use crate::runtime::{spawn_blocking, AbortOnDropJoinHandle}; -use crate::{ - HostInputStream, HostOutputStream, StreamError, StreamResult, Subscribe, TrappableError, -}; +use crate::{InputStream, OutputStream, Pollable, StreamError, StreamResult, TrappableError}; use anyhow::anyhow; use bytes::{Bytes, BytesMut}; use std::io; @@ -300,7 +298,7 @@ impl FileInputStream { } } #[async_trait::async_trait] -impl HostInputStream for FileInputStream { +impl InputStream for FileInputStream { fn read(&mut self, size: usize) -> StreamResult { match &mut self.state { ReadState::Idle => { @@ -368,7 +366,7 @@ impl HostInputStream for FileInputStream { } } #[async_trait::async_trait] -impl Subscribe for FileInputStream { +impl Pollable for FileInputStream { async fn ready(&mut self) { if let ReadState::Idle = self.state { // The guest hasn't initiated any read, but is nonetheless waiting @@ -467,7 +465,7 @@ impl FileOutputStream { const FILE_WRITE_CAPACITY: usize = 1024 * 1024; #[async_trait::async_trait] -impl HostOutputStream for FileOutputStream { +impl OutputStream for FileOutputStream { fn write(&mut self, buf: Bytes) -> Result<(), StreamError> { match self.state { OutputState::Ready => {} @@ -566,7 +564,7 @@ impl HostOutputStream for FileOutputStream { } #[async_trait::async_trait] -impl Subscribe for FileOutputStream { +impl Pollable for FileOutputStream { async fn ready(&mut self) { if let OutputState::Waiting(task) = &mut self.state { self.state = match task.await { diff --git a/crates/wasi/src/host/clocks.rs b/crates/wasi/src/host/clocks.rs index feefb9473b27..fec0a6061c49 100644 --- a/crates/wasi/src/host/clocks.rs +++ b/crates/wasi/src/host/clocks.rs @@ -4,11 +4,11 @@ use crate::bindings::{ clocks::monotonic_clock::{self, Duration as WasiDuration, Instant}, clocks::wall_clock::{self, Datetime}, }; -use crate::poll::{subscribe, Subscribe}; -use crate::{IoView, Pollable, WasiImpl, WasiView}; +use crate::{DynPollable, IoView, WasiImpl, WasiView}; use cap_std::time::SystemTime; use std::time::Duration; use wasmtime::component::Resource; +use wasmtime_wasi_io::poll::{subscribe, Pollable}; impl TryFrom for Datetime { type Error = anyhow::Error; @@ -48,7 +48,7 @@ where fn subscribe_to_duration( table: &mut wasmtime::component::ResourceTable, duration: tokio::time::Duration, -) -> anyhow::Result> { +) -> anyhow::Result> { let sleep = if duration.is_zero() { table.push(Deadline::Past)? } else if let Some(deadline) = tokio::time::Instant::now().checked_add(duration) { @@ -76,7 +76,7 @@ where Ok(self.ctx().monotonic_clock.resolution()) } - fn subscribe_instant(&mut self, when: Instant) -> anyhow::Result> { + fn subscribe_instant(&mut self, when: Instant) -> anyhow::Result> { let clock_now = self.ctx().monotonic_clock.now(); let duration = if when > clock_now { Duration::from_nanos(when - clock_now) @@ -86,7 +86,10 @@ where subscribe_to_duration(&mut self.table(), duration) } - fn subscribe_duration(&mut self, duration: WasiDuration) -> anyhow::Result> { + fn subscribe_duration( + &mut self, + duration: WasiDuration, + ) -> anyhow::Result> { subscribe_to_duration(&mut self.table(), Duration::from_nanos(duration)) } } @@ -98,7 +101,7 @@ enum Deadline { } #[async_trait::async_trait] -impl Subscribe for Deadline { +impl Pollable for Deadline { async fn ready(&mut self) { match self { Deadline::Past => {} diff --git a/crates/wasi/src/host/filesystem.rs b/crates/wasi/src/host/filesystem.rs index a8cfb91754f3..0f7d096758fe 100644 --- a/crates/wasi/src/host/filesystem.rs +++ b/crates/wasi/src/host/filesystem.rs @@ -3,13 +3,13 @@ use crate::bindings::filesystem::preopens; use crate::bindings::filesystem::types::{ self, ErrorCode, HostDescriptor, HostDirectoryEntryStream, }; -use crate::bindings::io::streams::{InputStream, OutputStream}; use crate::filesystem::{ Descriptor, Dir, File, FileInputStream, FileOutputStream, OpenMode, ReaddirIterator, }; use crate::{DirPerms, FilePerms, FsError, FsResult, IoView, WasiImpl, WasiView}; use anyhow::Context; use wasmtime::component::Resource; +use wasmtime_wasi_io::streams::{DynInputStream, DynOutputStream}; mod sync; @@ -735,7 +735,7 @@ where &mut self, fd: Resource, offset: types::Filesize, - ) -> FsResult> { + ) -> FsResult> { // Trap if fd lookup fails: let f = self.table().get(&fd)?.file()?; @@ -744,7 +744,7 @@ where } // Create a stream view for it. - let reader: InputStream = Box::new(FileInputStream::new(f, offset)); + let reader: DynInputStream = Box::new(FileInputStream::new(f, offset)); // Insert the stream view into the table. Trap if the table is full. let index = self.table().push(reader)?; @@ -756,7 +756,7 @@ where &mut self, fd: Resource, offset: types::Filesize, - ) -> FsResult> { + ) -> FsResult> { // Trap if fd lookup fails: let f = self.table().get(&fd)?.file()?; @@ -766,7 +766,7 @@ where // Create a stream view for it. let writer = FileOutputStream::write_at(f, offset); - let writer: OutputStream = Box::new(writer); + let writer: DynOutputStream = Box::new(writer); // Insert the stream view into the table. Trap if the table is full. let index = self.table().push(writer)?; @@ -777,7 +777,7 @@ where fn append_via_stream( &mut self, fd: Resource, - ) -> FsResult> { + ) -> FsResult> { // Trap if fd lookup fails: let f = self.table().get(&fd)?.file()?; @@ -787,7 +787,7 @@ where // Create a stream view for it. let appender = FileOutputStream::append(f); - let appender: OutputStream = Box::new(appender); + let appender: DynOutputStream = Box::new(appender); // Insert the stream view into the table. Trap if the table is full. let index = self.table().push(appender)?; diff --git a/crates/wasi/src/host/io.rs b/crates/wasi/src/host/io.rs index 5ad1a65e84da..75d41865102e 100644 --- a/crates/wasi/src/host/io.rs +++ b/crates/wasi/src/host/io.rs @@ -1,39 +1,30 @@ use crate::{ - bindings::io::error, - bindings::io::streams::{self, InputStream, OutputStream}, - poll::subscribe, - IoImpl, IoView, Pollable, StreamError, StreamResult, + bindings::sync::io::poll::Pollable, + bindings::sync::io::streams::{self, InputStream, OutputStream}, + runtime::in_tokio, + IoImpl, IoView, StreamError, StreamResult, }; use wasmtime::component::Resource; +use wasmtime_wasi_io::bindings::wasi::io::streams::{ + self as async_streams, Host as AsyncHost, HostInputStream as AsyncHostInputStream, + HostOutputStream as AsyncHostOutputStream, +}; -impl error::Host for IoImpl where T: IoView {} - -impl streams::Host for IoImpl -where - T: IoView, -{ - fn convert_stream_error(&mut self, err: StreamError) -> anyhow::Result { - match err { - StreamError::Closed => Ok(streams::StreamError::Closed), - StreamError::LastOperationFailed(e) => Ok(streams::StreamError::LastOperationFailed( - self.table().push(e)?, - )), - StreamError::Trap(e) => Err(e), +impl From for streams::StreamError { + fn from(other: async_streams::StreamError) -> Self { + match other { + async_streams::StreamError::LastOperationFailed(e) => Self::LastOperationFailed(e), + async_streams::StreamError::Closed => Self::Closed, } } } -impl error::HostError for IoImpl +impl streams::Host for IoImpl where T: IoView, { - fn drop(&mut self, err: Resource) -> anyhow::Result<()> { - self.table().delete(err)?; - Ok(()) - } - - fn to_debug_string(&mut self, err: Resource) -> anyhow::Result { - Ok(format!("{:?}", self.table().get(&err)?)) + fn convert_stream_error(&mut self, err: StreamError) -> anyhow::Result { + Ok(AsyncHost::convert_stream_error(self, err)?.into()) } } @@ -41,132 +32,75 @@ impl streams::HostOutputStream for IoImpl where T: IoView, { - async fn drop(&mut self, stream: Resource) -> anyhow::Result<()> { - self.table().delete(stream)?.cancel().await; - Ok(()) + fn drop(&mut self, stream: Resource) -> anyhow::Result<()> { + in_tokio(async { AsyncHostOutputStream::drop(self, stream).await }) } fn check_write(&mut self, stream: Resource) -> StreamResult { - let bytes = self.table().get_mut(&stream)?.check_write()?; - Ok(bytes as u64) + Ok(AsyncHostOutputStream::check_write(self, stream)?) } fn write(&mut self, stream: Resource, bytes: Vec) -> StreamResult<()> { - self.table().get_mut(&stream)?.write(bytes.into())?; - Ok(()) + Ok(AsyncHostOutputStream::write(self, stream, bytes)?) } - fn subscribe(&mut self, stream: Resource) -> anyhow::Result> { - subscribe(self.table(), stream) - } - - async fn blocking_write_and_flush( + fn blocking_write_and_flush( &mut self, stream: Resource, bytes: Vec, ) -> StreamResult<()> { - if bytes.len() > 4096 { - return Err(StreamError::trap( - "Buffer too large for blocking-write-and-flush (expected at most 4096)", - )); - } - - self.table() - .get_mut(&stream)? - .blocking_write_and_flush(bytes.into()) - .await + in_tokio(async { + AsyncHostOutputStream::blocking_write_and_flush(self, stream, bytes).await + }) } - async fn blocking_write_zeroes_and_flush( + fn blocking_write_zeroes_and_flush( &mut self, stream: Resource, len: u64, ) -> StreamResult<()> { - if len > 4096 { - return Err(StreamError::trap( - "Buffer too large for blocking-write-zeroes-and-flush (expected at most 4096)", - )); - } + in_tokio(async { + AsyncHostOutputStream::blocking_write_zeroes_and_flush(self, stream, len).await + }) + } - self.table() - .get_mut(&stream)? - .blocking_write_zeroes_and_flush(len as usize) - .await + fn subscribe(&mut self, stream: Resource) -> anyhow::Result> { + Ok(AsyncHostOutputStream::subscribe(self, stream)?) } fn write_zeroes(&mut self, stream: Resource, len: u64) -> StreamResult<()> { - self.table().get_mut(&stream)?.write_zeroes(len as usize)?; - Ok(()) + Ok(AsyncHostOutputStream::write_zeroes(self, stream, len)?) } fn flush(&mut self, stream: Resource) -> StreamResult<()> { - self.table().get_mut(&stream)?.flush()?; - Ok(()) + Ok(AsyncHostOutputStream::flush( + self, + Resource::new_borrow(stream.rep()), + )?) } - async fn blocking_flush(&mut self, stream: Resource) -> StreamResult<()> { - let s = self.table().get_mut(&stream)?; - s.flush()?; - s.write_ready().await?; - Ok(()) + fn blocking_flush(&mut self, stream: Resource) -> StreamResult<()> { + in_tokio(async { + AsyncHostOutputStream::blocking_flush(self, Resource::new_borrow(stream.rep())).await + }) } fn splice( &mut self, - dest: Resource, + dst: Resource, src: Resource, len: u64, ) -> StreamResult { - let len = len.try_into().unwrap_or(usize::MAX); - - let permit = { - let output = self.table().get_mut(&dest)?; - output.check_write()? - }; - let len = len.min(permit); - if len == 0 { - return Ok(0); - } - - let contents = self.table().get_mut(&src)?.read(len)?; - - let len = contents.len(); - if len == 0 { - return Ok(0); - } - - let output = self.table().get_mut(&dest)?; - output.write(contents)?; - Ok(len.try_into().expect("usize can fit in u64")) + AsyncHostOutputStream::splice(self, dst, src, len) } - async fn blocking_splice( + fn blocking_splice( &mut self, - dest: Resource, + dst: Resource, src: Resource, len: u64, ) -> StreamResult { - let len = len.try_into().unwrap_or(usize::MAX); - - let permit = { - let output = self.table().get_mut(&dest)?; - output.write_ready().await? - }; - let len = len.min(permit); - if len == 0 { - return Ok(0); - } - - let contents = self.table().get_mut(&src)?.blocking_read(len).await?; - - let len = contents.len(); - if len == 0 { - return Ok(0); - } - - let output = self.table().get_mut(&dest)?; - output.blocking_write_and_flush(contents).await?; - Ok(len.try_into().expect("usize can fit in u64")) + in_tokio(async { AsyncHostOutputStream::blocking_splice(self, dst, src, len).await }) } } @@ -174,197 +108,27 @@ impl streams::HostInputStream for IoImpl where T: IoView, { - async fn drop(&mut self, stream: Resource) -> anyhow::Result<()> { - self.table().delete(stream)?.cancel().await; - Ok(()) + fn drop(&mut self, stream: Resource) -> anyhow::Result<()> { + in_tokio(async { AsyncHostInputStream::drop(self, stream).await }) } fn read(&mut self, stream: Resource, len: u64) -> StreamResult> { - let len = len.try_into().unwrap_or(usize::MAX); - let bytes = self.table().get_mut(&stream)?.read(len)?; - debug_assert!(bytes.len() <= len); - Ok(bytes.into()) + AsyncHostInputStream::read(self, stream, len) } - async fn blocking_read( - &mut self, - stream: Resource, - len: u64, - ) -> StreamResult> { - let len = len.try_into().unwrap_or(usize::MAX); - let bytes = self.table().get_mut(&stream)?.blocking_read(len).await?; - debug_assert!(bytes.len() <= len); - Ok(bytes.into()) + fn blocking_read(&mut self, stream: Resource, len: u64) -> StreamResult> { + in_tokio(async { AsyncHostInputStream::blocking_read(self, stream, len).await }) } fn skip(&mut self, stream: Resource, len: u64) -> StreamResult { - let len = len.try_into().unwrap_or(usize::MAX); - let written = self.table().get_mut(&stream)?.skip(len)?; - Ok(written.try_into().expect("usize always fits in u64")) + AsyncHostInputStream::skip(self, stream, len) } - async fn blocking_skip( - &mut self, - stream: Resource, - len: u64, - ) -> StreamResult { - let len = len.try_into().unwrap_or(usize::MAX); - let written = self.table().get_mut(&stream)?.blocking_skip(len).await?; - Ok(written.try_into().expect("usize always fits in u64")) + fn blocking_skip(&mut self, stream: Resource, len: u64) -> StreamResult { + in_tokio(async { AsyncHostInputStream::blocking_skip(self, stream, len).await }) } fn subscribe(&mut self, stream: Resource) -> anyhow::Result> { - crate::poll::subscribe(self.table(), stream) - } -} - -pub mod sync { - use crate::{ - bindings::io::streams::{ - self as async_streams, Host as AsyncHost, HostInputStream as AsyncHostInputStream, - HostOutputStream as AsyncHostOutputStream, - }, - bindings::sync::io::poll::Pollable, - bindings::sync::io::streams::{self, InputStream, OutputStream}, - runtime::in_tokio, - IoImpl, IoView, StreamError, StreamResult, - }; - use wasmtime::component::Resource; - - impl From for streams::StreamError { - fn from(other: async_streams::StreamError) -> Self { - match other { - async_streams::StreamError::LastOperationFailed(e) => Self::LastOperationFailed(e), - async_streams::StreamError::Closed => Self::Closed, - } - } - } - - impl streams::Host for IoImpl - where - T: IoView, - { - fn convert_stream_error( - &mut self, - err: StreamError, - ) -> anyhow::Result { - Ok(AsyncHost::convert_stream_error(self, err)?.into()) - } - } - - impl streams::HostOutputStream for IoImpl - where - T: IoView, - { - fn drop(&mut self, stream: Resource) -> anyhow::Result<()> { - in_tokio(async { AsyncHostOutputStream::drop(self, stream).await }) - } - - fn check_write(&mut self, stream: Resource) -> StreamResult { - Ok(AsyncHostOutputStream::check_write(self, stream)?) - } - - fn write(&mut self, stream: Resource, bytes: Vec) -> StreamResult<()> { - Ok(AsyncHostOutputStream::write(self, stream, bytes)?) - } - - fn blocking_write_and_flush( - &mut self, - stream: Resource, - bytes: Vec, - ) -> StreamResult<()> { - in_tokio(async { - AsyncHostOutputStream::blocking_write_and_flush(self, stream, bytes).await - }) - } - - fn blocking_write_zeroes_and_flush( - &mut self, - stream: Resource, - len: u64, - ) -> StreamResult<()> { - in_tokio(async { - AsyncHostOutputStream::blocking_write_zeroes_and_flush(self, stream, len).await - }) - } - - fn subscribe( - &mut self, - stream: Resource, - ) -> anyhow::Result> { - Ok(AsyncHostOutputStream::subscribe(self, stream)?) - } - - fn write_zeroes(&mut self, stream: Resource, len: u64) -> StreamResult<()> { - Ok(AsyncHostOutputStream::write_zeroes(self, stream, len)?) - } - - fn flush(&mut self, stream: Resource) -> StreamResult<()> { - Ok(AsyncHostOutputStream::flush( - self, - Resource::new_borrow(stream.rep()), - )?) - } - - fn blocking_flush(&mut self, stream: Resource) -> StreamResult<()> { - in_tokio(async { - AsyncHostOutputStream::blocking_flush(self, Resource::new_borrow(stream.rep())) - .await - }) - } - - fn splice( - &mut self, - dst: Resource, - src: Resource, - len: u64, - ) -> StreamResult { - AsyncHostOutputStream::splice(self, dst, src, len) - } - - fn blocking_splice( - &mut self, - dst: Resource, - src: Resource, - len: u64, - ) -> StreamResult { - in_tokio(async { AsyncHostOutputStream::blocking_splice(self, dst, src, len).await }) - } - } - - impl streams::HostInputStream for IoImpl - where - T: IoView, - { - fn drop(&mut self, stream: Resource) -> anyhow::Result<()> { - in_tokio(async { AsyncHostInputStream::drop(self, stream).await }) - } - - fn read(&mut self, stream: Resource, len: u64) -> StreamResult> { - AsyncHostInputStream::read(self, stream, len) - } - - fn blocking_read( - &mut self, - stream: Resource, - len: u64, - ) -> StreamResult> { - in_tokio(async { AsyncHostInputStream::blocking_read(self, stream, len).await }) - } - - fn skip(&mut self, stream: Resource, len: u64) -> StreamResult { - AsyncHostInputStream::skip(self, stream, len) - } - - fn blocking_skip(&mut self, stream: Resource, len: u64) -> StreamResult { - in_tokio(async { AsyncHostInputStream::blocking_skip(self, stream, len).await }) - } - - fn subscribe( - &mut self, - stream: Resource, - ) -> anyhow::Result> { - AsyncHostInputStream::subscribe(self, stream) - } + AsyncHostInputStream::subscribe(self, stream) } } diff --git a/crates/wasi/src/host/tcp.rs b/crates/wasi/src/host/tcp.rs index 04bfafd7141a..0b8ad5ae5b9b 100644 --- a/crates/wasi/src/host/tcp.rs +++ b/crates/wasi/src/host/tcp.rs @@ -1,16 +1,20 @@ use crate::network::SocketAddrUse; use crate::{ bindings::{ - io::streams::{InputStream, OutputStream}, sockets::network::{IpAddressFamily, IpSocketAddress, Network}, sockets::tcp::{self, ShutdownType}, }, network::SocketAddressFamily, }; -use crate::{IoView, Pollable, SocketResult, WasiImpl, WasiView}; +use crate::{SocketResult, WasiImpl, WasiView}; use std::net::SocketAddr; use std::time::Duration; use wasmtime::component::Resource; +use wasmtime_wasi_io::{ + poll::DynPollable, + streams::{DynInputStream, DynOutputStream}, + IoView, +}; impl tcp::Host for WasiImpl where T: WasiView {} @@ -72,7 +76,7 @@ where fn finish_connect( &mut self, this: Resource, - ) -> SocketResult<(Resource, Resource)> { + ) -> SocketResult<(Resource, Resource)> { let table = self.table(); let socket = table.get_mut(&this)?; @@ -103,8 +107,8 @@ where this: Resource, ) -> SocketResult<( Resource, - Resource, - Resource, + Resource, + Resource, )> { self.ctx().allowed_network_uses.check_allowed_tcp()?; let table = self.table(); @@ -280,8 +284,11 @@ where socket.set_send_buffer_size(value) } - fn subscribe(&mut self, this: Resource) -> anyhow::Result> { - crate::poll::subscribe(self.table(), this) + fn subscribe( + &mut self, + this: Resource, + ) -> anyhow::Result> { + wasmtime_wasi_io::poll::subscribe(self.table(), this) } fn shutdown( diff --git a/crates/wasi/src/host/udp.rs b/crates/wasi/src/host/udp.rs index 98f1a5644da6..ce67e270f859 100644 --- a/crates/wasi/src/host/udp.rs +++ b/crates/wasi/src/host/udp.rs @@ -6,9 +6,9 @@ use crate::{ sockets::udp, }, udp::{IncomingDatagramStream, OutgoingDatagramStream, SendState, UdpState}, - Subscribe, + Pollable, }; -use crate::{IoView, Pollable, SocketError, SocketResult, WasiImpl, WasiView}; +use crate::{IoView, SocketError, SocketResult, WasiImpl, WasiView}; use anyhow::anyhow; use async_trait::async_trait; use io_lifetimes::AsSocketlike; @@ -16,6 +16,7 @@ use rustix::io::Errno; use std::net::SocketAddr; use tokio::io::Interest; use wasmtime::component::Resource; +use wasmtime_wasi_io::poll::DynPollable; /// Theoretical maximum byte size of a UDP datagram, the real limit is lower, /// but we do not account for e.g. the transport layer here for simplicity. @@ -287,8 +288,11 @@ where Ok(()) } - fn subscribe(&mut self, this: Resource) -> anyhow::Result> { - crate::poll::subscribe(self.table(), this) + fn subscribe( + &mut self, + this: Resource, + ) -> anyhow::Result> { + wasmtime_wasi_io::poll::subscribe(self.table(), this) } fn drop(&mut self, this: Resource) -> Result<(), anyhow::Error> { @@ -370,8 +374,8 @@ where fn subscribe( &mut self, this: Resource, - ) -> anyhow::Result> { - crate::poll::subscribe(self.table(), this) + ) -> anyhow::Result> { + wasmtime_wasi_io::poll::subscribe(self.table(), this) } fn drop(&mut self, this: Resource) -> Result<(), anyhow::Error> { @@ -387,7 +391,7 @@ where } #[async_trait] -impl Subscribe for IncomingDatagramStream { +impl Pollable for IncomingDatagramStream { async fn ready(&mut self) { // FIXME: Add `Interest::ERROR` when we update to tokio 1.32. self.inner @@ -509,8 +513,8 @@ where fn subscribe( &mut self, this: Resource, - ) -> anyhow::Result> { - crate::poll::subscribe(self.table(), this) + ) -> anyhow::Result> { + wasmtime_wasi_io::poll::subscribe(self.table(), this) } fn drop(&mut self, this: Resource) -> Result<(), anyhow::Error> { @@ -526,7 +530,7 @@ where } #[async_trait] -impl Subscribe for OutgoingDatagramStream { +impl Pollable for OutgoingDatagramStream { async fn ready(&mut self) { match self.send_state { SendState::Idle | SendState::Permitted(_) => {} diff --git a/crates/wasi/src/ip_name_lookup.rs b/crates/wasi/src/ip_name_lookup.rs index 8ed47b0d1e50..90824aa8a57d 100644 --- a/crates/wasi/src/ip_name_lookup.rs +++ b/crates/wasi/src/ip_name_lookup.rs @@ -1,7 +1,6 @@ use crate::bindings::sockets::ip_name_lookup::{Host, HostResolveAddressStream}; use crate::bindings::sockets::network::{ErrorCode, IpAddress, Network}; use crate::host::network::util; -use crate::poll::{subscribe, Pollable, Subscribe}; use crate::runtime::{spawn_blocking, AbortOnDropJoinHandle}; use crate::{IoView, SocketError, WasiImpl, WasiView}; use anyhow::Result; @@ -11,6 +10,7 @@ use std::pin::Pin; use std::str::FromStr; use std::vec; use wasmtime::component::Resource; +use wasmtime_wasi_io::poll::{subscribe, DynPollable, Pollable}; use super::network::{from_ipv4_addr, from_ipv6_addr}; @@ -73,7 +73,7 @@ where fn subscribe( &mut self, resource: Resource, - ) -> Result> { + ) -> Result> { subscribe(self.table(), resource) } @@ -84,7 +84,7 @@ where } #[async_trait::async_trait] -impl Subscribe for ResolveAddressStream { +impl Pollable for ResolveAddressStream { async fn ready(&mut self) { if let ResolveAddressStream::Waiting(future) = self { *self = ResolveAddressStream::Done(future.await.map(|v| v.into_iter())); diff --git a/crates/wasi/src/lib.rs b/crates/wasi/src/lib.rs index 58375b784f70..71eac772645c 100644 --- a/crates/wasi/src/lib.rs +++ b/crates/wasi/src/lib.rs @@ -1,5 +1,6 @@ //! # Wasmtime's WASI Implementation //! +//! //! This crate provides a Wasmtime host implementation of WASI 0.2 (aka WASIp2 //! aka Preview 2) and WASI 0.1 (aka WASIp1 aka Preview 1). WASI is implemented //! with the Rust crates [`tokio`] and [`cap-std`] primarily, meaning that @@ -27,9 +28,6 @@ //! * [`wasi:clocks/wall-clock`] //! * [`wasi:filesystem/preopens`] //! * [`wasi:filesystem/types`] -//! * [`wasi:io/error`] -//! * [`wasi:io/poll`] -//! * [`wasi:io/streams`] //! * [`wasi:random/insecure-seed`] //! * [`wasi:random/insecure`] //! * [`wasi:random/random`] @@ -42,8 +40,24 @@ //! * [`wasi:sockets/udp`] //! //! All traits are implemented in terms of a [`WasiView`] trait which provides -//! basic access to [`WasiCtx`], configuration for WASI, and [`ResourceTable`], -//! the state for all host-defined component model resources. +//! access to [`WasiCtx`], which defines the configuration for WASI. +//! The [`WasiView`] trait imples [`IoView`], which provides access to a common +//! [`ResourceTable`], which owns all host-defined component model resources. +//! +//! The [`wasmtime-wasi-io`] crate contains implementations of the +//! following interfaces, and this crate reuses those implementations: +//! +//! * [`wasi:io/error`] +//! * [`wasi:io/poll`] +//! * [`wasi:io/streams`] +//! +//! These traits are implemented in terms of a [`IoView`] trait, which only +//! provides access to a common [`ResourceTable`]. All aspects of +//! `wasmtime-wasi-io` that are used by this crate are re-exported. Unless you +//! are implementing other host functionality that needs to interact with the +//! WASI scheduler and don't want to use other functionality provided by +//! `wasmtime-wasi`, you don't need to take a direct dependency on +//! `wasmtime-wasi-io`. //! //! # Generated Bindings //! @@ -57,8 +71,12 @@ //! //! This crate's implementation of WASI is done in terms of an implementation of //! [`WasiView`]. This trait provides a "view" into WASI-related state that is -//! contained within a [`Store`](wasmtime::Store). All implementations of -//! traits look like: +//! contained within a [`Store`](wasmtime::Store). [`WasiView`] implies the +//! [`IoView`] trait, which provides access to common [`ResourceTable`] which +//! owns all host-implemented component model resources. +//! +//! For all of the generated bindings in this crate (Host traits), +//! implementations are provided looking like: //! //! ``` //! # use wasmtime_wasi::WasiImpl; @@ -72,21 +90,50 @@ //! The [`add_to_linker_sync`] and [`add_to_linker_async`] function then require //! that `T: WasiView` with [`Linker`](wasmtime::component::Linker). //! -//! To implement the [`WasiView`] trait you will first select a `T` to put in -//! `Store`. Next you'll implement the [`WasiView`] trait for `T`. Somewhere -//! within `T` you'll store: +//! To implement the [`WasiView`] and [`IoView`] trait you will first select a +//! `T` to put in `Store` (typically, by defining your own struct). +//! Somewhere within `T` you'll store: //! -//! * [`WasiCtx`] - created through [`WasiCtxBuilder`]. //! * [`ResourceTable`] - created through default constructors. +//! * [`WasiCtx`] - created through [`WasiCtxBuilder`]. +//! +//! You'll then write implementations of the [`IoView`] and [`WasiView`] +//! traits to access those items in your `T`. For example: +//! ``` +//! use wasmtime::component::ResourceTable; +//! use wasmtime_wasi::{WasiCtx, IoView, WasiView}; +//! struct MyCtx { +//! table: ResourceTable, +//! wasi: WasiCtx, +//! } +//! impl IoView for MyCtx { +//! fn table(&mut self) -> &mut ResourceTable { +//! &mut self.table +//! } +//! } +//! impl WasiView for MyCtx { +//! fn ctx(&mut self) -> &mut WasiCtx { +//! &mut self.wasi +//! } +//! } //! -//! These two fields are then accessed through the methods of [`WasiView`]. +//! ``` //! //! # Async and Sync //! -//! Many WASI functions are not blocking from WebAssembly's point of view, but -//! for those that do they're provided in two flavors: asynchronous and -//! synchronous. Which version you will use depends on how -//! [`Config::async_support`][async] is set. +//! As of WASI0.2, WASI functions are not blocking from WebAssembly's point of +//! view: a WebAssembly call into these functions returns when they are +//! complete. +//! +//! This crate provides an implementation of those functions in the host, +//! where for some functions, it is appropriate to implement them using +//! async Rust and the Tokio executor, so that the host implementation can be +//! nonblocking when Wasmtime's [`Config::async_support`][async] is set. +//! Synchronous wrappers are provided for all async implementations, which +//! creates a private Tokio executor. +//! +//! Users can choose between these modes of implementation using variants +//! of the add_to_linker functions: //! //! * For non-async users (the default of `Config`), use [`add_to_linker_sync`]. //! * For async users, use [`add_to_linker_async`]. @@ -107,13 +154,14 @@ //! allows it). There are a few important traits, however, that are specific to //! this crate. //! -//! * [`HostInputStream`] and [`HostOutputStream`] - these are the host traits +//! * [`InputStream`] and [`OutputStream`] - these are the host traits //! behind the WASI `input-stream` and `output-stream` types in the //! `wasi:io/streams` interface. These enable embedders to build their own -//! custom stream and insert them into a [`ResourceTable`] to be used from +//! custom stream and insert them into a [`ResourceTable`] (as a boxed trait +//! object, see [`DynInputStream`] and [`DynOutputStream`]) to be used from //! wasm. //! -//! * [`Subscribe`] - this trait enables building arbitrary logic to get hooked +//! * [`Pollable`] - this trait enables building arbitrary logic to get hooked //! into a `pollable` resource from `wasi:io/poll`. A pollable resource is //! created through the [`subscribe`] function. //! @@ -132,8 +180,8 @@ //! //! Usage of this crate is done through a few steps to get everything hooked up: //! -//! 1. First implement [`WasiView`] for your type which is the `T` in -//! `Store`. +//! 1. First implement [`IoView`] and [`WasiView`] for your type which is the +//! `T` in `Store`. //! 2. Add WASI interfaces to a `wasmtime::component::Linker`. This is either //! done through top-level functions like [`add_to_linker_sync`] or through //! individual `add_to_linker` functions in generated bindings throughout @@ -150,6 +198,7 @@ //! [`wasmtime::component::bindgen!`]: https://docs.rs/wasmtime/latest/wasmtime/component/macro.bindgen.html //! [`tokio`]: https://crates.io/crates/tokio //! [`cap-std`]: https://crates.io/crates/cap-std +//! [`wasmtime-wasi-io`]: https://crates.io/crates/wasmtime-wasi-io //! [`wasi:cli/environment`]: bindings::cli::environment::Host //! [`wasi:cli/exit`]: bindings::cli::exit::Host //! [`wasi:cli/stderr`]: bindings::cli::stderr::Host @@ -164,9 +213,9 @@ //! [`wasi:clocks/wall-clock`]: bindings::clocks::wall_clock::Host //! [`wasi:filesystem/preopens`]: bindings::filesystem::preopens::Host //! [`wasi:filesystem/types`]: bindings::filesystem::types::Host -//! [`wasi:io/error`]: bindings::io::error::Host -//! [`wasi:io/poll`]: bindings::io::poll::Host -//! [`wasi:io/streams`]: bindings::io::streams::Host +//! [`wasi:io/error`]: wasmtime_wasi_io::bindings::wasi::io::error::Host +//! [`wasi:io/poll`]: wasmtime_wasi_io::bindings::wasi::io::poll::Host +//! [`wasi:io/streams`]: wasmtime_wasi_io::bindings::wasi::io::streams::Host //! [`wasi:random/insecure-seed`]: bindings::random::insecure_seed::Host //! [`wasi:random/insecure`]: bindings::random::insecure::Host //! [`wasi:random/random`]: bindings::random::random::Host @@ -202,7 +251,6 @@ pub mod preview1; mod random; pub mod runtime; mod stdio; -mod stream; mod tcp; mod udp; mod view; @@ -213,16 +261,12 @@ pub use self::ctx::{WasiCtx, WasiCtxBuilder}; pub use self::error::{I32Exit, TrappableError}; pub use self::filesystem::{DirPerms, FileInputStream, FilePerms, FsError, FsResult}; pub use self::network::{Network, SocketAddrUse, SocketError, SocketResult}; -pub use self::poll::{subscribe, ClosureFuture, MakeFuture, Pollable, PollableFuture, Subscribe}; pub use self::random::{thread_rng, Deterministic}; pub use self::stdio::{ stderr, stdin, stdout, AsyncStdinStream, AsyncStdoutStream, IsATTY, OutputFile, Stderr, Stdin, StdinStream, Stdout, StdoutStream, }; -pub use self::stream::{ - HostInputStream, HostOutputStream, InputStream, OutputStream, StreamError, StreamResult, -}; -pub use self::view::{IoImpl, IoView, WasiImpl, WasiView}; +pub use self::view::{WasiImpl, WasiView}; #[doc(no_inline)] pub use async_trait::async_trait; #[doc(no_inline)] @@ -231,6 +275,14 @@ pub use cap_fs_ext::SystemTimeSpec; pub use cap_rand::RngCore; #[doc(no_inline)] pub use wasmtime::component::{ResourceTable, ResourceTableError}; +// These contents of wasmtime-wasi-io are re-exported by this crate for compatibility: +// they were originally defined in this crate before being factored out, and many +// users of this crate depend on them at these names. +pub use wasmtime_wasi_io::poll::{subscribe, DynFuture, DynPollable, MakeFuture, Pollable}; +pub use wasmtime_wasi_io::streams::{ + DynInputStream, DynOutputStream, InputStream, OutputStream, StreamError, StreamResult, +}; +pub use wasmtime_wasi_io::{IoImpl, IoView}; /// Add all WASI interfaces from this crate into the `linker` provided. /// @@ -301,16 +353,14 @@ pub fn add_to_linker_with_options_async( options: &crate::bindings::LinkOptions, ) -> anyhow::Result<()> { let l = linker; - let io_closure = io_type_annotate::(|t| IoImpl(t)); + wasmtime_wasi_io::add_to_linker_async(l)?; + let closure = type_annotate::(|t| WasiImpl(IoImpl(t))); crate::bindings::clocks::wall_clock::add_to_linker_get_host(l, closure)?; crate::bindings::clocks::monotonic_clock::add_to_linker_get_host(l, closure)?; crate::bindings::filesystem::types::add_to_linker_get_host(l, closure)?; crate::bindings::filesystem::preopens::add_to_linker_get_host(l, closure)?; - crate::bindings::io::error::add_to_linker_get_host(l, io_closure)?; - crate::bindings::io::poll::add_to_linker_get_host(l, io_closure)?; - crate::bindings::io::streams::add_to_linker_get_host(l, io_closure)?; crate::bindings::random::random::add_to_linker_get_host(l, closure)?; crate::bindings::random::insecure::add_to_linker_get_host(l, closure)?; crate::bindings::random::insecure_seed::add_to_linker_get_host(l, closure)?; @@ -403,15 +453,17 @@ pub fn add_to_linker_with_options_sync( ) -> anyhow::Result<()> { let l = linker; let io_closure = io_type_annotate::(|t| IoImpl(t)); + wasmtime_wasi_io::bindings::wasi::io::error::add_to_linker_get_host(l, io_closure)?; + + crate::bindings::sync::io::poll::add_to_linker_get_host(l, io_closure)?; + crate::bindings::sync::io::streams::add_to_linker_get_host(l, io_closure)?; + let closure = type_annotate::(|t| WasiImpl(IoImpl(t))); crate::bindings::clocks::wall_clock::add_to_linker_get_host(l, closure)?; crate::bindings::clocks::monotonic_clock::add_to_linker_get_host(l, closure)?; crate::bindings::sync::filesystem::types::add_to_linker_get_host(l, closure)?; crate::bindings::filesystem::preopens::add_to_linker_get_host(l, closure)?; - crate::bindings::io::error::add_to_linker_get_host(l, io_closure)?; - crate::bindings::sync::io::poll::add_to_linker_get_host(l, io_closure)?; - crate::bindings::sync::io::streams::add_to_linker_get_host(l, io_closure)?; crate::bindings::random::random::add_to_linker_get_host(l, closure)?; crate::bindings::random::insecure::add_to_linker_get_host(l, closure)?; crate::bindings::random::insecure_seed::add_to_linker_get_host(l, closure)?; diff --git a/crates/wasi/src/pipe.rs b/crates/wasi/src/pipe.rs index c09fc1c33b94..afd627defc2c 100644 --- a/crates/wasi/src/pipe.rs +++ b/crates/wasi/src/pipe.rs @@ -7,12 +7,14 @@ //! Some convenience constructors are included for common backing types like `Vec` and `String`, //! but the virtual pipes can be instantiated with any `Read` or `Write` type. //! -use crate::poll::Subscribe; -use crate::{HostInputStream, HostOutputStream, StreamError}; use anyhow::anyhow; use bytes::Bytes; use std::sync::{Arc, Mutex}; use tokio::sync::mpsc; +use wasmtime_wasi_io::{ + poll::Pollable, + streams::{InputStream, OutputStream, StreamError}, +}; pub use crate::write_stream::AsyncWriteStream; @@ -34,7 +36,7 @@ impl MemoryInputPipe { } #[async_trait::async_trait] -impl HostInputStream for MemoryInputPipe { +impl InputStream for MemoryInputPipe { fn read(&mut self, size: usize) -> Result { let mut buffer = self.buffer.lock().unwrap(); if buffer.is_empty() { @@ -48,7 +50,7 @@ impl HostInputStream for MemoryInputPipe { } #[async_trait::async_trait] -impl Subscribe for MemoryInputPipe { +impl Pollable for MemoryInputPipe { async fn ready(&mut self) {} } @@ -76,7 +78,7 @@ impl MemoryOutputPipe { } #[async_trait::async_trait] -impl HostOutputStream for MemoryOutputPipe { +impl OutputStream for MemoryOutputPipe { fn write(&mut self, bytes: Bytes) -> Result<(), StreamError> { let mut buf = self.buffer.lock().unwrap(); if bytes.len() > self.capacity - buf.len() { @@ -104,11 +106,11 @@ impl HostOutputStream for MemoryOutputPipe { } #[async_trait::async_trait] -impl Subscribe for MemoryOutputPipe { +impl Pollable for MemoryOutputPipe { async fn ready(&mut self) {} } -/// Provides a [`HostInputStream`] impl from a [`tokio::io::AsyncRead`] impl +/// Provides a [`InputStream`] impl from a [`tokio::io::AsyncRead`] impl pub struct AsyncReadStream { closed: bool, buffer: Option>, @@ -117,7 +119,7 @@ pub struct AsyncReadStream { } impl AsyncReadStream { - /// Create a [`AsyncReadStream`]. In order to use the [`HostInputStream`] impl + /// Create a [`AsyncReadStream`]. In order to use the [`InputStream`] impl /// provided by this struct, the argument must impl [`tokio::io::AsyncRead`]. pub fn new(mut reader: T) -> Self { let (sender, receiver) = mpsc::channel(1); @@ -150,7 +152,7 @@ impl AsyncReadStream { } #[async_trait::async_trait] -impl HostInputStream for AsyncReadStream { +impl InputStream for AsyncReadStream { fn read(&mut self, size: usize) -> Result { use mpsc::error::TryRecvError; @@ -200,7 +202,7 @@ impl HostInputStream for AsyncReadStream { } } #[async_trait::async_trait] -impl Subscribe for AsyncReadStream { +impl Pollable for AsyncReadStream { async fn ready(&mut self) { if self.buffer.is_some() || self.closed { return; @@ -219,7 +221,7 @@ impl Subscribe for AsyncReadStream { pub struct SinkOutputStream; #[async_trait::async_trait] -impl HostOutputStream for SinkOutputStream { +impl OutputStream for SinkOutputStream { fn write(&mut self, _buf: Bytes) -> Result<(), StreamError> { Ok(()) } @@ -235,7 +237,7 @@ impl HostOutputStream for SinkOutputStream { } #[async_trait::async_trait] -impl Subscribe for SinkOutputStream { +impl Pollable for SinkOutputStream { async fn ready(&mut self) {} } @@ -244,14 +246,14 @@ impl Subscribe for SinkOutputStream { pub struct ClosedInputStream; #[async_trait::async_trait] -impl HostInputStream for ClosedInputStream { +impl InputStream for ClosedInputStream { fn read(&mut self, _size: usize) -> Result { Err(StreamError::Closed) } } #[async_trait::async_trait] -impl Subscribe for ClosedInputStream { +impl Pollable for ClosedInputStream { async fn ready(&mut self) {} } @@ -260,7 +262,7 @@ impl Subscribe for ClosedInputStream { pub struct ClosedOutputStream; #[async_trait::async_trait] -impl HostOutputStream for ClosedOutputStream { +impl OutputStream for ClosedOutputStream { fn write(&mut self, _: Bytes) -> Result<(), StreamError> { Err(StreamError::Closed) } @@ -274,7 +276,7 @@ impl HostOutputStream for ClosedOutputStream { } #[async_trait::async_trait] -impl Subscribe for ClosedOutputStream { +impl Pollable for ClosedOutputStream { async fn ready(&mut self) {} } diff --git a/crates/wasi/src/poll.rs b/crates/wasi/src/poll.rs index 0d774b9b5e30..ee5fd9c44e5c 100644 --- a/crates/wasi/src/poll.rs +++ b/crates/wasi/src/poll.rs @@ -1,243 +1,29 @@ -use crate::{bindings::io::poll, IoImpl, IoView}; -use anyhow::{anyhow, Result}; -use std::any::Any; -use std::collections::HashMap; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; -use wasmtime::component::{Resource, ResourceTable}; +use crate::runtime::in_tokio; +use wasmtime_wasi_io::{bindings::wasi::io::poll as async_poll, poll::DynPollable, IoImpl, IoView}; -pub type PollableFuture<'a> = Pin + Send + 'a>>; -pub type MakeFuture = for<'a> fn(&'a mut dyn Any) -> PollableFuture<'a>; -pub type ClosureFuture = Box PollableFuture<'static> + Send + 'static>; +use anyhow::Result; +use wasmtime::component::Resource; -/// A host representation of the `wasi:io/poll.pollable` resource. -/// -/// A pollable is not the same thing as a Rust Future: the same pollable may be used to -/// repeatedly check for readiness of a given condition, e.g. if a stream is readable -/// or writable. So, rather than containing a Future, which can only become Ready once, a -/// Pollable contains a way to create a Future in each call to `poll`. -pub struct Pollable { - index: u32, - make_future: MakeFuture, - remove_index_on_delete: Option Result<()>>, -} - -/// A trait used internally within a [`Pollable`] to create a `pollable` -/// resource in `wasi:io/poll`. -/// -/// This trait is the internal implementation detail of any pollable resource in -/// this crate's implementation of WASI. The `ready` function is an `async fn` -/// which resolves when the implementation is ready. Using native `async` Rust -/// enables this type's readiness to compose with other types' readiness -/// throughout the WASI implementation. -/// -/// This trait is used in conjunction with [`subscribe`] to create a `pollable` -/// resource. -/// -/// # Example -/// -/// This is a simple example of creating a `Pollable` resource from a few -/// parameters. -/// -/// ``` -/// use tokio::time::{self, Duration, Instant}; -/// use wasmtime_wasi::{IoView, Subscribe, subscribe, Pollable, async_trait}; -/// use wasmtime::component::Resource; -/// use wasmtime::Result; -/// -/// fn sleep(cx: &mut dyn IoView, dur: Duration) -> Result> { -/// let end = Instant::now() + dur; -/// let sleep = MySleep { end }; -/// let sleep_resource = cx.table().push(sleep)?; -/// subscribe(cx.table(), sleep_resource) -/// } -/// -/// struct MySleep { -/// end: Instant, -/// } -/// -/// #[async_trait] -/// impl Subscribe for MySleep { -/// async fn ready(&mut self) { -/// tokio::time::sleep_until(self.end).await; -/// } -/// } -/// ``` -#[async_trait::async_trait] -pub trait Subscribe: Send + 'static { - /// An asynchronous function which resolves when this object's readiness - /// operation is ready. - /// - /// This function is invoked as part of `poll` in `wasi:io/poll`. The - /// meaning of when this function Returns depends on what object this - /// [`Subscribe`] is attached to. When the returned future resolves then the - /// corresponding call to `wasi:io/poll` will return. - /// - /// Note that this method does not return an error. Returning an error - /// should be done through accessors on the object that this `pollable` is - /// connected to. The call to `wasi:io/poll` itself does not return errors, - /// only a list of ready objects. - async fn ready(&mut self); -} - -/// Creates a `pollable` resource which is subscribed to the provided -/// `resource`. -/// -/// If `resource` is an owned resource then it will be deleted when the returned -/// resource is deleted. Otherwise the returned resource is considered a "child" -/// of the given `resource` which means that the given resource cannot be -/// deleted while the `pollable` is still alive. -pub fn subscribe(table: &mut ResourceTable, resource: Resource) -> Result> -where - T: Subscribe, -{ - fn make_future<'a, T>(stream: &'a mut dyn Any) -> PollableFuture<'a> - where - T: Subscribe, - { - stream.downcast_mut::().unwrap().ready() - } - - let pollable = Pollable { - index: resource.rep(), - remove_index_on_delete: if resource.owned() { - Some(|table, idx| { - let resource = Resource::::new_own(idx); - table.delete(resource)?; - Ok(()) - }) - } else { - None - }, - make_future: make_future::, - }; - - Ok(table.push_child(pollable, &resource)?) -} - -impl poll::Host for IoImpl +impl crate::bindings::sync::io::poll::Host for IoImpl where T: IoView, { - async fn poll(&mut self, pollables: Vec>) -> Result> { - type ReadylistIndex = u32; - - if pollables.is_empty() { - return Err(anyhow!("empty poll list")); - } - - let table = self.table(); - - let mut table_futures: HashMap)> = HashMap::new(); - - for (ix, p) in pollables.iter().enumerate() { - let ix: u32 = ix.try_into()?; - - let pollable = table.get(p)?; - let (_, list) = table_futures - .entry(pollable.index) - .or_insert((pollable.make_future, Vec::new())); - list.push(ix); - } - - let mut futures: Vec<(PollableFuture<'_>, Vec)> = Vec::new(); - for (entry, (make_future, readylist_indices)) in table.iter_entries(table_futures) { - let entry = entry?; - futures.push((make_future(entry), readylist_indices)); - } - - struct PollList<'a> { - futures: Vec<(PollableFuture<'a>, Vec)>, - } - impl<'a> Future for PollList<'a> { - type Output = Vec; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut any_ready = false; - let mut results = Vec::new(); - for (fut, readylist_indicies) in self.futures.iter_mut() { - match fut.as_mut().poll(cx) { - Poll::Ready(()) => { - results.extend_from_slice(readylist_indicies); - any_ready = true; - } - Poll::Pending => {} - } - } - if any_ready { - Poll::Ready(results) - } else { - Poll::Pending - } - } - } - - Ok(PollList { futures }.await) + fn poll(&mut self, pollables: Vec>) -> Result> { + in_tokio(async { async_poll::Host::poll(self, pollables).await }) } } -impl crate::bindings::io::poll::HostPollable for IoImpl +impl crate::bindings::sync::io::poll::HostPollable for IoImpl where T: IoView, { - async fn block(&mut self, pollable: Resource) -> Result<()> { - let table = self.table(); - let pollable = table.get(&pollable)?; - let ready = (pollable.make_future)(table.get_any_mut(pollable.index)?); - ready.await; - Ok(()) + fn ready(&mut self, pollable: Resource) -> Result { + in_tokio(async { async_poll::HostPollable::ready(self, pollable).await }) } - async fn ready(&mut self, pollable: Resource) -> Result { - let table = self.table(); - let pollable = table.get(&pollable)?; - let ready = (pollable.make_future)(table.get_any_mut(pollable.index)?); - futures::pin_mut!(ready); - Ok(matches!( - futures::future::poll_immediate(ready).await, - Some(()) - )) + fn block(&mut self, pollable: Resource) -> Result<()> { + in_tokio(async { async_poll::HostPollable::block(self, pollable).await }) } - fn drop(&mut self, pollable: Resource) -> Result<()> { - let pollable = self.table().delete(pollable)?; - if let Some(delete) = pollable.remove_index_on_delete { - delete(self.table(), pollable.index)?; - } - Ok(()) - } -} - -pub mod sync { - use crate::{ - bindings::io::poll as async_poll, - bindings::sync::io::poll::{self, Pollable}, - runtime::in_tokio, - IoImpl, IoView, - }; - use anyhow::Result; - use wasmtime::component::Resource; - - impl poll::Host for IoImpl - where - T: IoView, - { - fn poll(&mut self, pollables: Vec>) -> Result> { - in_tokio(async { async_poll::Host::poll(self, pollables).await }) - } - } - - impl crate::bindings::sync::io::poll::HostPollable for IoImpl - where - T: IoView, - { - fn ready(&mut self, pollable: Resource) -> Result { - in_tokio(async { async_poll::HostPollable::ready(self, pollable).await }) - } - fn block(&mut self, pollable: Resource) -> Result<()> { - in_tokio(async { async_poll::HostPollable::block(self, pollable).await }) - } - fn drop(&mut self, pollable: Resource) -> Result<()> { - async_poll::HostPollable::drop(self, pollable) - } + fn drop(&mut self, pollable: Resource) -> Result<()> { + async_poll::HostPollable::drop(self, pollable) } } diff --git a/crates/wasi/src/preview1.rs b/crates/wasi/src/preview1.rs index 5f355246038f..07b7e3e2a858 100644 --- a/crates/wasi/src/preview1.rs +++ b/crates/wasi/src/preview1.rs @@ -70,12 +70,8 @@ use crate::bindings::{ }, clocks::{monotonic_clock, wall_clock}, filesystem::{preopens::Host as _, types as filesystem}, - io::streams, -}; -use crate::{ - FsError, IoImpl, IoView, IsATTY, ResourceTable, StreamError, StreamResult, WasiCtx, WasiImpl, - WasiView, }; +use crate::{FsError, IsATTY, ResourceTable, WasiCtx, WasiImpl, WasiView}; use anyhow::{bail, Context}; use std::collections::{BTreeMap, HashSet}; use std::mem::{self, size_of, size_of_val}; @@ -85,14 +81,19 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use system_interface::fs::FileIoExt; use wasmtime::component::Resource; +use wasmtime_wasi_io::{ + bindings::wasi::io::streams, + streams::{StreamError, StreamResult}, + IoImpl, IoView, +}; use wiggle::tracing::instrument; use wiggle::{GuestError, GuestMemory, GuestPtr, GuestType}; // Bring all WASI traits in scope that this implementation builds on. use crate::bindings::cli::environment::Host as _; use crate::bindings::filesystem::types::HostDescriptor as _; -use crate::bindings::io::poll::Host as _; use crate::bindings::random::random::Host as _; +use wasmtime_wasi_io::bindings::wasi::io::poll::Host as _; /// Structure containing state for WASIp1. /// diff --git a/crates/wasi/src/stdio.rs b/crates/wasi/src/stdio.rs index 17ab5820db73..05c7abafadeb 100644 --- a/crates/wasi/src/stdio.rs +++ b/crates/wasi/src/stdio.rs @@ -2,17 +2,16 @@ use crate::bindings::cli::{ stderr, stdin, stdout, terminal_input, terminal_output, terminal_stderr, terminal_stdin, terminal_stdout, }; -use crate::bindings::io::streams; use crate::pipe; use crate::{ - HostInputStream, HostOutputStream, IoView, StreamError, StreamResult, Subscribe, WasiImpl, - WasiView, + InputStream, IoView, OutputStream, Pollable, StreamError, StreamResult, WasiImpl, WasiView, }; use bytes::Bytes; use std::io::IsTerminal; use std::sync::Arc; use tokio::sync::Mutex; use wasmtime::component::Resource; +use wasmtime_wasi_io::streams; /// A trait used to represent the standard input to a guest program. /// @@ -33,14 +32,14 @@ pub trait StdinStream: Send { /// all become ready for reading. Subsequently if one is read from it may /// mean that all the others are no longer ready for reading. This is /// basically a consequence of the way the WIT APIs are designed today. - fn stream(&self) -> Box; + fn stream(&self) -> Box; /// Returns whether this stream is backed by a TTY. fn isatty(&self) -> bool; } impl StdinStream for pipe::MemoryInputPipe { - fn stream(&self) -> Box { + fn stream(&self) -> Box { Box::new(self.clone()) } @@ -50,7 +49,7 @@ impl StdinStream for pipe::MemoryInputPipe { } impl StdinStream for pipe::ClosedInputStream { - fn stream(&self) -> Box { + fn stream(&self) -> Box { Box::new(*self) } @@ -62,9 +61,9 @@ impl StdinStream for pipe::ClosedInputStream { /// An impl of [`StdinStream`] built on top of [`crate::pipe::AsyncReadStream`]. // // Note the usage of `tokio::sync::Mutex` here as opposed to a -// `std::sync::Mutex`. This is intentionally done to implement the `Subscribe` +// `std::sync::Mutex`. This is intentionally done to implement the `Pollable` // variant of this trait. Note that in doing so we're left with the quandry of -// how to implement methods of `HostInputStream` since those methods are not +// how to implement methods of `InputStream` since those methods are not // `async`. They're currently implemented with `try_lock`, which then raises the // question of what to do on contention. Currently traps are returned. // @@ -82,11 +81,11 @@ impl StdinStream for pipe::ClosedInputStream { // bindings themselves. It's possible for the host to take this and work with it // on its own task, but that's niche enough it's not designed for. // -// Overall that means that the guest is either calling `Subscribe` or it's -// calling `HostInputStream` methods. This means that there should never be -// contention between the two at this time. This may all change in the future -// with WASI 0.3, but perhaps we'll have a better story for stdio at that time -// (see the doc block on the `HostOutputStream` impl below) +// Overall that means that the guest is either calling `Pollable` or +// `InputStream` methods. This means that there should never be contention +// between the two at this time. This may all change in the future with WASI +// 0.3, but perhaps we'll have a better story for stdio at that time (see the +// doc block on the `OutputStream` impl below) pub struct AsyncStdinStream(Arc>); impl AsyncStdinStream { @@ -96,7 +95,7 @@ impl AsyncStdinStream { } impl StdinStream for AsyncStdinStream { - fn stream(&self) -> Box { + fn stream(&self) -> Box { Box::new(Self(self.0.clone())) } fn isatty(&self) -> bool { @@ -105,7 +104,7 @@ impl StdinStream for AsyncStdinStream { } #[async_trait::async_trait] -impl HostInputStream for AsyncStdinStream { +impl InputStream for AsyncStdinStream { fn read(&mut self, size: usize) -> Result { match self.0.try_lock() { Ok(mut stream) => stream.read(size), @@ -130,7 +129,7 @@ impl HostInputStream for AsyncStdinStream { } #[async_trait::async_trait] -impl Subscribe for AsyncStdinStream { +impl Pollable for AsyncStdinStream { async fn ready(&mut self) { self.0.lock().await.ready().await } @@ -153,14 +152,14 @@ pub trait StdoutStream: Send { /// obtain. /// /// Implementations must be able to handle this - fn stream(&self) -> Box; + fn stream(&self) -> Box; /// Returns whether this stream is backed by a TTY. fn isatty(&self) -> bool; } impl StdoutStream for pipe::MemoryOutputPipe { - fn stream(&self) -> Box { + fn stream(&self) -> Box { Box::new(self.clone()) } @@ -170,7 +169,7 @@ impl StdoutStream for pipe::MemoryOutputPipe { } impl StdoutStream for pipe::SinkOutputStream { - fn stream(&self) -> Box { + fn stream(&self) -> Box { Box::new(*self) } @@ -180,7 +179,7 @@ impl StdoutStream for pipe::SinkOutputStream { } impl StdoutStream for pipe::ClosedOutputStream { - fn stream(&self) -> Box { + fn stream(&self) -> Box { Box::new(*self) } @@ -205,7 +204,7 @@ impl OutputFile { } impl StdoutStream for OutputFile { - fn stream(&self) -> Box { + fn stream(&self) -> Box { Box::new(OutputFileStream { file: Arc::clone(&self.file), }) @@ -221,11 +220,11 @@ struct OutputFileStream { } #[async_trait::async_trait] -impl Subscribe for OutputFileStream { +impl Pollable for OutputFileStream { async fn ready(&mut self) {} } -impl HostOutputStream for OutputFileStream { +impl OutputStream for OutputFileStream { fn write(&mut self, bytes: Bytes) -> StreamResult<()> { use std::io::Write; self.file @@ -260,8 +259,8 @@ pub fn stdout() -> Stdout { } impl StdoutStream for Stdout { - fn stream(&self) -> Box { - Box::new(OutputStream::Stdout) + fn stream(&self) -> Box { + Box::new(StdioOutputStream::Stdout) } fn isatty(&self) -> bool { @@ -284,8 +283,8 @@ pub fn stderr() -> Stderr { } impl StdoutStream for Stderr { - fn stream(&self) -> Box { - Box::new(OutputStream::Stderr) + fn stream(&self) -> Box { + Box::new(StdioOutputStream::Stderr) } fn isatty(&self) -> bool { @@ -293,17 +292,17 @@ impl StdoutStream for Stderr { } } -enum OutputStream { +enum StdioOutputStream { Stdout, Stderr, } -impl HostOutputStream for OutputStream { +impl OutputStream for StdioOutputStream { fn write(&mut self, bytes: Bytes) -> StreamResult<()> { use std::io::Write; match self { - OutputStream::Stdout => std::io::stdout().write_all(&bytes), - OutputStream::Stderr => std::io::stderr().write_all(&bytes), + StdioOutputStream::Stdout => std::io::stdout().write_all(&bytes), + StdioOutputStream::Stderr => std::io::stderr().write_all(&bytes), } .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e))) } @@ -311,8 +310,8 @@ impl HostOutputStream for OutputStream { fn flush(&mut self) -> StreamResult<()> { use std::io::Write; match self { - OutputStream::Stdout => std::io::stdout().flush(), - OutputStream::Stderr => std::io::stderr().flush(), + StdioOutputStream::Stdout => std::io::stdout().flush(), + StdioOutputStream::Stderr => std::io::stderr().flush(), } .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e))) } @@ -323,16 +322,16 @@ impl HostOutputStream for OutputStream { } #[async_trait::async_trait] -impl Subscribe for OutputStream { +impl Pollable for StdioOutputStream { async fn ready(&mut self) {} } /// A wrapper of [`crate::pipe::AsyncWriteStream`] that implements -/// [`StdoutStream`]. Note that the [`HostOutputStream`] impl for this is not +/// [`StdoutStream`]. Note that the [`OutputStream`] impl for this is not /// correct when used for interleaved async IO. // // Note that the use of `tokio::sync::Mutex` here is intentional, in addition to -// the `try_lock()` calls below in the implementation of `HostOutputStream`. For +// the `try_lock()` calls below in the implementation of `OutputStream`. For // more information see the documentation on `AsyncStdinStream`. pub struct AsyncStdoutStream(Arc>); @@ -343,7 +342,7 @@ impl AsyncStdoutStream { } impl StdoutStream for AsyncStdoutStream { - fn stream(&self) -> Box { + fn stream(&self) -> Box { Box::new(Self(self.0.clone())) } fn isatty(&self) -> bool { @@ -367,7 +366,7 @@ impl StdoutStream for AsyncStdoutStream { // If that expectation doesn't turn out to be true, and you find yourself at // this comment to correct it: sorry about that. #[async_trait::async_trait] -impl HostOutputStream for AsyncStdoutStream { +impl OutputStream for AsyncStdoutStream { fn check_write(&mut self) -> Result { match self.0.try_lock() { Ok(mut stream) => stream.check_write(), @@ -398,7 +397,7 @@ impl HostOutputStream for AsyncStdoutStream { } #[async_trait::async_trait] -impl Subscribe for AsyncStdoutStream { +impl Pollable for AsyncStdoutStream { async fn ready(&mut self) { self.0.lock().await.ready().await } @@ -414,7 +413,7 @@ impl stdin::Host for WasiImpl where T: WasiView, { - fn get_stdin(&mut self) -> Result, anyhow::Error> { + fn get_stdin(&mut self) -> Result, anyhow::Error> { let stream = self.ctx().stdin.stream(); Ok(self.table().push(stream)?) } @@ -424,7 +423,7 @@ impl stdout::Host for WasiImpl where T: WasiView, { - fn get_stdout(&mut self) -> Result, anyhow::Error> { + fn get_stdout(&mut self) -> Result, anyhow::Error> { let stream = self.ctx().stdout.stream(); Ok(self.table().push(stream)?) } @@ -434,7 +433,7 @@ impl stderr::Host for WasiImpl where T: WasiView, { - fn get_stderr(&mut self) -> Result, anyhow::Error> { + fn get_stderr(&mut self) -> Result, anyhow::Error> { let stream = self.ctx().stderr.stream(); Ok(self.table().push(stream)?) } @@ -507,7 +506,7 @@ where mod test { use crate::stdio::StdoutStream; use crate::write_stream::AsyncWriteStream; - use crate::{AsyncStdoutStream, HostOutputStream}; + use crate::{AsyncStdoutStream, OutputStream}; use anyhow::Result; use bytes::Bytes; use tokio::io::AsyncReadExt; @@ -515,7 +514,7 @@ mod test { #[test] fn memory_stdin_stream() { // A StdinStream has the property that there are multiple - // HostInputStreams created, using the stream() method which are each + // InputStreams created, using the stream() method which are each // views on the same shared state underneath. Consuming input on one // stream results in consuming that input on all streams. // @@ -544,7 +543,7 @@ mod test { #[tokio::test] async fn async_stdin_stream() { // A StdinStream has the property that there are multiple - // HostInputStreams created, using the stream() method which are each + // InputStreams created, using the stream() method which are each // views on the same shared state underneath. Consuming input on one // stream results in consuming that input on all streams. // @@ -599,10 +598,7 @@ mod test { task.await.unwrap(); } - async fn blocking_write_and_flush( - s: &mut dyn HostOutputStream, - mut bytes: Bytes, - ) -> Result<()> { + async fn blocking_write_and_flush(s: &mut dyn OutputStream, mut bytes: Bytes) -> Result<()> { while !bytes.is_empty() { let permit = s.write_ready().await?; let len = bytes.len().min(permit); diff --git a/crates/wasi/src/stdio/worker_thread_stdin.rs b/crates/wasi/src/stdio/worker_thread_stdin.rs index 894bdd61affd..190069e18962 100644 --- a/crates/wasi/src/stdio/worker_thread_stdin.rs +++ b/crates/wasi/src/stdio/worker_thread_stdin.rs @@ -23,14 +23,16 @@ //! This module is one that's likely to change over time though as new systems //! are encountered along with preexisting bugs. -use crate::poll::Subscribe; use crate::stdio::StdinStream; -use crate::{HostInputStream, StreamError}; use bytes::{Bytes, BytesMut}; use std::io::{IsTerminal, Read}; use std::mem; use std::sync::{Condvar, Mutex, OnceLock}; use tokio::sync::Notify; +use wasmtime_wasi_io::{ + poll::Pollable, + streams::{InputStream, StreamError}, +}; #[derive(Default)] struct GlobalStdin { @@ -96,7 +98,7 @@ fn create() -> GlobalStdin { GlobalStdin::default() } -/// Only public interface is the [`HostInputStream`] impl. +/// Only public interface is the [`InputStream`] impl. #[derive(Clone)] pub struct Stdin; @@ -109,7 +111,7 @@ pub fn stdin() -> Stdin { } impl StdinStream for Stdin { - fn stream(&self) -> Box { + fn stream(&self) -> Box { Box::new(Stdin) } @@ -119,7 +121,7 @@ impl StdinStream for Stdin { } #[async_trait::async_trait] -impl HostInputStream for Stdin { +impl InputStream for Stdin { fn read(&mut self, size: usize) -> Result { let g = GlobalStdin::get(); let mut locked = g.state.lock().unwrap(); @@ -152,7 +154,7 @@ impl HostInputStream for Stdin { } #[async_trait::async_trait] -impl Subscribe for Stdin { +impl Pollable for Stdin { async fn ready(&mut self) { let g = GlobalStdin::get(); diff --git a/crates/wasi/src/tcp.rs b/crates/wasi/src/tcp.rs index 5b05ebb25339..5edaef9c776a 100644 --- a/crates/wasi/src/tcp.rs +++ b/crates/wasi/src/tcp.rs @@ -3,8 +3,8 @@ use crate::host::network; use crate::network::SocketAddressFamily; use crate::runtime::{with_ambient_tokio_runtime, AbortOnDropJoinHandle}; use crate::{ - HostInputStream, HostOutputStream, InputStream, OutputStream, SocketError, SocketResult, - StreamError, Subscribe, + DynInputStream, DynOutputStream, InputStream, OutputStream, Pollable, SocketError, + SocketResult, StreamError, }; use anyhow::Result; use cap_net_ext::AddressFamily; @@ -263,7 +263,7 @@ impl TcpSocket { Ok(()) } - pub fn finish_connect(&mut self) -> SocketResult<(InputStream, OutputStream)> { + pub fn finish_connect(&mut self) -> SocketResult<(DynInputStream, DynOutputStream)> { let previous_state = std::mem::replace(&mut self.tcp_state, TcpState::Closed); let result = match previous_state { TcpState::ConnectReady(result) => result, @@ -293,8 +293,8 @@ impl TcpSocket { reader: reader.clone(), writer: writer.clone(), }; - let input: InputStream = Box::new(TcpReadStream(reader)); - let output: OutputStream = Box::new(TcpWriteStream(writer)); + let input: DynInputStream = Box::new(TcpReadStream(reader)); + let output: DynOutputStream = Box::new(TcpWriteStream(writer)); Ok((input, output)) } Err(err) => { @@ -360,7 +360,7 @@ impl TcpSocket { } } - pub fn accept(&mut self) -> SocketResult<(Self, InputStream, OutputStream)> { + pub fn accept(&mut self) -> SocketResult<(Self, DynInputStream, DynOutputStream)> { let TcpState::Listening { listener, pending_accept, @@ -445,8 +445,8 @@ impl TcpSocket { let reader = Arc::new(Mutex::new(TcpReader::new(client.clone()))); let writer = Arc::new(Mutex::new(TcpWriter::new(client.clone()))); - let input: InputStream = Box::new(TcpReadStream(reader.clone())); - let output: OutputStream = Box::new(TcpWriteStream(writer.clone())); + let input: DynInputStream = Box::new(TcpReadStream(reader.clone())); + let output: DynOutputStream = Box::new(TcpWriteStream(writer.clone())); let tcp_socket = TcpSocket::from_state( TcpState::Connected { stream: client, @@ -656,7 +656,7 @@ impl TcpSocket { } #[async_trait::async_trait] -impl Subscribe for TcpSocket { +impl Pollable for TcpSocket { async fn ready(&mut self) { match &mut self.tcp_state { TcpState::Default(..) @@ -748,14 +748,14 @@ impl TcpReader { struct TcpReadStream(Arc>); #[async_trait::async_trait] -impl HostInputStream for TcpReadStream { +impl InputStream for TcpReadStream { fn read(&mut self, size: usize) -> Result { try_lock_for_stream(&self.0)?.read(size) } } #[async_trait::async_trait] -impl Subscribe for TcpReadStream { +impl Pollable for TcpReadStream { async fn ready(&mut self) { self.0.lock().await.ready().await } @@ -954,7 +954,7 @@ impl TcpWriter { struct TcpWriteStream(Arc>); #[async_trait::async_trait] -impl HostOutputStream for TcpWriteStream { +impl OutputStream for TcpWriteStream { fn write(&mut self, bytes: bytes::Bytes) -> Result<(), StreamError> { try_lock_for_stream(&self.0)?.write(bytes) } @@ -973,7 +973,7 @@ impl HostOutputStream for TcpWriteStream { } #[async_trait::async_trait] -impl Subscribe for TcpWriteStream { +impl Pollable for TcpWriteStream { async fn ready(&mut self) { self.0.lock().await.ready().await } diff --git a/crates/wasi/src/udp.rs b/crates/wasi/src/udp.rs index 1e0b1a199cf4..04e0981b95ee 100644 --- a/crates/wasi/src/udp.rs +++ b/crates/wasi/src/udp.rs @@ -1,5 +1,4 @@ use crate::host::network::util; -use crate::poll::Subscribe; use crate::runtime::with_ambient_tokio_runtime; use async_trait::async_trait; use cap_net_ext::{AddressFamily, Blocking}; @@ -7,6 +6,7 @@ use io_lifetimes::raw::{FromRawSocketlike, IntoRawSocketlike}; use std::io; use std::net::SocketAddr; use std::sync::Arc; +use wasmtime_wasi_io::poll::Pollable; use super::network::{SocketAddrCheck, SocketAddressFamily}; @@ -49,7 +49,7 @@ pub struct UdpSocket { } #[async_trait] -impl Subscribe for UdpSocket { +impl Pollable for UdpSocket { async fn ready(&mut self) { // None of the socket-level operations block natively } diff --git a/crates/wasi/src/view.rs b/crates/wasi/src/view.rs index 58375b1b60ab..5ec710958b13 100644 --- a/crates/wasi/src/view.rs +++ b/crates/wasi/src/view.rs @@ -1,64 +1,59 @@ use crate::ctx::WasiCtx; use wasmtime::component::ResourceTable; +pub use wasmtime_wasi_io::{IoImpl, IoView}; -pub trait IoView: Send { - /// Yields mutable access to the internal resource management that this - /// context contains. - /// - /// Embedders can add custom resources to this table as well to give - /// resources to wasm as well. - fn table(&mut self) -> &mut ResourceTable; -} - +/// A trait which provides access to the [`WasiCtx`] inside the embedder's `T` +/// of [`Store`][`Store`]. +/// +/// This crate's WASI Host implementations depend on the contents of +/// [`WasiCtx`]. The `T` type [`Store`][`Store`] is defined in each +/// embedding of Wasmtime. These implementations are connected to the +/// [`Linker`][`Linker`] by the +/// [`add_to_linker_sync`](crate::add_to_linker_sync) and +/// [`add_to_linker_async`](crate::add_to_linker_async) functions. +/// +/// The [`WasiView`] trait implies the [`IoView`] trait, so each `T` must +/// also contain a [`ResourceTable`] and impl `IoView`. +/// +/// # Example +/// +/// ``` +/// use wasmtime_wasi::{WasiCtx, ResourceTable, WasiView, IoView, WasiCtxBuilder}; +/// +/// struct MyState { +/// ctx: WasiCtx, +/// table: ResourceTable, +/// } +/// +/// impl IoView for MyState { +/// fn table(&mut self) -> &mut ResourceTable { &mut self.table } +/// } +/// impl WasiView for MyState { +/// fn ctx(&mut self) -> &mut WasiCtx { &mut self.ctx } +/// } +/// ``` +/// [`Store`]: wasmtime::Store +/// [`Linker`]: wasmtime::component::Linker +/// [`ResourceTable`]: wasmtime::component::ResourceTable +/// pub trait WasiView: IoView { - /// Yields mutable access to the configuration used for this context. - /// - /// The returned type is created through [`WasiCtxBuilder`]. + /// Yields mutable access to the [`WasiCtx`] configuration used for this + /// context. fn ctx(&mut self) -> &mut WasiCtx; } -impl IoView for &mut T { - fn table(&mut self) -> &mut ResourceTable { - T::table(self) - } -} impl WasiView for &mut T { fn ctx(&mut self) -> &mut WasiCtx { T::ctx(self) } } -impl IoView for Box { - fn table(&mut self) -> &mut ResourceTable { - T::table(self) - } -} impl WasiView for Box { fn ctx(&mut self) -> &mut WasiCtx { T::ctx(self) } } -/// A small newtype wrapper which serves as the basis for implementations of -/// `Host` WASI traits in this crate. -/// -/// This type is used as the basis for the implementation of all `Host` traits -/// generated by `bindgen!` for WASI interfaces. This is used automatically with -/// [`add_to_linker_sync`](crate::add_to_linker_sync) and -/// [`add_to_linker_async`](crate::add_to_linker_async). -/// -/// This type is otherwise provided if you're calling the `add_to_linker` -/// functions generated by `bindgen!` from the [`bindings` -/// module](crate::bindings). In this situation you'll want to create a value of -/// this type in the closures added to a `Linker`. -#[repr(transparent)] -pub struct IoImpl(pub T); -impl IoView for IoImpl { - fn table(&mut self) -> &mut ResourceTable { - T::table(&mut self.0) - } -} - /// A small newtype wrapper which serves as the basis for implementations of /// `Host` WASI traits in this crate. /// diff --git a/crates/wasi/src/write_stream.rs b/crates/wasi/src/write_stream.rs index fe3658662ff2..f1c6c3d66730 100644 --- a/crates/wasi/src/write_stream.rs +++ b/crates/wasi/src/write_stream.rs @@ -1,4 +1,4 @@ -use crate::{HostOutputStream, StreamError, Subscribe}; +use crate::{OutputStream, Pollable, StreamError}; use anyhow::anyhow; use bytes::Bytes; use std::sync::{Arc, Mutex}; @@ -136,14 +136,14 @@ impl Worker { } } -/// Provides a [`HostOutputStream`] impl from a [`tokio::io::AsyncWrite`] impl +/// Provides a [`OutputStream`] impl from a [`tokio::io::AsyncWrite`] impl pub struct AsyncWriteStream { worker: Arc, join_handle: Option>, } impl AsyncWriteStream { - /// Create a [`AsyncWriteStream`]. In order to use the [`HostOutputStream`] impl + /// Create a [`AsyncWriteStream`]. In order to use the [`OutputStream`] impl /// provided by this struct, the argument must impl [`tokio::io::AsyncWrite`]. pub fn new( write_budget: usize, @@ -162,7 +162,7 @@ impl AsyncWriteStream { } #[async_trait::async_trait] -impl HostOutputStream for AsyncWriteStream { +impl OutputStream for AsyncWriteStream { fn write(&mut self, bytes: Bytes) -> Result<(), StreamError> { let mut state = self.worker.state(); state.check_error()?; @@ -204,7 +204,7 @@ impl HostOutputStream for AsyncWriteStream { } } #[async_trait::async_trait] -impl Subscribe for AsyncWriteStream { +impl Pollable for AsyncWriteStream { async fn ready(&mut self) { self.worker.ready().await; } diff --git a/crates/wasi/tests/all/api.rs b/crates/wasi/tests/all/api.rs index ca5219988744..1218966d89c3 100644 --- a/crates/wasi/tests/all/api.rs +++ b/crates/wasi/tests/all/api.rs @@ -168,7 +168,7 @@ async fn api_reactor() -> Result<()> { // Note, this works because of the add_to_linker invocations using the // `host` crate for `streams`, not because of `with` in the bindgen macro. let writepipe = wasmtime_wasi::pipe::MemoryOutputPipe::new(4096); - let stream: wasmtime_wasi::OutputStream = Box::new(writepipe.clone()); + let stream: wasmtime_wasi::DynOutputStream = Box::new(writepipe.clone()); let table_ix = store.data_mut().table().push(stream)?; let r = reactor.call_write_strings_to(&mut store, table_ix).await?; assert_eq!(r, Ok(())); diff --git a/crates/wasi/tests/process_stdin.rs b/crates/wasi/tests/process_stdin.rs index df1aeff7d815..dc1688301d28 100644 --- a/crates/wasi/tests/process_stdin.rs +++ b/crates/wasi/tests/process_stdin.rs @@ -1,6 +1,6 @@ use std::io::{BufRead, Write}; use std::process::Command; -use wasmtime_wasi::{HostInputStream, Subscribe}; +use wasmtime_wasi::{InputStream, Pollable}; const VAR_NAME: &str = "__CHILD_PROCESS"; diff --git a/crates/wasmtime/src/runtime/component/resource_table.rs b/crates/wasmtime/src/runtime/component/resource_table.rs index cceb65c20cd4..ab8e0e3c6017 100644 --- a/crates/wasmtime/src/runtime/component/resource_table.rs +++ b/crates/wasmtime/src/runtime/component/resource_table.rs @@ -1,6 +1,6 @@ use super::Resource; use crate::prelude::*; -use alloc::collections::BTreeSet; +use alloc::collections::{BTreeMap, BTreeSet}; use core::any::Any; use core::fmt; @@ -29,8 +29,7 @@ impl fmt::Display for ResourceTableError { } } -#[cfg(feature = "std")] -impl std::error::Error for ResourceTableError {} +impl core::error::Error for ResourceTableError {} /// The `ResourceTable` type maps a `Resource` to its `T`. #[derive(Debug)] @@ -285,12 +284,11 @@ impl ResourceTable { } /// Zip the values of the map with mutable references to table entries corresponding to each - /// key. As the keys in the `HashMap` are unique, this iterator can give mutable references + /// key. As the keys in the `BTreeMap` are unique, this iterator can give mutable references /// with the same lifetime as the mutable reference to the [ResourceTable]. - #[cfg(feature = "std")] pub fn iter_entries<'a, T>( &'a mut self, - map: std::collections::HashMap, + map: BTreeMap, ) -> impl Iterator, T)> { map.into_iter().map(move |(k, v)| { let item = self diff --git a/scripts/publish.rs b/scripts/publish.rs index fbd38d574618..14964fc91255 100644 --- a/scripts/publish.rs +++ b/scripts/publish.rs @@ -65,6 +65,7 @@ const CRATES_TO_PUBLISH: &[&str] = &[ "wiggle", "wasi-common", // other misc wasmtime crates + "wasmtime-wasi-io", "wasmtime-wasi", "wasmtime-wasi-http", "wasmtime-wasi-nn", @@ -87,7 +88,9 @@ const PUBLIC_CRATES: &[&str] = &[ // these are actually public crates which we cannot break the API of in // patch releases. "wasmtime", + "wasmtime-wasi-io", "wasmtime-wasi", + "wasmtime-wasi-http", "wasmtime-wasi-nn", "wasmtime-wasi-config", "wasmtime-wasi-keyvalue", diff --git a/src/commands/serve.rs b/src/commands/serve.rs index cd54a468a910..1f04d1875afc 100644 --- a/src/commands/serve.rs +++ b/src/commands/serve.rs @@ -578,7 +578,7 @@ impl LogStream { } impl wasmtime_wasi::StdoutStream for LogStream { - fn stream(&self) -> Box { + fn stream(&self) -> Box { Box::new(self.clone()) } @@ -592,7 +592,7 @@ impl wasmtime_wasi::StdoutStream for LogStream { } } -impl wasmtime_wasi::HostOutputStream for LogStream { +impl wasmtime_wasi::OutputStream for LogStream { fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> { let mut bytes = &bytes[..]; @@ -634,7 +634,7 @@ impl wasmtime_wasi::HostOutputStream for LogStream { } #[async_trait::async_trait] -impl wasmtime_wasi::Subscribe for LogStream { +impl wasmtime_wasi::Pollable for LogStream { async fn ready(&mut self) {} }