Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add the ability to spawn futures #679

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 139 additions & 0 deletions rayon-core/src/future.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
#![allow(missing_debug_implementations)]
#![allow(missing_docs)]

use crate::job::JobResult;
use crate::unwind;
use crate::ThreadPool;
use crate::{spawn, spawn_fifo};
use crate::{Scope, ScopeFifo};

use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};

pub struct AsyncSpawn<T> {
state: Arc<Mutex<State<T>>>,
}

struct AsyncSpawnJob<T> {
state: Arc<Mutex<State<T>>>,
}

struct State<T> {
result: JobResult<T>,
waker: Option<Waker>,
}

fn new<T>() -> (AsyncSpawn<T>, AsyncSpawnJob<T>) {
let state = Arc::new(Mutex::new(State {
result: JobResult::None,
waker: None,
}));
(
AsyncSpawn {
state: state.clone(),
},
AsyncSpawnJob { state },
)
}

impl<T> Future for AsyncSpawn<T> {
type Output = T;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut guard = self.state.lock().expect("rayon future lock");
match mem::replace(&mut guard.result, JobResult::None) {
JobResult::None => {
guard.waker = Some(cx.waker().clone());
Poll::Pending
}
JobResult::Ok(x) => Poll::Ready(x),
JobResult::Panic(p) => {
drop(guard); // don't poison the lock
unwind::resume_unwinding(p);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is usually how we propagate panics, but maybe the future should yield Result<T, ...> instead?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be best to let Rayon's panic_handler handle the actual panic, but also panic here with something like panic!("the spawned task has panicked") rather than resuming with the original one.

If one were to retrieve the result of a spawned task without using futures, they would probably create a channel and send the result through it. Then, if the task panics, the sender side of the channel gets dropped, thus disconnecting it. If one attempts to receive the result from the receiver side of the channel, the receiver.recv() call panics because the channel is disconnected.

So that way spawn_future would closely match the behavior of spawn + using a channel to retrieve the result.

Over the past few months I did a lot of exploration with panic propagation strategies in asynchronous contexts and talked to people about their use cases. In the end, I figured the generally best way of handling panics is to pass them to the panic handler and raise a new panic whenever the result of a failed task is polled.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd be OK with that too. I was thinking of Result as an analogy from std::thread::spawn + JoinHandle::join to rayon::spawn_future + Future::poll.

}
}
}
}

impl<T> AsyncSpawnJob<T> {
fn execute(self, func: impl FnOnce() -> T) {
let result = unwind::halt_unwinding(func);
let mut guard = self.state.lock().expect("rayon future lock");
guard.result = match result {
Ok(x) => JobResult::Ok(x),
Err(p) => JobResult::Panic(p),
};
if let Some(waker) = guard.waker.take() {
waker.wake();
}
}
}

pub fn async_spawn<F, T>(func: F) -> AsyncSpawn<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
let (future, job) = new();
spawn(move || job.execute(func));
future
}

pub fn async_spawn_fifo<F, T>(func: F) -> AsyncSpawn<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
let (future, job) = new();
spawn_fifo(move || job.execute(func));
future
}

impl ThreadPool {
pub fn async_spawn<F, T>(&self, func: F) -> AsyncSpawn<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
let (future, job) = new();
self.spawn(move || job.execute(func));
future
}

pub fn async_spawn_fifo<F, T>(&self, func: F) -> AsyncSpawn<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
let (future, job) = new();
self.spawn_fifo(move || job.execute(func));
future
}
}

impl<'scope> Scope<'scope> {
pub fn async_spawn<F, T>(&self, func: F) -> AsyncSpawn<T>
where
F: FnOnce(&Self) -> T + Send + 'scope,
T: Send + 'scope,
{
let (future, job) = new();
self.spawn(|scope| job.execute(move || func(scope)));
future
}
}

impl<'scope> ScopeFifo<'scope> {
pub fn async_spawn_fifo<F, T>(&self, func: F) -> AsyncSpawn<T>
where
F: FnOnce(&Self) -> T + Send + 'scope,
T: Send + 'scope,
{
let (future, job) = new();
self.spawn_fifo(|scope| job.execute(move || func(scope)));
future
}
}
2 changes: 2 additions & 0 deletions rayon-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ mod log;
mod private;

mod broadcast;
mod future;
mod job;
mod join;
mod latch;
Expand All @@ -94,6 +95,7 @@ mod compile_fail;
mod test;

pub use self::broadcast::{broadcast, spawn_broadcast, BroadcastContext};
pub use self::future::{async_spawn, async_spawn_fifo};
pub use self::join::{join, join_context};
pub use self::registry::ThreadBuilder;
pub use self::scope::{in_place_scope, scope, Scope};
Expand Down