Consider mixing Tokio & Rayon #25

Closed
JackKelly opened this issue Jan 18, 2024 · 6 comments
@JackKelly
Owner

JackKelly commented Jan 18, 2024

This idea is very early! Right now, I only have a very fuzzy grasp of what I'm trying to achieve, and a fuzzy grasp of how that might be implemented! I'll use this issue to collect links and ideas.

Background and context

At first glance, Tokio should be used for async IO (like building a web server). And Rayon should be used for running CPU-intensive tasks in parallel. The issue is that light-speed-io wants to do both those tasks: loading data from huge numbers of files using non-blocking IO (using io_uring on Linux), and also doing lots of CPU-intensive processing on those files (like decompressing in parallel).

The use-case that motivates me to think again about using Tokio with Rayon

Jacob described a use-case, which I summarised in #24. The basic idea is: say we have millions of small files, and we want to combine these files to create a "re-chunked" version of this dataset on disk. In Jacob's example, there are 146,000 GRIB2 files per NWP init time, and all these files need to be saved into a handful of Zarr chunks. Each GRIB2 file has to be decompressed; and then the decompressed files have to be combined and sliced up again, and then those slices are compressed and saved back to disk.

The broad shape of a possible solution

  • Change LSIO's API so that users can group IO operations together. For the GRIB2 example, users would say to LSIO: "Group1 consists of the 146,000 GRIB files that must be combined into a handful of Zarr chunks. Group2 consists of the next 146,000 GRIB files."
  • Users can optionally provide a map function to be applied (in parallel) to each GRIB2 file (to decompress it)
  • Users provide a reduce_group function which receives all the GRIB buffers in that group. This outputs a vector of buffers (and paths and byte_ranges).
  • Users can provide another map function that'll be applied in parallel to these buffers (to compress them).
  • The compressed buffers are written to storage. The output storage system might be different to the input storage system. For example, we might be reading GRIB files from a cloud storage bucket, and writing Zarr chunks to a local SSD.
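
The map / reduce_group / map steps above can be sketched without any async machinery. This is only a rough illustration, not LSIO's API: `Item`, `par_map` and `process_group` are hypothetical names, and scoped std threads stand in for a real thread pool such as Rayon's.

```rust
use std::thread;

/// Hypothetical item type: a path plus the bytes read from (or destined for) it.
#[derive(Debug, Clone, PartialEq)]
pub struct Item {
    pub path: String,
    pub buffer: Vec<u8>,
}

/// Apply `f` to every item in parallel, one scoped thread per item.
/// (Assumption: a real implementation would use a thread pool instead.)
pub fn par_map<F>(items: Vec<Item>, f: F) -> Vec<Item>
where
    F: Fn(Item) -> Item + Sync,
{
    thread::scope(|s| {
        // Spawn every task first, so they all run concurrently...
        let handles: Vec<_> = items
            .into_iter()
            .map(|item| {
                let f = &f;
                s.spawn(move || f(item))
            })
            .collect();
        // ...then join in submission order, which preserves the input order.
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    })
}

/// Run one group through: map (decompress) -> reduce_group -> map (compress).
pub fn process_group<M1, R, M2>(
    group: Vec<Item>,
    decompress: M1,
    reduce_group: R,
    compress: M2,
) -> Vec<Item>
where
    M1: Fn(Item) -> Item + Sync,
    R: FnOnce(Vec<Item>) -> Vec<Item>,
    M2: Fn(Item) -> Item + Sync,
{
    let decompressed = par_map(group, decompress);
    let combined = reduce_group(decompressed);
    par_map(combined, compress)
}
```

Each group is processed to completion by one call to `process_group`, so calling it once per group, in order, gives the "Group1 before Group2" behaviour. Writing the returned items to storage is left out of the sketch.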

How Rust async might help

Hopefully users could write code like:

UPDATE: I need to learn more about Rust's Streams (async iterators).

// LSIO will create a thread which owns an io_uring instance, and keeps the SQ topped up.
// LSIO will read group_1 before starting to read group_2.
// `read_groups` will return a Stream of Streams (!).
// (Needs `use futures::StreamExt;` for `next()`.)
let mut groups = reader.read_groups([group_1_filenames, group_2_filenames]);

while let Some(mut group) = groups.next().await {
    // Concurrently decompress items in group
    let mut handles = Vec::new();
    while let Some(mut item) = group.next().await {
        let handle = tokio::spawn(async move {
            item.buffer = decompress_async(item.buffer).await;
            item
        });
        handles.push(handle);
    }

    // Wait for all buffers to be decompressed.
    // (`.await` isn't allowed inside a non-async closure, so use a loop instead of `map`.)
    let mut items = Vec::with_capacity(handles.len());
    for handle in handles {
        items.push(handle.await.unwrap());
    }

    // Reduce all items in this group:
    let combined_items = combine_async(items).await;

    for mut item in combined_items {
        // Assumes the writer handle is cheap to clone, so each task can own one.
        let writer = writer.clone();
        tokio::spawn(async move {
            item.buffer = compress_async(item.buffer).await;
            // TODO: Use Sink to write data:
            writer.submit_write(item).await;
        });
    }
}

// Adapted from https://ryhl.io/blog/async-what-is-blocking/#the-rayon-crate
async fn decompress_async(buffer: Vec<u8>) -> Vec<u8> {
    let (send, recv) = tokio::sync::oneshot::channel();

    // Spawn a task on rayon.
    rayon::spawn(move || {
        // Perform an expensive computation.
        let decompressed_buffer = decompress(buffer);
        // Send the result back to Tokio.
        let _ = send.send(decompressed_buffer);
    });

    // Wait for the rayon task.
    recv.await.expect("Panic in rayon::spawn")
}

Links

Tutorials / blog posts

  • Alice Ryhl's December 2020 blog post "Async: What is blocking?". Gives a great example of how to use Rayon with Tokio. I shouldn't use Tokio's spawn_blocking. spawn_blocking is best suited for wrapping blocking IO, not for CPU-intensive tasks. Instead, use rayon::spawn with tokio::sync::oneshot (see Alice's blog for more details).

Rust crates & PRs

  • Rayon draft PR: [WIP] Add the ability to spawn futures. Very relevant discussion. But the discussion largely halted in 2020 (with the PR still in "draft").
  • Rayon issue: "Using [Rayon's] ThreadPool for blocking I/O?". Conclusion: Don't wait on blocking IO in a Rayon task, because that blocks that thread from participating in other Rayon tasks.
  • tokio-rayon: "Mix async code with CPU-heavy thread pools using Tokio + Rayon". Last release was in 2021.
  • The futures crate. Contains Streams (async iterators, which LSIO could use as the source of data) and Sinks (for writing data).
  • async_stream "Provides two macros, stream! and try_stream!, allowing the caller to define asynchronous streams of elements. These are implemented using async & await notation." Allows us to define streams using a very similar approach to Python, using yield. Also allows us to write for await value in input to implement one stream from another stream.
@JackKelly JackKelly self-assigned this Jan 18, 2024
@JackKelly JackKelly moved this to In Progress in light-speed-io Jan 18, 2024
@JackKelly
Owner Author

JackKelly commented Jan 19, 2024

Why use tokio? Why not just use Rayon?

Maybe the "dream" with tokio is that users can avoid writing all the code above, and, by using Streams & Sinks, users can instead write code like:

// `read` returns a `Stream` of `Stream`s(!). In other words, `read` returns
// a `Stream` of groups. Each group is a `Stream` of buffers.
// Groups are completed in order:
// LSIO will complete all operations in group n before starting the ops in group n+1.
// But the operations _within_ a group complete in arbitrary order.
// This allows users to specify, for example, one group of input files will be
// collated into a single group of output files.
// UPDATE: Instead of `read` returning a Stream of Streams, I think it may be
// simpler to return a single Stream, where each Item has a `group_id`. group_id=0
// will be completed before any operations in group_id=1 are started.
read(file_chunks)
    // Iterate over each *group*
    .zip(sinks)
    .for_each(|(buffers, mut sink)| async move {
        // Iterate over each *buffer* within this group.

        // Decompress individual chunks in parallel. `decompress` submits tasks to Rayon's threadpool
        // (so this assumes it's an async fn, like `decompress_async`).
        // QUESTION: Should we use `Stream::then` instead of `map`?
        let decomp_handles: Vec<_> = buffers
            .map(|buf| tokio::spawn(async move { decompress(buf).await }))
            .collect()
            .await;

        // Wait for all decompression tasks to complete:
        let uncompressed_input_bufs = futures::future::try_join_all(decomp_handles).await.unwrap();

        // Collate (or "reduce") the uncompressed buffers, to produce an iterator of
        // uncompressed output buffers:
        let uncompressed_output_bufs = collate(uncompressed_input_bufs);

        // Compress output buffers (in parallel), and submit to the sink.
        // The spawned tasks only compress (`compress` is assumed async, like `decompress`);
        // feeding happens here because `feed` needs `&mut sink`.
        let comp_handles: Vec<_> = uncompressed_output_bufs
            .map(|buf| tokio::spawn(async move { compress(buf).await }))
            .collect();
        for handle in comp_handles {
            let compressed_buf = handle.await.unwrap();
            sink.feed(compressed_buf).await.unwrap();  // feed(item).await returns when Item is _submitted_,
                                                       // it doesn't wait for Item to be flushed.
        }

        // Wait for all writing in this group to complete
        // (the sink was moved into this closure, so it's closed here):
        sink.close().await.unwrap();
    })
    .await;
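
The single-Stream idea from the UPDATE comment (each item carries a `group_id`, and group n finishes before group n+1 starts) means the consumer can rebuild the groups just by watching for the id to change. A minimal sketch, using a plain `Iterator` as a stand-in for the `Stream`; `Tagged` and `batch_by_group` are hypothetical names, not part of LSIO:

```rust
/// Hypothetical item yielded by the reader: a buffer tagged with its group.
#[derive(Debug, PartialEq)]
pub struct Tagged {
    pub group_id: usize,
    pub buffer: Vec<u8>,
}

/// Split a flat sequence into per-group batches, relying on the guarantee that
/// every item of group n arrives before any item of group n+1.
/// Items *within* a group may arrive in any order.
pub fn batch_by_group(items: impl IntoIterator<Item = Tagged>) -> Vec<Vec<Tagged>> {
    let mut batches: Vec<Vec<Tagged>> = Vec::new();
    for item in items {
        // Does this item belong to the batch currently being filled?
        let same_group = batches
            .last()
            .map_or(false, |batch| batch[0].group_id == item.group_id);
        if same_group {
            batches.last_mut().unwrap().push(item);
        } else {
            // First item overall, or the group_id changed: start a new batch.
            batches.push(vec![item]);
        }
    }
    batches
}
```

An async version would presumably hand each batch on as soon as the `group_id` changes, rather than collecting every batch first.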

Either way, I should continue learning about tokio, Streams and Sinks.

Some reasons for needing Tokio:

  • Rayon doesn't (yet) have a concept of a Future. So, if LSIO only uses Rayon, then LSIO's API will be synchronous. The only way for users to overlap CPU processing with IO will be for LSIO to orchestrate processing on each chunk, on behalf of the user. The user would supply a processing function. And LSIO would use Rayon to parallelise that processing. But this makes LSIO's API complex, and - crucially - not very flexible.
  • I think that, if we use tokio with Rayon, then we can substantially simplify LSIO's API:
    • When reading, LSIO will only return a Stream of data.
    • LSIO won't orchestrate any CPU processing.
    • Instead, users will be responsible for specifying the processing.
    • But, because we're using async, the CPU processing and the IO can easily be overlapped.
    • LSIO would still have a lot to do. LSIO would:
      • provide a unified API for different storage backends
      • The public API would be very simple. It'd basically consist of two methods:
        • read(chunks) -> Stream<Chunk>
        • write(path) -> Sink<Chunk>
      • LSIO's API would be entirely focused on large batches of IO operations. These IO ops can be grouped, and LSIO will run the groups in order.
      • LSIO will optimise the schedule of IO operations. But the Stream returned to the user will contain buffers which correspond exactly to the users' requests. Even though, under the hood, LSIO might have merged or split IO operations.
      • LSIO will, optionally, take responsibility for allocating buffers for IO. And re-using buffers (to minimise heap allocation).
      • Provide an async Python API for reading raw data. (See PyO3's async docs)
      • A subsequent crate ("light-speed-codecs"??) would make it easy for Python users to compress / decompress chunks.
      • Then I'd write a Zarr crate, which would use light-speed-codecs and light-speed-io.
      • And write a kind of Rust kerchunk, which can also convert between formats by overlapping the reads, processing, and writing. Reading and writing can be to/from different storage subsystems.
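
To make the "merged or split IO operations" point concrete, here is a rough sketch of the merging half only: nearby byte ranges are coalesced into one larger read, and each user request is then sliced back out of the merged buffer, so the user still sees exactly the bytes they asked for. The names `coalesce` and `slice_request`, and the `max_gap` threshold, are assumptions for illustration rather than anything LSIO defines.

```rust
use std::ops::Range;

/// Merge requested byte ranges whose gap is at most `max_gap` bytes.
/// Returns the merged ranges and, for each original request (by index),
/// the index of the merged range that covers it.
pub fn coalesce(requests: &[Range<u64>], max_gap: u64) -> (Vec<Range<u64>>, Vec<usize>) {
    // Visit requests in order of start offset, remembering original positions.
    let mut order: Vec<usize> = (0..requests.len()).collect();
    order.sort_by_key(|&i| requests[i].start);

    let mut merged: Vec<Range<u64>> = Vec::new();
    let mut owner = vec![0usize; requests.len()];
    for i in order {
        let req = &requests[i];
        // Extend the previous merged range if this request starts close enough to it.
        let extend = merged
            .last()
            .map_or(false, |last| req.start <= last.end.saturating_add(max_gap));
        if extend {
            let last = merged.last_mut().unwrap();
            last.end = last.end.max(req.end);
        } else {
            merged.push(req.clone());
        }
        owner[i] = merged.len() - 1;
    }
    (merged, owner)
}

/// Cut one user request back out of the buffer read for its merged range.
pub fn slice_request<'a>(
    merged_buf: &'a [u8],
    merged: &Range<u64>,
    request: &Range<u64>,
) -> &'a [u8] {
    let start = (request.start - merged.start) as usize;
    let end = (request.end - merged.start) as usize;
    &merged_buf[start..end]
}
```

For example, requests `100..200`, `0..50`, `40..90` and `1000..1100` with `max_gap = 16` become two reads, `0..200` and `1000..1100`. A sensible `max_gap` would depend on the storage backend.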

@JackKelly
Owner Author

JackKelly commented Jan 19, 2024

(@jacobbieker the conversation we had on Wednesday morning about needing to handle use-cases like your ICON GRIB-to-Zarr use-case has pushed me to completely reconsider the API and internal design for light-speed-io! Which is great! I'm optimistic that this new API will be much more flexible... as well as being much simpler! Thank you! Your timing was perfect... if we'd waited another week, and I'd started writing lots of Rust code, then it would have been harder to change the design so much!)

@jacobbieker

Glad it helped! Very excited to see how this progresses!

@JackKelly
Owner Author

JackKelly commented Jan 22, 2024

Some relevant links:

@JackKelly
Owner Author

Oh, wait, AsyncIterator already has a from_iter method! https://docs.rs/async-iterator/latest/async_iterator/trait.FromIterator.html

Although I need to think how to keep the submission queue topped up, within the async iterator.
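
One possible shape for "keeping the queue topped up", sketched with a plain synchronous `Iterator` as a stand-in for an async iterator: every time the consumer takes a result, the adapter submits more operations until `depth` are in flight again. `ToppedUp` and its `submit` callback are hypothetical; `submit` stands in for pushing an entry onto io_uring's submission queue, and here it simply computes the result eagerly.

```rust
use std::collections::VecDeque;

/// Iterator adapter that tries to keep `depth` operations "in flight".
pub struct ToppedUp<I, F, T> {
    pending: I,             // operations not yet submitted
    submit: F,              // stand-in for submitting to the SQ
    in_flight: VecDeque<T>, // submitted operations, oldest first
    depth: usize,           // target number of in-flight operations
}

impl<I, F, T> ToppedUp<I, F, T>
where
    I: Iterator,
    F: FnMut(I::Item) -> T,
{
    pub fn new(pending: I, submit: F, depth: usize) -> Self {
        let mut this = Self { pending, submit, in_flight: VecDeque::new(), depth };
        this.top_up(); // fill the queue before the first item is requested
        this
    }

    /// Submit pending operations until `depth` are in flight or none remain.
    fn top_up(&mut self) {
        while self.in_flight.len() < self.depth {
            match self.pending.next() {
                Some(op) => {
                    let submitted = (self.submit)(op);
                    self.in_flight.push_back(submitted);
                }
                None => break,
            }
        }
    }

    /// Number of operations currently in flight.
    pub fn in_flight(&self) -> usize {
        self.in_flight.len()
    }
}

impl<I, F, T> Iterator for ToppedUp<I, F, T>
where
    I: Iterator,
    F: FnMut(I::Item) -> T,
{
    type Item = T;

    fn next(&mut self) -> Option<T> {
        // Take the oldest operation, then refill before handing it back.
        let done = self.in_flight.pop_front();
        self.top_up();
        done
    }
}
```

In an async version the same refill step would presumably run inside `poll_next`, and completions could arrive out of order instead of strictly oldest-first.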

JackKelly added a commit that referenced this issue Jan 23, 2024
@JackKelly
Owner Author

My plan is now for LSIO to "just" focus on async IO, so LSIO won't contain any Rayon code. A separate crate will process Streams in parallel using Rayon (#26).

See https://github.com/JackKelly/light-speed-io/milestone/1
