Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add no_std support to bevy_tasks #15464

Merged
merged 13 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions crates/bevy_tasks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,31 @@ license = "MIT OR Apache-2.0"
keywords = ["bevy"]

[features]
multi_threaded = ["dep:async-channel", "dep:concurrent-queue"]
default = ["std", "async_executor"]
std = ["futures-lite/std", "async-task/std", "spin/std", "edge-executor?/std"]
multi_threaded = ["std", "dep:async-channel", "dep:concurrent-queue"]
async_executor = ["std", "dep:async-executor"]
edge_executor = ["dep:edge-executor"]
critical-section = ["edge-executor/critical-section"]
portable-atomic = ["edge-executor/portable-atomic"]

[dependencies]
futures-lite = "2.0.1"
async-executor = "1.11"
futures-lite = { version = "2.0.1", default-features = false, features = [
"alloc",
] }
async-task = { version = "4.4.0", default-features = false }
spin = { version = "0.9.8", default-features = false, features = [
"spin_mutex",
"rwlock",
"once",
] }
derive_more = { version = "1", default-features = false, features = [
"deref",
"deref_mut",
] }

async-executor = { version = "1.11", optional = true }
edge-executor = { version = "0.4.1", default-features = false, optional = true }
async-channel = { version = "2.3.0", optional = true }
async-io = { version = "2.0.0", optional = true }
concurrent-queue = { version = "2.0.0", optional = true }
Expand Down
59 changes: 59 additions & 0 deletions crates/bevy_tasks/src/executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
pub use async_task::Task;
use core::{
fmt,
panic::{RefUnwindSafe, UnwindSafe},
};
use derive_more::{Deref, DerefMut};

#[cfg(feature = "multi_threaded")]
pub use async_task::FallibleTask;

// Backend selection for the executor wrappers below.
//
// When the `async_executor` feature is enabled it always wins; `edge_executor`
// is only used as the fallback when `async_executor` is off. This ordering
// means enabling both features still yields a single, unambiguous definition
// of each alias.
#[cfg(feature = "async_executor")]
type ExecutorInner<'a> = async_executor::Executor<'a>;

#[cfg(feature = "async_executor")]
type LocalExecutorInner<'a> = async_executor::LocalExecutor<'a>;

// Fallback backend: `edge_executor`, usable without `std`.
// NOTE(review): `64` is edge_executor's const-generic capacity parameter —
// presumably the maximum number of pending tasks; confirm this sizing is
// adequate for Bevy's workloads.
#[cfg(all(not(feature = "async_executor"), feature = "edge_executor"))]
type ExecutorInner<'a> = edge_executor::Executor<'a, 64>;

#[cfg(all(not(feature = "async_executor"), feature = "edge_executor"))]
type LocalExecutorInner<'a> = edge_executor::LocalExecutor<'a, 64>;

/// A thread-safe async executor, backed by either `async_executor` or
/// `edge_executor` depending on enabled features.
///
/// Newtype wrapper so the rest of the crate is insulated from which backend
/// was selected; `Deref`/`DerefMut` expose the inner executor's API directly.
#[derive(Deref, DerefMut, Default)]
pub struct Executor<'a>(ExecutorInner<'a>);

/// A single-threaded (non-`Send`) async executor, backed by either
/// `async_executor` or `edge_executor` depending on enabled features.
#[derive(Deref, DerefMut, Default)]
pub struct LocalExecutor<'a>(LocalExecutorInner<'a>);

impl Executor<'_> {
    /// Creates a new, empty executor.
    ///
    /// `const` so it can be used to initialize statics and `thread_local!`
    /// slots without lazy initialization.
    #[allow(dead_code, reason = "not all feature flags require this function")]
    pub const fn new() -> Self {
        Self(ExecutorInner::new())
    }
}

impl LocalExecutor<'_> {
    /// Creates a new, empty local executor.
    ///
    /// `const` so it can be used to initialize statics and `thread_local!`
    /// slots without lazy initialization.
    #[allow(dead_code, reason = "not all feature flags require this function")]
    pub const fn new() -> Self {
        Self(LocalExecutorInner::new())
    }
}

