Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use rayon as the bevy_task thread pool #11995

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions crates/bevy_asset/src/processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ impl AssetProcessor {
let start_time = std::time::Instant::now();
debug!("Processing Assets");
IoTaskPool::get().scope(|scope| {
scope.spawn(async move {
scope.spawn_async(async move {
self.initialize().await.unwrap();
for source in self.sources().iter_processed() {
self.process_assets_internal(scope, source, PathBuf::from(""))
Expand Down Expand Up @@ -316,7 +316,7 @@ impl AssetProcessor {
error!("AddFolder event cannot be handled in single threaded mode (or WASM) yet.");
#[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))]
IoTaskPool::get().scope(|scope| {
scope.spawn(async move {
scope.spawn_async(async move {
self.process_assets_internal(scope, source, path)
.await
.unwrap();
Expand Down Expand Up @@ -445,7 +445,7 @@ impl AssetProcessor {
} else {
// Files without extensions are skipped
let processor = self.clone();
scope.spawn(async move {
scope.spawn_async(async move {
processor.process_asset(source, path).await;
});
}
Expand All @@ -461,7 +461,7 @@ impl AssetProcessor {
for path in check_reprocess_queue.drain(..) {
let processor = self.clone();
let source = self.get_source(path.source()).unwrap();
scope.spawn(async move {
scope.spawn_async(async move {
processor.process_asset(source, path.into()).await;
});
}
Expand Down
2 changes: 1 addition & 1 deletion crates/bevy_asset/src/server/loaders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl AssetLoaders {
MaybeAssetLoader::Ready(_) => unreachable!(),
MaybeAssetLoader::Pending { sender, .. } => {
IoTaskPool::get()
.spawn(async move {
.spawn_async(async move {
let _ = sender.broadcast(loader).await;
})
.detach();
Expand Down
8 changes: 4 additions & 4 deletions crates/bevy_asset/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ impl AssetServer {
let owned_handle = Some(handle.clone().untyped());
let server = self.clone();
IoTaskPool::get()
.spawn(async move {
.spawn_async(async move {
if let Err(err) = server.load_internal(owned_handle, path, false, None).await {
error!("{}", err);
}
Expand Down Expand Up @@ -367,7 +367,7 @@ impl AssetServer {

let server = self.clone();
IoTaskPool::get()
.spawn(async move {
.spawn_async(async move {
let path_clone = path.clone();
match server.load_untyped_async(path).await {
Ok(handle) => server.send_asset_event(InternalAssetEvent::Loaded {
Expand Down Expand Up @@ -552,7 +552,7 @@ impl AssetServer {
let server = self.clone();
let path = path.into().into_owned();
IoTaskPool::get()
.spawn(async move {
.spawn_async(async move {
let mut reloaded = false;

let requests = server
Expand Down Expand Up @@ -691,7 +691,7 @@ impl AssetServer {
let path = path.into_owned();
let server = self.clone();
IoTaskPool::get()
.spawn(async move {
.spawn_async(async move {
let Ok(source) = server.get_source(path.source()) else {
error!(
"Failed to load {path}. AssetSource {:?} does not exist",
Expand Down
6 changes: 3 additions & 3 deletions crates/bevy_ecs/src/query/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1131,7 +1131,7 @@ impl<D: QueryData, F: QueryFilter> QueryState<D, F> {
) {
// NOTE: If you are changing query iteration code, remember to update the following places, where relevant:
// QueryIter, QueryIterationCursor, QueryManyIter, QueryCombinationIter, QueryState::for_each_unchecked_manual, QueryState::par_for_each_unchecked_manual
bevy_tasks::ComputeTaskPool::get().scope(|scope| {
let _: Vec<()> = bevy_tasks::ComputeTaskPool::get().scope(|scope| {
if D::IS_DENSE && F::IS_DENSE {
// SAFETY: We only access table data that has been registered in `self.archetype_component_access`.
let tables = &world.storages().tables;
Expand All @@ -1145,7 +1145,7 @@ impl<D: QueryData, F: QueryFilter> QueryState<D, F> {
while offset < table.entity_count() {
let mut func = func.clone();
let len = batch_size.min(table.entity_count() - offset);
scope.spawn(async move {
scope.spawn(move || {
#[cfg(feature = "trace")]
let _span = self.par_iter_span.enter();
let table = &world
Expand All @@ -1172,7 +1172,7 @@ impl<D: QueryData, F: QueryFilter> QueryState<D, F> {
while offset < archetype.len() {
let mut func = func.clone();
let len = batch_size.min(archetype.len() - offset);
scope.spawn(async move {
scope.spawn(move || {
#[cfg(feature = "trace")]
let _span = self.par_iter_span.enter();
let archetype =
Expand Down
19 changes: 10 additions & 9 deletions crates/bevy_ecs/src/schedule/executor/multi_threaded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ impl SystemExecutor for MultiThreadedExecutor {
// and thread wake-up. Only spawn the task if the executor does not immediately
// terminate.
if block_on(poll_once(&mut executor)).is_none() {
scope.spawn(executor);
scope.spawn_async(executor);
}
},
);
Expand Down Expand Up @@ -525,7 +525,7 @@ impl MultiThreadedExecutor {
let system = unsafe { &mut *systems[system_index].get() };
let sender = self.sender.clone();
let panic_payload = self.panic_payload.clone();
let task = async move {
let mut task = move || {
let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
// SAFETY:
// - The caller ensures that we have permission to
Expand All @@ -550,20 +550,21 @@ impl MultiThreadedExecutor {
}
};

#[cfg(feature = "trace")]
let task = task.instrument(
self.system_task_metadata[system_index]
.system_task_span
.clone(),
);

let system_meta = &self.system_task_metadata[system_index];
self.active_access
.extend(&system_meta.archetype_component_access);

if system_meta.is_send {
scope.spawn(task);
} else {
let task = async move { task() };
#[cfg(feature = "trace")]
let task = task.instrument(
self.system_task_metadata[system_index]
.system_task_span
.clone(),
);

self.local_thread_running = true;
scope.spawn_on_external(task);
}
Expand Down
1 change: 1 addition & 0 deletions crates/bevy_ecs/src/schedule/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1139,6 +1139,7 @@ mod tests {

/// verify the [`SimpleExecutor`] supports stepping
#[test]
#[allow(deprecated)]
fn simple_executor() {
assert_executor_supports_stepping!(ExecutorKind::Simple);
}
Expand Down
2 changes: 1 addition & 1 deletion crates/bevy_gltf/src/loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ async fn load_gltf<'a, 'b, 'c>(
let parent_path = load_context.path().parent().unwrap();
let linear_textures = &linear_textures;
let buffer_data = &buffer_data;
scope.spawn(async move {
scope.spawn_async(async move {
load_image(
gltf_texture,
buffer_data,
Expand Down
4 changes: 2 additions & 2 deletions crates/bevy_render/src/pipelined_rendering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ impl Plugin for PipelinedRenderingPlugin {
// run a scope here to allow main world to use this thread while it's waiting for the render app
let sent_app = compute_task_pool
.scope(|s| {
s.spawn(async { app_to_render_receiver.recv().await });
s.spawn_async(async { app_to_render_receiver.recv().await });
})
.pop();
let Some(Ok(mut render_app)) = sent_app else {
Expand Down Expand Up @@ -182,7 +182,7 @@ fn update_rendering(app_world: &mut World, _sub_app: &mut App) {
// while we wait for the render world to be received.
let mut render_app = ComputeTaskPool::get()
.scope_with_executor(true, Some(&*main_thread_executor.0), |s| {
s.spawn(async { render_channels.recv().await });
s.spawn_async(async { render_channels.recv().await });
})
.pop()
.unwrap();
Expand Down
4 changes: 3 additions & 1 deletion crates/bevy_render/src/render_resource/pipeline_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -941,7 +941,9 @@ fn create_pipeline_task(
sync: bool,
) -> CachedPipelineState {
if !sync {
return CachedPipelineState::Creating(bevy_tasks::AsyncComputeTaskPool::get().spawn(task));
return CachedPipelineState::Creating(
bevy_tasks::AsyncComputeTaskPool::get().spawn_async(task),
);
}

match futures_lite::future::block_on(task) {
Expand Down
2 changes: 1 addition & 1 deletion crates/bevy_render/src/renderer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ impl<'w> RenderContext<'w> {
command_buffers
.push((i, command_buffer_generation_task(render_device)));
} else {
task_pool.spawn(async move {
task_pool.spawn_async(async move {
(i, command_buffer_generation_task(render_device))
});
}
Expand Down
2 changes: 1 addition & 1 deletion crates/bevy_render/src/view/window/screenshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ pub(crate) fn collect_screenshots(world: &mut World) {
));
};

AsyncComputeTaskPool::get().spawn(finish).detach();
AsyncComputeTaskPool::get().spawn_async(finish).detach();
}
}
}
3 changes: 2 additions & 1 deletion crates/bevy_tasks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ multi-threaded = []
[dependencies]
futures-lite = "2.0.1"
async-executor = "1.7.2"
async-channel = "2.2.0"
async-io = { version = "2.0.0", optional = true }
async-task = "4.2.0"
concurrent-queue = "2.0.0"
rayon-core = "1.0"
parking = "2.2"

[target.'cfg(target_arch = "wasm32")'.dependencies]
wasm-bindgen-futures = "0.4"
Expand Down
32 changes: 16 additions & 16 deletions crates/bevy_tasks/src/iter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ where
fn count(mut self, pool: &TaskPool) -> usize {
pool.scope(|s| {
while let Some(batch) = self.next_batch() {
s.spawn(async move { batch.count() });
s.spawn_async(async move { batch.count() });
}
})
.iter()
Expand Down Expand Up @@ -108,7 +108,7 @@ where
pool.scope(|s| {
while let Some(batch) = self.next_batch() {
let newf = f.clone();
s.spawn(async move {
s.spawn_async(async move {
batch.for_each(newf);
});
}
Expand Down Expand Up @@ -197,7 +197,7 @@ where
{
pool.scope(|s| {
while let Some(batch) = self.next_batch() {
s.spawn(async move { batch.collect::<Vec<_>>() });
s.spawn_async(async move { batch.collect::<Vec<_>>() });
}
})
.into_iter()
Expand All @@ -219,7 +219,7 @@ where
pool.scope(|s| {
while let Some(batch) = self.next_batch() {
let newf = f.clone();
s.spawn(async move { batch.partition::<Vec<_>, F>(newf) });
s.spawn_async(async move { batch.partition::<Vec<_>, F>(newf) });
}
})
.into_iter()
Expand All @@ -246,7 +246,7 @@ where
while let Some(batch) = self.next_batch() {
let newf = f.clone();
let newi = init.clone();
s.spawn(async move { batch.fold(newi, newf) });
s.spawn_async(async move { batch.fold(newi, newf) });
}
})
}
Expand All @@ -263,7 +263,7 @@ where
pool.scope(|s| {
while let Some(mut batch) = self.next_batch() {
let newf = f.clone();
s.spawn(async move { batch.all(newf) });
s.spawn_async(async move { batch.all(newf) });
}
})
.into_iter()
Expand All @@ -282,7 +282,7 @@ where
pool.scope(|s| {
while let Some(mut batch) = self.next_batch() {
let newf = f.clone();
s.spawn(async move { batch.any(newf) });
s.spawn_async(async move { batch.any(newf) });
}
})
.into_iter()
Expand All @@ -302,7 +302,7 @@ where
let poses = pool.scope(|s| {
while let Some(batch) = self.next_batch() {
let mut newf = f.clone();
s.spawn(async move {
s.spawn_async(async move {
let mut len = 0;
let mut pos = None;
for item in batch {
Expand Down Expand Up @@ -334,7 +334,7 @@ where
{
pool.scope(|s| {
while let Some(batch) = self.next_batch() {
s.spawn(async move { batch.max() });
s.spawn_async(async move { batch.max() });
}
})
.into_iter()
Expand All @@ -351,7 +351,7 @@ where
{
pool.scope(|s| {
while let Some(batch) = self.next_batch() {
s.spawn(async move { batch.min() });
s.spawn_async(async move { batch.min() });
}
})
.into_iter()
Expand All @@ -371,7 +371,7 @@ where
pool.scope(|s| {
while let Some(batch) = self.next_batch() {
let newf = f.clone();
s.spawn(async move { batch.max_by_key(newf) });
s.spawn_async(async move { batch.max_by_key(newf) });
}
})
.into_iter()
Expand All @@ -391,7 +391,7 @@ where
pool.scope(|s| {
while let Some(batch) = self.next_batch() {
let newf = f.clone();
s.spawn(async move { batch.max_by(newf) });
s.spawn_async(async move { batch.max_by(newf) });
}
})
.into_iter()
Expand All @@ -411,7 +411,7 @@ where
pool.scope(|s| {
while let Some(batch) = self.next_batch() {
let newf = f.clone();
s.spawn(async move { batch.min_by_key(newf) });
s.spawn_async(async move { batch.min_by_key(newf) });
}
})
.into_iter()
Expand All @@ -431,7 +431,7 @@ where
pool.scope(|s| {
while let Some(batch) = self.next_batch() {
let newf = f.clone();
s.spawn(async move { batch.min_by(newf) });
s.spawn_async(async move { batch.min_by(newf) });
}
})
.into_iter()
Expand Down Expand Up @@ -484,7 +484,7 @@ where
{
pool.scope(|s| {
while let Some(batch) = self.next_batch() {
s.spawn(async move { batch.sum() });
s.spawn_async(async move { batch.sum() });
}
})
.into_iter()
Expand All @@ -501,7 +501,7 @@ where
{
pool.scope(|s| {
while let Some(batch) = self.next_batch() {
s.spawn(async move { batch.product() });
s.spawn_async(async move { batch.product() });
}
})
.into_iter()
Expand Down
Loading
Loading