DESIGN DISCUSSION: How to execute a DAG of IO and compute tasks? 2 Tokio threadpools? Or Tokio + Rayon? #10

Open
JackKelly opened this issue Sep 2, 2024 · 6 comments

Comments

@JackKelly
Owner

JackKelly commented Sep 2, 2024

Andrew Lamb at InfluxData wrote a blog post (in 2022) making compelling arguments for scheduling CPU-bound tasks using Tokio. The essential "trick" is to use two Tokio threadpools: one for IO, and another for CPU-bound tasks (so that CPU-bound tasks don't block IO tasks).

For hypergrib, it might be nice to be able to use Rust's async API to naturally express a directed acyclic graph (DAG) of tasks. For example:

graph TD
    L1[Load GRIB message 1] --> D1[Decode msg 1] --> M[Merge into final array]
    L2[Load GRIB message 2] --> D2[Decode msg 2] --> M[Merge into final array]

Andrew's implementation of the two-threadpool approach involves ~750 lines of custom Rust code (including tests).

If we really wanted to avoid using Rayon, then I think we could do it by "just" creating two Tokio runtimes (each with its own threadpool). Something like:

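use tokio::runtime::Runtime;

// Placeholder entry points for the two kinds of work.
async fn cpu_main() {}
async fn io_main() {}

fn main() {
    // Create one runtime (with its own threadpool) per kind of work.
    let cpu_runtime = Runtime::new().unwrap();
    let io_runtime = Runtime::new().unwrap();

    // `spawn` returns a JoinHandle immediately; it does not block.
    let cpu_handle = cpu_runtime.spawn(cpu_main());
    let io_handle = io_runtime.spawn(io_main());

    // Block the current thread until both tasks have finished.
    io_runtime.block_on(async {
        cpu_handle.await.unwrap();
        io_handle.await.unwrap();
    });
}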

(Although I'm really not sure if that'll work! And I'm not sure how to pass Futures between the two runtimes?)

On balance, I think I prefer Alice Ryhl's recommendation of using Tokio with Rayon, and using a tokio::sync::oneshot::channel to pass things between Tokio and Rayon. I'm 99% sure this'll still allow us to construct a DAG of tasks, and it feels like it'll result in less code in hypergrib. Crucially, we may have tasks that run for a long time (seconds?), and Andrew Lamb suggests that, even when using two Tokio threadpools, tasks in the CPU threadpool still shouldn't block for more than something like 100ms. The downside is that this approach adds a pretty heavyweight dependency (Rayon).

Further reading

@JackKelly

Actually, on second thoughts, maybe it's easy to just have a struct which holds the two Runtime instances, and we pass around that struct, and call struct.cpu.spawn. Or we don't pass the struct around and just have a single function which sends tasks to the Runtimes.

Although it's possible that Tokio doesn't allow multiple Runtimes to be installed on the same thread, hence the complexity in Andrew Lamb's solution.

@JackKelly

This post seems to suggest that it's fine to start multiple Runtimes from the same thread: https://matthewtejo.substack.com/p/building-robust-server-with-async

@JackKelly

There's still the concern that some of our tasks might take a long time (multiple seconds). I should write a little test to check the tokio scheduler can still run. For example, start a runtime with 2 worker threads. Submit one 1-sec task and ten 100ms tasks, and check that Tokio can schedule the 100ms tasks on the second thread. The whole thing should execute in 1 sec.

@JackKelly

I've written a little test, where I submit some 1s tasks, and some 100ms tasks to the same tokio scheduler and it "does the right thing"! Code here: https://github.com/JackKelly/learning_rust/blob/main/tokio_long_tasks/src/main.rs

@JackKelly

OK! I think I've got it working nicely with two runtimes, and without much additional complexity.

Here's the code: https://github.com/JackKelly/learning_rust/blob/e06d97a409bfe6afea4e1a5ce5409a78ba7128c0/tokio_two_runtimes/src/main.rs

And here's an example output:

jack@jack-NUC:~/dev/rust/learning_rust/tokio_two_runtimes$ time cargo run --release
   Compiling tokio_two_runtimes v0.1.0 (/home/jack/dev/rust/learning_rust/tokio_two_runtimes)
    Finished `release` profile [optimized] target(s) in 0.94s
     Running `target/release/tokio_two_runtimes`

ThreadId(1) RuntimeId(1) Starting main_rt!
tokio::time::sleep on main_rt
Finished tokio_time::sleep on main_rt
ThreadId(3) RuntimeId(1) Spawning tasks on main_rt!
ThreadId(3) RuntimeId(1) Ending tasks on main_rt!
ThreadId(2) RuntimeId(1) ** Starting 100ms task 3 ^^!
ThreadId(3) RuntimeId(1) ** Starting 100ms task 9 ^^!
ThreadId(5) RuntimeId(2) Starting tasks on cpu_rt_handle!
ThreadId(5) RuntimeId(2) Ending tasks on cpu_rt_handle!
ThreadId(5) RuntimeId(2) ** Starting 1s task 1 ^^!
ThreadId(4) RuntimeId(2) ** Starting 1s task 0 ^^!
ThreadId(1) RuntimeId(1) Ending main_rt!
ThreadId(2) RuntimeId(1) ** Finished 100ms task 3 ##!
ThreadId(2) RuntimeId(1) ** Starting 1s task 0 ^^!
ThreadId(3) RuntimeId(1) ** Finished 100ms task 9 ##!
ThreadId(3) RuntimeId(1) ** Starting 100ms task 4 ^^!
ThreadId(3) RuntimeId(1) ** Finished 100ms task 4 ##!
ThreadId(3) RuntimeId(1) ** Starting 100ms task 5 ^^!
ThreadId(3) RuntimeId(1) ** Finished 100ms task 5 ##!
ThreadId(3) RuntimeId(1) ** Starting 100ms task 6 ^^!
ThreadId(3) RuntimeId(1) ** Finished 100ms task 6 ##!
ThreadId(3) RuntimeId(1) ** Starting 100ms task 7 ^^!
ThreadId(3) RuntimeId(1) ** Finished 100ms task 7 ##!
ThreadId(3) RuntimeId(1) ** Starting 100ms task 8 ^^!
ThreadId(3) RuntimeId(1) ** Finished 100ms task 8 ##!
ThreadId(3) RuntimeId(1) ** Starting 100ms task 1 ^^!
ThreadId(3) RuntimeId(1) ** Finished 100ms task 1 ##!
ThreadId(3) RuntimeId(1) ** Starting 100ms task 0 ^^!
ThreadId(3) RuntimeId(1) ** Finished 100ms task 0 ##!
ThreadId(3) RuntimeId(1) ** Starting 100ms task 2 ^^!
ThreadId(3) RuntimeId(1) ** Finished 100ms task 2 ##!
ThreadId(5) RuntimeId(2) ** Finished 1s task 1 ##!
ThreadId(4) RuntimeId(2) ** Finished 1s task 0 ##!
ThreadId(2) RuntimeId(1) ** Finished 1s task 0 ##!

real    0m3.093s
user    0m3.151s
sys     0m0.228s

@JackKelly

JackKelly commented Sep 4, 2024

Conclusion

Let's not use Rayon. Instead, let's use two Tokio runtimes (using the method outlined in the comment immediately above).

This has several advantages:

  • We don't have to depend on Rayon. (Rayon is great! But it's large, so it will increase compile time.)
  • We only need a single programming model (async/await), instead of also using Rayon's programming model.
  • This should make it a little easier to define and run DAGs which include IO and computation.