// Assert unwind safety for the wrappers: derives do not propagate
// `UnwindSafe`/`RefUnwindSafe`, so without these impls the newtypes would not
// be usable across `catch_unwind` boundaries even when the inner executor is.
// NOTE(review): this asserts unwind safety unconditionally for BOTH backends —
// confirm the `edge_executor` types uphold it the same way `async_executor`'s
// do.
impl UnwindSafe for Executor<'_> {}
impl RefUnwindSafe for Executor<'_> {}

impl UnwindSafe for LocalExecutor<'_> {}
impl RefUnwindSafe for LocalExecutor<'_> {}

impl fmt::Debug for Executor<'_> {
    /// Opaque debug output: prints only the type name, since the wrapped
    /// executor's internals are backend-specific and not useful to display.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.debug_struct("Executor").finish()
    }
}

impl fmt::Debug for LocalExecutor<'_> {
    /// Opaque debug output: prints only the type name, since the wrapped
    /// executor's internals are backend-specific and not useful to display.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.debug_struct("LocalExecutor").finish()
    }
}
1 change: 1 addition & 0 deletions crates/bevy_tasks/src/iter/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::TaskPool;
use alloc::vec::Vec;

mod adapters;
pub use adapters::*;
Expand Down
22 changes: 17 additions & 5 deletions crates/bevy_tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
html_logo_url = "https://bevyengine.org/assets/icon.png",
html_favicon_url = "https://bevyengine.org/assets/icon.png"
)]
#![cfg_attr(not(feature = "std"), no_std)]

extern crate alloc;

mod executor;

mod slice;
pub use slice::{ParallelSlice, ParallelSliceMut};

Expand Down Expand Up @@ -37,9 +40,9 @@ mod thread_executor;
#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
pub use thread_executor::{ThreadExecutor, ThreadExecutorTicker};

#[cfg(feature = "async-io")]
#[cfg(all(feature = "async-io", feature = "std"))]
pub use async_io::block_on;
#[cfg(not(feature = "async-io"))]
#[cfg(all(not(feature = "async-io"), feature = "std"))]
pub use futures_lite::future::block_on;
pub use futures_lite::future::poll_once;

Expand All @@ -54,13 +57,17 @@ pub use futures_lite;
pub mod prelude {
#[doc(hidden)]
pub use crate::{
block_on,
iter::ParallelIterator,
slice::{ParallelSlice, ParallelSliceMut},
usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool},
};

#[cfg(feature = "std")]
#[doc(hidden)]
pub use crate::block_on;
}

#[cfg(feature = "std")]
use core::num::NonZero;

/// Gets the logical CPU core count available to the current process.
Expand All @@ -70,7 +77,12 @@ use core::num::NonZero;
///
/// This will always return at least 1.
pub fn available_parallelism() -> usize {
std::thread::available_parallelism()
#[cfg(feature = "std")]
return std::thread::available_parallelism()
.map(NonZero::<usize>::get)
.unwrap_or(1)
.unwrap_or(1);

// Without access to std, assume a single thread is available
#[cfg(not(feature = "std"))]
return 1;
}
130 changes: 99 additions & 31 deletions crates/bevy_tasks/src/single_threaded_task_pool.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,28 @@
use alloc::{rc::Rc, sync::Arc};
use alloc::{string::String, sync::Arc, vec::Vec};
use core::{cell::RefCell, future::Future, marker::PhantomData, mem};

use crate::Task;

#[cfg(feature = "std")]
use crate::executor::LocalExecutor;

#[cfg(not(feature = "std"))]
use crate::executor::Executor as LocalExecutor;

#[cfg(feature = "std")]
thread_local! {
static LOCAL_EXECUTOR: async_executor::LocalExecutor<'static> = const { async_executor::LocalExecutor::new() };
static LOCAL_EXECUTOR: LocalExecutor<'static> = const { LocalExecutor::new() };
}

#[cfg(not(feature = "std"))]
static LOCAL_EXECUTOR: LocalExecutor<'static> = const { LocalExecutor::new() };

#[cfg(feature = "std")]
type ScopeResult<T> = alloc::rc::Rc<RefCell<Option<T>>>;

#[cfg(not(feature = "std"))]
type ScopeResult<T> = Arc<spin::Mutex<Option<T>>>;

/// Used to create a [`TaskPool`].
#[derive(Debug, Default, Clone)]
pub struct TaskPoolBuilder {}
Expand Down Expand Up @@ -124,15 +140,13 @@ impl TaskPool {
// Any usages of the references passed into `Scope` must be accessed through
// the transmuted reference for the rest of this function.

let executor = &async_executor::LocalExecutor::new();
let executor = &LocalExecutor::new();
// SAFETY: As above, all futures must complete in this function so we can change the lifetime
let executor: &'env async_executor::LocalExecutor<'env> =
unsafe { mem::transmute(executor) };
let executor: &'env LocalExecutor<'env> = unsafe { mem::transmute(executor) };

let results: RefCell<Vec<Rc<RefCell<Option<T>>>>> = RefCell::new(Vec::new());
let results: RefCell<Vec<ScopeResult<T>>> = RefCell::new(Vec::new());
// SAFETY: As above, all futures must complete in this function so we can change the lifetime
let results: &'env RefCell<Vec<Rc<RefCell<Option<T>>>>> =
unsafe { mem::transmute(&results) };
let results: &'env RefCell<Vec<ScopeResult<T>>> = unsafe { mem::transmute(&results) };

let mut scope = Scope {
executor,
Expand All @@ -152,7 +166,16 @@ impl TaskPool {
let results = scope.results.borrow();
results
.iter()
.map(|result| result.borrow_mut().take().unwrap())
.map(|result| {
#[cfg(feature = "std")]
return result.borrow_mut().take().unwrap();

#[cfg(not(feature = "std"))]
{
let mut lock = result.lock();
return lock.take().unwrap();
}
})
.collect()
}

Expand All @@ -162,29 +185,42 @@ impl TaskPool {
/// end-user.
///
/// If the provided future is non-`Send`, [`TaskPool::spawn_local`] should be used instead.
pub fn spawn<T>(&self, future: impl Future<Output = T> + 'static) -> Task<T>
pub fn spawn<T>(
&self,
future: impl Future<Output = T> + 'static + MaybeSend + MaybeSync,
) -> Task<T>
where
T: 'static,
T: 'static + MaybeSend + MaybeSync,
{
#[cfg(target_arch = "wasm32")]
#[cfg(all(target_arch = "wasm32", feature = "std"))]
return Task::wrap_future(future);

#[cfg(not(target_arch = "wasm32"))]
{
LOCAL_EXECUTOR.with(|executor| {
let task = executor.spawn(future);
// Loop until all tasks are done
while executor.try_tick() {}
#[cfg(all(not(target_arch = "wasm32"), feature = "std"))]
return LOCAL_EXECUTOR.with(|executor| {
let task = executor.spawn(future);
// Loop until all tasks are done
while executor.try_tick() {}

Task::new(task)
})
}
Task::new(task)
});

#[cfg(not(feature = "std"))]
return {
let task = LOCAL_EXECUTOR.spawn(future);
// Loop until all tasks are done
while LOCAL_EXECUTOR.try_tick() {}

Task::new(task)
};
}

/// Spawns a static future on the JS event loop. This is exactly the same as [`TaskPool::spawn`].
pub fn spawn_local<T>(&self, future: impl Future<Output = T> + 'static) -> Task<T>
pub fn spawn_local<T>(
&self,
future: impl Future<Output = T> + 'static + MaybeSend + MaybeSync,
) -> Task<T>
where
T: 'static,
T: 'static + MaybeSend + MaybeSync,
{
self.spawn(future)
}
Expand All @@ -202,9 +238,13 @@ impl TaskPool {
/// ```
pub fn with_local_executor<F, R>(&self, f: F) -> R
where
F: FnOnce(&async_executor::LocalExecutor) -> R,
F: FnOnce(&LocalExecutor) -> R,
{
LOCAL_EXECUTOR.with(f)
#[cfg(feature = "std")]
return LOCAL_EXECUTOR.with(f);

#[cfg(not(feature = "std"))]
return f(&LOCAL_EXECUTOR);
}
}

Expand All @@ -213,9 +253,9 @@ impl TaskPool {
/// For more information, see [`TaskPool::scope`].
#[derive(Debug)]
pub struct Scope<'scope, 'env: 'scope, T> {
executor: &'scope async_executor::LocalExecutor<'scope>,
executor: &'scope LocalExecutor<'scope>,
// Vector to gather results of all futures spawned during scope run
results: &'env RefCell<Vec<Rc<RefCell<Option<T>>>>>,
results: &'env RefCell<Vec<ScopeResult<T>>>,

// make `Scope` invariant over 'scope and 'env
scope: PhantomData<&'scope mut &'scope ()>,
Expand All @@ -230,7 +270,7 @@ impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> {
/// On the single threaded task pool, it just calls [`Scope::spawn_on_scope`].
///
/// For more information, see [`TaskPool::scope`].
pub fn spawn<Fut: Future<Output = T> + 'scope>(&self, f: Fut) {
pub fn spawn<Fut: Future<Output = T> + 'scope + MaybeSend>(&self, f: Fut) {
self.spawn_on_scope(f);
}

Expand All @@ -241,7 +281,7 @@ impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> {
/// On the single threaded task pool, it just calls [`Scope::spawn_on_scope`].
///
/// For more information, see [`TaskPool::scope`].
pub fn spawn_on_external<Fut: Future<Output = T> + 'scope>(&self, f: Fut) {
pub fn spawn_on_external<Fut: Future<Output = T> + 'scope + MaybeSend>(&self, f: Fut) {
self.spawn_on_scope(f);
}

Expand All @@ -250,13 +290,41 @@ impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> {
/// returned as a part of [`TaskPool::scope`]'s return value.
///
/// For more information, see [`TaskPool::scope`].
pub fn spawn_on_scope<Fut: Future<Output = T> + 'scope>(&self, f: Fut) {
let result = Rc::new(RefCell::new(None));
pub fn spawn_on_scope<Fut: Future<Output = T> + 'scope + MaybeSend>(&self, f: Fut) {
let result = ScopeResult::<T>::default();
self.results.borrow_mut().push(result.clone());
let f = async move {
let temp_result = f.await;

#[cfg(feature = "std")]
result.borrow_mut().replace(temp_result);

#[cfg(not(feature = "std"))]
{
let mut lock = result.lock();
*lock = Some(temp_result);
}
};
self.executor.spawn(f).detach();
}
}

// Conditional `Send`/`Sync` bounds for futures handed to the single-threaded
// task pool.
//
// With `std`, tasks run on a `thread_local!` executor, so futures never cross
// threads and the bounds are intentionally empty (satisfied by every type).
// Without `std`, the executor lives in a plain `static` that is in principle
// reachable from any thread, so futures must actually be `Send`/`Sync`.
#[cfg(feature = "std")]
mod send_sync_bounds {
    // No-op bound: every type qualifies on `std` targets.
    pub trait MaybeSend {}
    impl<T> MaybeSend for T {}

    // No-op bound: every type qualifies on `std` targets.
    pub trait MaybeSync {}
    impl<T> MaybeSync for T {}
}

#[cfg(not(feature = "std"))]
mod send_sync_bounds {
    // Alias for `Send` on no_std targets, where the executor is a global static.
    pub trait MaybeSend: Send {}
    impl<T: Send> MaybeSend for T {}

    // Alias for `Sync` on no_std targets, where the executor is a global static.
    pub trait MaybeSync: Sync {}
    impl<T: Sync> MaybeSync for T {}
}

use send_sync_bounds::{MaybeSend, MaybeSync};
1 change: 1 addition & 0 deletions crates/bevy_tasks/src/slice.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::TaskPool;
use alloc::vec::Vec;

/// Provides functions for mapping read-only slices across a provided [`TaskPool`].
pub trait ParallelSlice<T: Sync>: AsRef<[T]> {
Expand Down
4 changes: 2 additions & 2 deletions crates/bevy_tasks/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ use core::{
/// Tasks that panic get immediately canceled. Awaiting a canceled task also causes a panic.
#[derive(Debug)]
#[must_use = "Tasks are canceled when dropped, use `.detach()` to run them in the background."]
pub struct Task<T>(async_executor::Task<T>);
pub struct Task<T>(crate::executor::Task<T>);

impl<T> Task<T> {
/// Creates a new task from a given `async_executor::Task`
pub fn new(task: async_executor::Task<T>) -> Self {
pub fn new(task: crate::executor::Task<T>) -> Self {
Self(task)
}

Expand Down
Loading
Loading