diff --git a/src/SUMMARY.md b/src/SUMMARY.md index 0aacd144..7e320591 100644 --- a/src/SUMMARY.md +++ b/src/SUMMARY.md @@ -14,7 +14,7 @@ - [Async and await](part-guide/async-await.md) - [More async/await topics](part-guide/more-async-await.md) - [IO and issues with blocking](part-guide/io.md) -- [Concurrency primitives](part-guide/concurrency-primitives.md) +- [Composing futures concurrently](part-guide/concurrency-primitives.md) - [Channels, locking, and synchronization](part-guide/sync.md) - [Tools for async programming](part-guide/tools.md) - [Destruction and clean-up](part-guide/dtors.md) @@ -31,13 +31,13 @@ - [async in sync, sync in async]() - [Async IO: readiness vs completion, and io_uring]() - [Design patterns]() -- [Cancellation]() (cancellation safety) +- [Cancellation and cancellation safety](part-reference/cancellation.md) (cancellation safety) - [Starvation]() - [Pinning]() - [Async and FFI]() - [Comparing async programming in Rust to other languages]() - [The implementation of async/await in rustc]() -- [Structured concurrency?]() +- [Structured concurrency](part-reference/structured.md) # Old chapters diff --git a/src/navigation/index.md b/src/navigation/index.md index f6ac831b..13f0fd6d 100644 --- a/src/navigation/index.md +++ b/src/navigation/index.md @@ -19,8 +19,10 @@ - [Cancellation](../part-guide/more-async-await.md#cancellation) - [`CancellationToken`](../part-guide/more-async-await.md#cancellation) + - [In `select`](../part-guide/concurrency-primitives.md#race-select) - [Concurrency](../part-guide/concurrency.md) - [c.f., parallelism](../part-guide/concurrency.md#concurrency-and-parallelism) + - [Primitives (`join`, `select`, etc.)](../part-guide/concurrency-primitives.md) @@ -38,6 +40,7 @@ +- [`join`](../part-guide/concurrency-primitives.md#join) - [Joining tasks](../part-guide/async-await.md#joining-tasks) - [`JoinHandle`](../part-guide/async-await.md#joinhandle) - [`abort`](../part-guide/more-async-await.md#cancellation) @@ -55,12 +58,14 @@ +- [`race`](../part-guide/concurrency-primitives.md#race-select) - [Reactor](../part-guide/async-await.md#the-runtime) - [Runtimes](../part-guide/async-await.md#the-runtime) - [Scheduler](../part-guide/async-await.md#the-runtime) +- [`select`](../part-guide/concurrency-primitives.md#race-select) - [Spawning tasks](../part-guide/async-await.md#spawning-tasks) @@ -74,3 +79,4 @@ - Traits - [async](../part-guide/more-async-await.md#async-traits) - `Future` +[`try_join`](../part-guide/concurrency-primitives.md#join) diff --git a/src/navigation/topics.md b/src/navigation/topics.md index 181e6fe7..9c3c506b 100644 --- a/src/navigation/topics.md +++ b/src/navigation/topics.md @@ -4,17 +4,22 @@ - [Introduction](../part-guide/concurrency.md#concurrency-and-parallelism) - [Running async tasks in parallel using `spawn`](../part-guide/async-await.md#spawning-tasks) +- [Running futures concurrently using `join` and `select`](../part-guide/concurrency-primitives.md) + ## Correctness and safety - Cancellation - [Introduction](../part-guide/more-async-await.md#cancellation) + - [In `select` and `try_join`](../part-guide/concurrency-primitives.md) + ## Performance - Blocking - [Introduction](../part-guide/more-async-await.md#blocking-and-cancellation) + ## Testing - [Unit test syntax](../part-guide/more-async-await.md#unit-tests) diff --git a/src/part-guide/async-await.md b/src/part-guide/async-await.md index 96e1bc5a..e4478874 100644 --- a/src/part-guide/async-await.md +++ b/src/part-guide/async-await.md @@ -40,6 +40,11 @@ That's it! You're ready to write some asynchronous code! The `#[tokio::main]` annotation initializes the Tokio runtime and starts an async task for running the code in `main`. Later in this guide we'll explain in more detail what that annotation is doing and how to use async code without it (which will give you more flexibility). +### Futures-rs and the ecosystem + +TODO context and history, what futures-rs is for - was used a lot, probably don't need it now, overlap with Tokio and other runtimes (sometimes with subtle semantic differences), why you might need it (working with futures directly, esp writing your own, streams, some utils) + +Other ecosystem stuff - Yosh's crates, alt runtimes, experimental stuff, other? ### Futures and tasks @@ -197,4 +202,4 @@ If we immediately `await`ed the `JoinHandle` of the first `spawn` rather than sa We'll quickly look at `JoinHandle` in a little more depth. The fact that we can `await` a `JoinHandle` is a clue that a `JoinHandle` is itself a future. `spawn` is not an `async` function, it's a regular function that returns a future (`JoinHandle`). It does some work (to schedule the task) before returning the future (unlike an async future), which is why we don't *need* to `await` `spawn`. Awaiting a `JoinHandle` waits for the spawned task to complete and then returns the result. In the above example, there was no result, we just waited for the task to complete. `JoinHandle` is a generic type and it's type parameter is the type returned by the spawned task. In the above example, the type would be `JoinHandle<()>`, a future that results in a `String` would produce a `JoinHandle` with type `JoinHandle`. -`await`ing a `JoinHandle` returns a `Result` (which is why we used `let _ = ...` in the above example, it avoids a warning about an unused `Result`). If the spawned task completed successfully, then the task's result will be in the `Ok` variant. If the task panicked or was aborted (a form of cancellation, see [TODO]()), then the result will be an `Err` containing a [`JoinError` docs](https://docs.rs/tokio/latest/tokio/task/struct.JoinError.html). If you are not using cancellation via `abort` in your project, then `unwrapping` the result of `JoinHandle.await` is a reasonable approach, since that is effectively propagating a panic from the spawned task to the spawning task. +`await`ing a `JoinHandle` returns a `Result` (which is why we used `let _ = ...` in the above example, it avoids a warning about an unused `Result`). If the spawned task completed successfully, then the task's result will be in the `Ok` variant. If the task panicked or was aborted (a form of [cancellation](../part-reference/cancellation.md)), then the result will be an `Err` containing a [`JoinError` docs](https://docs.rs/tokio/latest/tokio/task/struct.JoinError.html). If you are not using cancellation via `abort` in your project, then `unwrapping` the result of `JoinHandle.await` is a reasonable approach, since that is effectively propagating a panic from the spawned task to the spawning task. diff --git a/src/part-guide/concurrency-primitives.md b/src/part-guide/concurrency-primitives.md index a698dc8c..a04b623e 100644 --- a/src/part-guide/concurrency-primitives.md +++ b/src/part-guide/concurrency-primitives.md @@ -1,28 +1,199 @@ -# Concurrency primitives +# Composing futures concurrently -- concurrent composition of futures - - c.f., sequential composition with await, composition of tasks with spawn - - concurrent/task behaviour - - behaviour on error -- streams as alternative, forward ref -- different versions in different runtimes/other crates - - focus on the Tokio versions +In this chapter we're going to cover more ways in which futures can be composed. In particular, some new ways in which futures can be executed concurrently (but not in parallel). Superficially, the new functions/macros we introduce in this chapter are pretty simple. However, the underlying concepts can be pretty subtle. We'll start with a recap on futures, concurrency, and parallelism, but you might also want to revisit the earlier section comparing [concurrency with parallelism](concurrency.md#concurrency-and-parallelism). -From [comment](https://github.com/rust-lang/async-book/pull/230#discussion_r1829351497): A framing I've started using is that tasks are not the async/await form of threads; it's more accurate to think of them as parallelizable futures. This framing does not match Tokio and async-std's current task design; but both also have trouble propagating cancellation. See parallel_future and tasks are the wrong abstraction for more. +A futures is a deferred computation. A future can be progressed by using `await`, which hands over control to the runtime, causing the current task to wait for the result of the computation. If `a` and `b` are futures, then they can be sequentially composed (that is, combined to make a future which executes `a` to completion and then `b` to completion) by `await`ing one then the other: `async { a.await; b.await}`. + +We have also seen parallel composition of futures using `spawn`: `async { let a = spawn(a); let b = spawn(b); (a.await, b.await)}` runs the two futures in parallel. Note that the `await`s in the tuple are not awaiting the futures themselves, but are awaiting `JoinHandle`s to get the results of the futures when they complete. + +In this chapter we introduce two ways to compose futures concurrently without parallelism: `join` and `select`/`race`. In both cases, the futures run concurrently by time-slicing; each of the composed futures takes turns to execute a little bit before the next gets a turn. This is done *without involving the async runtime* (and therefore without multiple OS threads and without any potential for parallelism). The composing construct interleaves the futures locally. You can think of these constructs being like mini-executors which execute their component futures within a single async task. + +The fundamental difference between join and select/race is how they handle futures completing their work: a join finishes when all futures finish, a select/race finishes when one future finishes (all the others are cancelled). There are also variations of both for handling errors. + +These constructs (or similar concepts) are often used with streams, we'll touch on this below, but we'll talk more about that in the [streams chapter](streams.md). + +If you want parallelism (or you don't explicitly not want parallelism), spawning tasks is often a simpler alternative to these composition constructs. Spawning tasks is usually less error-prone, more general, and performance is more predictable. On the other hand, spawning is inherently less [structured](../part-reference/structured.md), which can make lifecycle and resource management harder to reason about. + +It's worth considering the performance issue in a little more depth. The potential performance problem with concurrent composition is the fairness of time sharing. If you have 100 tasks in your program, then typically the optimal way to share resources is for each task to get 1% of the processor time (or if the tasks are all waiting, then for each to have the same chance of being woken up). If you spawn 100 tasks, then this is usually what happens (roughly). However, if you spawn two tasks and join 99 futures on one of those tasks, then the scheduler will only know about two tasks and one task will get 50% of the time and the 99 futures will each get 0.5%. + +Usually the distribution of tasks is not so biased, and very often we use join/select/etc. for things like timeouts where this behaviour is actually desirable. But it is worth considering to ensure that your program has the performance characteristics you want. ## Join -- Tokio/futures-rs join macro -- c.f., joining tasks -- join in futures-concurrency -- FuturesUnordered - - like a dynamic version of join - - forward ref to stream +Tokio's [`join` macro](https://docs.rs/tokio/latest/tokio/macro.join.html) takes a list of futures and runs them all to completion concurrently (returning all the results as a tuple). It returns when all the futures have completed. The futures are always executed on the same thread (concurrently and not in parallel). + +Here's a simple example: + +```rust,norun +async fn main() { + let (result_1, result_2) = join!(do_a_thing(), do_a_thing()); + // Use `result_1` and `result_2`. +} +``` + +Here, the two executions of `do_a_thing` happen concurrently, and the results are ready when they are both done. Notice that we don't `await` to get the results. `join!` implicitly awaits it's futures and produces a value. It does not create a future. You do still need to use it within an async context (e.g., from within an async function). + +Although you can't see it in the example above, `join!` takes expressions which evaluate to futures[^into]. `join` does not create an async context in it's body and you shouldn't `await` the futures passed to `join` (otherwise they'll be evaluated before the joined futures). + +Because all the futures are executed on the same thread, if any future blocks the thread, then none of them can make progress. If using a mutex or other lock, this can easily lead to deadlock if one future is waiting for a lock held by another future. + +[`join`] does not care about the result of the futures. In particular, if a future is cancelled or returns an error, it does not affect the others - they continue to execute. If you want 'fail fast' behaviour, use [`try_join`](https://docs.rs/tokio/latest/tokio/macro.try_join.html). `try_join` works similarly to `join`, however, if any future returns an `Err`, then all the other futures are cancelled and `try_join` returns the error immediately. + +Back in the earlier chapter on [async/await](async-await.md), we used the word 'join' to talk about joining spawned tasks. As the name suggests, joining futures and tasks is related: joining means we execute multiple futures concurrently and wait for the result before continuing. The syntax is different: using a `JoinHandle` vs the `join` macro, but the idea is similar. The key difference is that when joining tasks, the tasks execute concurrently and in parallel, whereas using `join!`, the futures execute concurrently but not in parallel. Furthermore, spawned tasks are scheduled on the runtime's scheduler, whereas with `join!` the futures are 'scheduled' locally (on the same task and within the temporal scope of the macro's execution). Another difference is that if a spawned task panics, the panic is caught by the runtime, but if a future in `join` panics, then the whole task panics. + + +### Alternatives + +Running futures concurrently and collecting their results is a common requirement. You should probably use `spawn` and `JoinHandle`s unless you have a good reason not to (i.e., you explicitly do not want parallelism, and even then you might prefer to use [`spawn_local`](https://docs.rs/tokio/latest/tokio/task/fn.spawn_local.html)). The [`JoinSet`](https://docs.rs/tokio/latest/tokio/task/struct.JoinSet.html) abstraction manages such spawned tasks in a way similar to `join!`. + +Most runtimes (and [futures.rs](https://docs.rs/futures/latest/futures/macro.join.html)) have an equivalent to Tokio's `join` macro and they mostly behave the same way. There are also `join` functions, which are similar to the macro but a little less flexible. E.g., futures.rs has [`join`](https://docs.rs/futures/latest/futures/future/fn.join.html) for joining two futures, [`join3`](https://docs.rs/futures/latest/futures/future/fn.join3.html), [`join4`](https://docs.rs/futures/latest/futures/future/fn.join4.html), and [`join5`](https://docs.rs/futures/latest/futures/future/fn.join5.html) for joining the obvious number of futures, and [join_all](https://docs.rs/futures/latest/futures/future/fn.join_all.html) for joining a collection of futures (as well as `try_` variations of each of these). + +[Futures-concurrency](https://docs.rs/futures-concurrency/latest) also provides functionality for join (and try_join). In the futures-concurrency style, these operations are trait methods on groups of futures such as tuples, `Vec`s, or arrays. E.g., to join two futures, you would write `(fut1, fut2).join().await` (note that `await` is explicit here). + +If the set of futures you wish to join together varies dynamically (e.g., new futures are created as input comes in over the network), or you want the results as they complete rather than when all the futures have completed, then you'll need to use streams and the [`FuturesUnordered`](https://docs.rs/futures/latest/futures/stream/struct.FuturesUnordered.html) or [`FuturesOrdered`](https://docs.rs/futures/latest/futures/stream/struct.FuturesOrdered.html) functionality. We'll cover these in the [streams](streams.md) chapter. + + +[^into]: The expressions must have a type which implements `IntoFuture`. The expression is evaluated and converted to a future by the macro. I.e., they don't actually have to evaluate to a future, but rather something which can be converted into a future, but this is a pretty minor distinction. The expressions themselves are evaluated sequentially before any of the resulting futures are executed. + ## Race/select -- Tokio select macro -- cancellation issues -- different behaviour of futures-rs version -- race in futures-concurrency +The counterpart to joining futures is racing them (aka selecting on them). With race/select the futures are executed concurrently, but rather than waiting for all the futures to complete, we only wait for the first one to complete and then cancel the others. Although this sounds similar to joining, it is significantly more interesting (and sometimes error-prone) because now we have to reason about cancellation. + +Here's an example using Tokio's [`select`](https://docs.rs/tokio/latest/tokio/macro.select.html) macro: + +```rust,norun +async fn main() { + select! { + result = do_a_thing() => { + println!("computation completed and returned {result}); + } + _ = timeout() => { + println!("computation timed-out"); + } + } +} +``` + +You'll notice things are already more interesting than with the `join` macro because we handle the results of the futures within the `select` macro. It looks a bit like a `match` expression, but with `select`, all branches are run concurrently and the body of the branch which finishes first is executed with its result (the other branches are not executed and the futures are cancelled by `drop`ping). In the example, `do_a_thing` and `timeout` execute concurrently and the first to complete will have it's block executed (i.e., only one `println` will run), the other future will be cancelled. As with the `join` macro, awaiting the futures is implicit. + +Tokio's `select` macro supports a bunch of features: + +- pattern matching: the syntax on the left of `=` on each branch can be a pattern and the block is only executed if the result of the future matches the pattern. If the pattern does not match, then the future is no longer polled (but other futures are). This can be useful for futures which optionally return a value, e.g., `Some(x) = do_a_thing() => { ... }`. +- `if` guards: each branch may have an `if` guard. When the `select` macro runs, after evaluating each expression to produce a future, the `if` guard is evaluated and the future is only polled if the guard is true. E.g., `x = = do_a_thing() if false => { ... }` will never be polled. Note that the `if` guard is not re-evaluated during polling, only when the macro is initialized. +- `else` branch: `select` can have an `else` branch `else => { ... }`, this is executed if all the futures have stopped and none of the blocks have been executed. If this happens without an `else` branch, then `select` will panic. + +The value of the `select!` macro is the value of the executed branch (just like `match`), so all branches must have the same type. E.g., if we wanted to use the result of the above example outside of the `select`, we'd write it like + +```rust,norun +async fn main() { + let result = select! { + result = do_a_thing() => { + Some(result) + } + _ = timeout() => { + None + } + }; + + // Use `result` +} +``` + +As with `join!`, `select!` does not treat `Result`s in any special way (other than the pattern matching mentioned previously) and if a branch completes with an error, then all other branches will be cancelled and the error will be used as the result of select (in the same way as if the branch has completed successfully). + +The `select` macro intrinsically uses cancellation, so if you're trying to avoid cancellation in your program, you must avoid `select!`. In fact, `select` is often the primary source of cancellation in an async program. As discussed [elsewhere](../part-reference/cancellation.md), cancellation has many subtle issues which can lead to bugs. In particular, note that `select` cancels futures by simply dropping them. This will not notify the future being dropped or trigger any cancellation tokens, etc. + +`select!` is often used in a loop to handle streams or other sequences of futures. This adds an extra layer of complexity and opportunities for bugs. In the simple case that we create a new, independent future on each iteration of the loop, things are not much more complicated. However, this is rarely what is needed. Generally we want to preserve some state between iterations. It is common to use `select` in a loop with streams, where each iteration of the loop handles one result from the stream. E.g.: + +```rust,norun +async fn main() { + let mut stream = ...; + + loop { + select! { + result = stream.next() => { + match result { + Some(x) => println!("received: {x}"), + None => break, + } + } + _ = timeout() => { + println!("time out!"); + break; + } + } + } +} +``` + +In this example, we read values from `stream` and print them until there are none left or waiting for a result times out. What happens to any remaining data in the stream in the timeout case depends on the implementation of the stream (it might be lost! Or duplicated!). This is an example of why behaviour in the face of cancellation can be important (and tricky). + +We may want to reuse a future, not just a stream, across iterations. For example, we may want to race against a timeout future where the timeout applies to all iterations rather than applying a new timeout for each iteration. This is possible by creating the future outside of the loop and referencing it: + +```rust,norun +async fn main() { + let mut stream = ...; + let mut timeout = timeout(); + + loop { + select! { + result = stream.next() => { + match result { + Some(x) => println!("received: {x}"), + None => break, + } + } + // Create a reference to `timeout` rather than moving it. + _ = &mut timeout => { + println!("time out!"); + break; + } + } + } +} +``` + +There are a couple of important details when using `select!` in a loop with futures or streams created outside of the `select!`. These are a fundamental consequence of how `select` works, so I'll introduce them by stepping through the details of `select`, using `timeout` in the last example as an example. + +- `timeout` is created outside of the loop and initialised with some time to count down. +- On each iteration of the loop, `select` creates a reference to `timeout`, but does not change its state. +- As `select` executes, it polls `timeout` which will return `Pending` while there is time left and `Ready` when the time elapses, at which point its block is executed. + +In the above example, when `timeout` is ready, we `break` out of the loop. But what if we didn't do that? In that case, `select` would simply poll `timeout` again, which the `Future` [docs](https://doc.rust-lang.org/std/future/trait.Future.html#tymethod.poll) say should not happen! `select` can't help this, it doesn't have any state (between iterations) to decide if `timeout` should be polled. Depending on how `timeout` is written, this might cause a panic, a logic error, or some kind of crash. + +You can prevent this kind of bug in several ways: + +- Use a [fused](futures.md#fusing) [future](https://docs.rs/futures/latest/futures/future/trait.FutureExt.html#method.fuse) or [stream](https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.fuse) so that re-polling is safe. +- Ensure that your code is structured so that futures are never re-polled, e.g., by breaking out of the loop (as in the previous example), or by using an `if` guard. + +Now, lets consider the type of `&mut timeout`. Lets assume that `timeout()` returns a type which implements `Future`, which might be an anonymous type from an async function, or it might be a named type like `Timeout`. Lets assume the latter because it makes the examples easier (but the logic applies in either case). Given that `Timeout` implents `Future`, will `&mut Timeout` implement `Future`? Not necessarily! There is a [blanket `impl`](https://doc.rust-lang.org/std/future/trait.Future.html#impl-Future-for-%26mut+F) which makes this true, but only if `Timeout` implements `Unpin`. That is not the case for all futures, so often you'll get a type error writing code like the last example. Such an error is easily fixed though by using the `pin` macro, e.g., `let mut timeout = pin!(timeout());` + +Cancellation with `select` in a loop is a rich source of subtle bugs. These usually happen where a future contains some state involving some data but not the data itself. When the future is dropped by cancellation, that state is lost but the underlying data is not updated. This can lead to data being lost or processed multiple times. + + +### Alternatives + +Futures.rs has its own [`select` macro](https://docs.rs/futures/latest/futures/macro.select.html) and futures-concurrency has a [Race trait](https://docs.rs/futures-concurrency/latest/futures_concurrency/future/trait.Race.html) which are alternatives to Tokio's `select` macro. These both have the same core semantics of concurrently racing multiple futures, processing the result of the first and cancelling the others, but they have different syntax and vary in the details. + +Futures.rs' `select` is superficially similar to Tokio's; to summarize the differences, in the futures.rs version: + +- Futures must always be fused (enforced by type-checking). +- `select` has `default` and `complete` branches, rather than an `else` branch. +- `select` does not support `if` guards. + +Futures-concurrency's `Race` has a very different syntax, similar to it's version of `join`, e.g., `(future_a, future_b).race().await` (it works on `Vec`s and arrays as well as tuples). The syntax is less flexible than the macros, but fits in nicely with most async code. Note that if you use `race` within a loop, you can still have the same issues as with `select`. + +As with `join`, spawning tasks and letting them execute in parallel is often a good alternative to using `select`. However, cancelling the remaining tasks after the first completes requires some extra work. This can be done using channels or a cancellation token. In either case, cancellation requires some action by the task being cancelled which means the task can do some tidying up or other graceful shutdown. + +A common use for `select` (especially inside a loop) is working with streams. There are stream combinator methods which can replace some uses of select. For example, [`merge`](https://docs.rs/futures-concurrency/latest/futures_concurrency/stream/trait.Merge.html) in futures-concurrency is a good alternative to merge multiple streams together. + + +## Final words + +In this section we've talked about two ways to run groups of futures concurrently. Joining futures means waiting for them all to finish; selecting (aka racing) futures means waiting for the first to finish. In contrast to spawning tasks, these compositions make no use of parallelism. + +Both `join` and `select` operate on sets of futures which are known in advance (often when writing the program, rather than at runtime). Sometimes, the futures to be composed are not known in advance - futures must be added to the set of composed futures as they are being executed. For this we need [streams](streams.md) which have their own composition operations. + +It's worth reiterating that although these composition operators are powerful and expressive, it is often easier and more appropriate to use tasks and spawning: parallelism is often desirable, you're less likely to have bugs around cancellation or blocking, and resource allocation is usually fairer (or at least simpler) and more predictable. diff --git a/src/part-guide/dtors.md b/src/part-guide/dtors.md index 289b5651..2fdb153a 100644 --- a/src/part-guide/dtors.md +++ b/src/part-guide/dtors.md @@ -33,6 +33,7 @@ - Don't use async for cleanup and don't worry too much - async clean up method + dtor bomb (i.e., separate clean-up from destruction) - centralise/out-source clean-up in a separate task or thread or supervisor object/process +- https://tokio.rs/tokio/topics/shutdown ## Why no async Drop (yet) diff --git a/src/part-guide/intro.md b/src/part-guide/intro.md index 6295bdf9..6b795055 100644 --- a/src/part-guide/intro.md +++ b/src/part-guide/intro.md @@ -2,4 +2,18 @@ This part of the book is a tutorial-style guide to async Rust. It is aimed at newcomers to async programming in Rust. It should be useful whether or not you've done async programming in other languages. If you have, you might skip the first section or skim it as a refresher. You might also want to read this [comparison to async in other languages]() sooner rather than later. -We'll start by discussing different models of [concurrent programming](concurrency.md), using processes, threads, or async tasks. This chapter will cover the essential parts of Rust's async model before we get into the nitty-gritty of programming in the second chapter where we introduce the async and await syntax. +## Core concepts + +We'll start by discussing different models of [concurrent programming](concurrency.md), using processes, threads, or async tasks. The first chapter will cover the essential parts of Rust's async model before we get into the nitty-gritty of async programming in the [second chapter](async-await.md) where we introduce the async and await programming paradigm. We cover some more async programming concepts in the [following chapter](more-async-await.md). + +One of the main motivations for async programming is more performant IO, which we cover in the [next chapter](io.md). We also cover *blocking* in detail in the same chapter. Blocking is a major hazard in async programming where a thread is blocked from making progress by an operation (often IO) which synchronously waits. + +Another motivation for async programming is that it facilitates new models for [abstraction and composition of concurrent code](concurrency-primitives.md). After covering that, we move on to [synchronization](sync.md) between concurrent tasks. + +There is a chapter on [tools for async programming](tools.md). + +The last few chapters cover some more specialised topics, starting with [async destruction and clean-up](dtors.md) (which is a common requirement, but since there is currently not a good built-in solution, is a bit of a specialist topic). + +The next two chapters in the guide go into detail on [futures](futures.md) and [runtimes](runtimes.md), two fundamental building blocks for async programming. + +Finally, we cover [timers and signal handling](timers-signals.md) and [async iterators](streams.md) (aka streams). The latter are how we program with sequences of async events (c.f., individual async events which are represented using futures or async functions). This is an area where the language is being actively developed and can be a little rough around the edges. diff --git a/src/part-guide/streams.md b/src/part-guide/streams.md index e20ab3dd..9449226f 100644 --- a/src/part-guide/streams.md +++ b/src/part-guide/streams.md @@ -23,6 +23,14 @@ - unordered variations - StreamGroup +### join/select/race with streams + +- hazards with select in a loop +- fusing +- difference to just futures +- alternatives to these + - Stream::merge, etc. + ## Implementing an async iterator - Implementing the trait diff --git a/src/part-guide/timers-signals.md b/src/part-guide/timers-signals.md index c638e0be..e1fd542f 100644 --- a/src/part-guide/timers-signals.md +++ b/src/part-guide/timers-signals.md @@ -7,6 +7,7 @@ - sleep - interval - timeout + - special future vs select/race ## Signal handling diff --git a/src/part-reference/cancellation.md b/src/part-reference/cancellation.md new file mode 100644 index 00000000..826d6e8e --- /dev/null +++ b/src/part-reference/cancellation.md @@ -0,0 +1,26 @@ +# Cancellation and cancellation safety + +Internal vs external cancellation +Threads vs futures + drop = cancel + only at await points + useful feature + still somewhat abrubt and surprising +Other cancellation mechanisms + abort + cancellation tokens + +## Cancellation safety + +Not a memory safety issue or race condition + Data loss or other logic errors +Different definitions/names + tokio's definition + general definition/halt safety + applying a replicated future idea +Simple data loss +Resumption +Issue with select or similar in loops +Splitting state between the future and the context as a root cause + + diff --git a/src/part-reference/structured.md b/src/part-reference/structured.md new file mode 100644 index 00000000..9adfec5e --- /dev/null +++ b/src/part-reference/structured.md @@ -0,0 +1,153 @@ +# Structured Concurrency + +Authors note (TODO): we might want to discuss some parts of this chapter much earlier in the book, in particularly as design principles (first intro is in guide/intro). However, in the interests of better understanding the topic and getting something written down, I'm starting with a separate chapter. It's also still a bit rough. + +(Note: the first few sections are talking about the abstract concept of structured concurrency and is not specific to Rust or async programming (c.f., synchronous concurrent programming with threads). I use 'task' to mean any thread or async task or other similar concurrency primitive). + +Structured concurrency is a philosophy for designing concurrent programs. For programs to fully adhere to the principals of structured concurrency requires certain language features and libraries, but many of the benefits are available by following the philosophy without such features. Structured concurrency is independent of language and concurrency primitives (threads vs async, etc.). Many people have found the ideas from structured concurrency to be useful when programming with async Rust. + +The essential idea of structured concurrency is that tasks are organised into a tree. Child tasks start after their parents and always finish before them. This allows results and errors to always be passed back to parent tasks, and requires that cancellation of parents is always propagated to child tasks. Primarily, temporal scope follows lexical scope, which means that a task should not outlive the function or block where it is created. However, this is not a requirement of structured concurrency as long as longer-lived tasks are reified in the program in some way (typically by using an object to represent the temporal scope of a child task within its parent task). + +TODO diagram + +Structured concurrency is named by analogy to [structured programming](https://en.wikipedia.org/wiki/Structured_programming), which is the idea that control flow should be structured using functions, loops, etc., rather than arbitrary jumps (`goto`). + +Before we consider structured concurrency, it's helpful to reflect on the sense in which common concurrent designs are unstructured. A typical pattern is that a task is started using some kind of spawning statement. That task then runs to completion concurrently with other tasks in the system (including the task which spawned it). There is no constraint on which task finishes first. The program is essentially just a bag of tasks which live independently and might terminate at any time. Any communication or synchronization of the tasks is ad hoc, and the programmer cannot assume that any other task will still be running. + +The practical downsides of unstructured concurrency are that returning results from a task must happen in an extra-linguistic fashion with no language-level guarantees around when or how this happens. Errors may go uncaught because languages' error handling mechanisms cannot be applied to the unconstrained control flow of unstructured concurrency. We also have no guarantees about the relative state of tasks - any task may be running, terminated successfully or with an error, or externally cancelled, independent of the state of any others[^join]. All this makes concurrent programs difficult to understand and maintain. This lack of structure is one reason why concurrent programming is considered categorically more difficult than sequential programming. + +It's worth noting that structured concurrency is a programming discipline which imposes restrictions on your program. Just like functions and loops are less flexible than goto, structured concurrency is less flexible than just spawning tasks. However, as with structured programming the costs of structured concurrency in flexibility are outweighed by the gains in predictability. + + +[^join]: Using join handles mitigates these downsides somewhat, but is an ad hoc mechanism with no reliable guarantees. To get the full benefits of structured concurrency you have to be meticulous about always using them, as well as handling cancellation and errors properly. This is difficult without language or library support; we'll discuss this a bit more below. + + +## Principles of structured concurrency + +The key idea of structured concurrency is that all tasks (or threads or whatever) are organized as a tree. I.e., each task (except the main task which is the root) has a single parent and there are no cycles of parents. A child task is started by its parent[^start-parent] and must *always* finish executing before its parent. There are no constraints between siblings. The parent of a task may not change. + +When reasoning about programs which implement structured concurrency, the key new fact is that if a task is live, then all of its ancestor tasks must also be live. This doesn't guarantee they are in a good state - they might be in the process of shutting down or handling an error, but they must be running in some form. This means that for any task (except the root task), there is always a live task to send results or errors to. Indeed, the ideal approach is that the language's error handling is extended so that errors are always propagated to the parent task. In Rust, this should apply to both returning `Result::Err` and to panicking. + +Furthermore, the lifetime of child tasks can be represented in the parent task. In the common case, the lifetime of a task (its temporal scope) is tied to the lexical scope in which it is started. For example, all tasks started within a function should complete before the function returns. This is an extremely powerful reasoning tool. Of course, this is too restrictive for all cases, and so the temporal scope of tasks can extend beyond a lexical scope by using an object in the program (often called a 'scope' or 'nursery'). Such an object can be passed or stored, and thus have an arbitrary lifetime. We still have an important reasoning tool: the tasks tied to that object cannot outlive it (in Rust this property lets us integrate tasks with the lifetime system). + +The above leads to another benefit of structured concurrency: it lets us reason about resource management across multiple tasks. Cleanup code is called when a resource will no longer be used (e.g., closing a file handle). In sequential code, the problem of when to call cleanup code is solved by ensuring destructors are called when an object goes out of scope. However, in concurrent code, an object might still be in use by another task and so when to clean up is unclear (reference counting or garbage collection are solutions in many cases, but make reasoning about the lifetimes of objects difficult which can lead to errors, and also has runtime overheads). + +The principle of a parent task outliving it's children has an important implication for cancellation: if a task is cancelled, then all its child tasks must be cancelled, and their cancellation must complete before the parent's cancellation completes. That in turn has implications for how cancellation can be implemented in a structurally concurrent system. + +If a task completes early due to an error (in Rust, this might mean a panic, as well as an early return), then before returning the task must wait for all its child tasks to complete. In practice, an early return must trigger cancellation of child tasks. This is analogous to panicking in Rust: panicking triggers destructors in the current scope before walking up the stack, calling destructors in each scope until the program terminates or the panic is caught. Under structural concurrency, an early return must trigger cancellation of child tasks (and thus cleanup of objects in those tasks) and walks down the tree of tasks cancelling all (transitive) children. + +Some designs work very naturally under structured concurrency (e.g., worker tasks with a single job to complete), while others don't fit so well. Generally these patterns are ones where not being tied to a specific task is a feature, e.g., worker pools or background threads. Even using these patterns, the tasks usually shouldn't outlive the whole program and so there is always one task which can be the parent. + + +[^start-parent]: This is not actually a hard requirement for structured concurrency. If the temporal scope of a task can be represented in the program and passed between tasks, then a child task can be started by one task but have another as its parent. + + +### Implementing structured concurrency + +The exemplar implementation of structured concurrency is the Python [Trio](https://trio.readthedocs.io/en/stable/) library. Trio is a general purpose library for async programming and IO designed around the concepts of structured concurrency. Trio programs use the `async with` construct to define a lexical scope for spawning tasks. Spawned tasks are associated with a [nursery](https://trio.readthedocs.io/en/stable/reference-core.html#nurseries-and-spawning) object (which is somewhat like a [Scope](https://doc.rust-lang.org/stable/std/thread/struct.Scope.html) in Rust). The lifetime of a task is tied to the dynamic temporal scope of its nursery, and in the common case, the lexical scope of an `async with` block. This enforces the parent/child relationship between tasks and thus the tree-invariant of structured concurrency. + +Error handling uses Python exceptions which are automatically propagated to parent tasks. + + +### Partially structured concurrency + +Like many programming techniques, the full benefits of structured concurrency come from *only* using it. If all concurrency is structured, then it makes it much easier to reason about the behaviour of the whole program. However, that has requirements on a language which are not easily met; it is easy enough to do unstructured concurrency in Rust, for example. However, even applying the principles of structured concurrency selectively, or thinking in terms of structured concurrency can be useful. + +One can use structured concurrency as a design discipline. When designing a program, always consider and document the parent-child relationships between tasks and ensure that a child task terminates before it's parent. This is usually fairly easy under normal execution, but can be difficult in the face of cancellation and panics. + +Another element of structured concurrency which is fairly easy to adopt is to always propagate errors to the parent task. Just like regular error handling, the best thing to do might be to ignore the error, but this should be explicit in the code of the parent task. + +Another programming discipline to learn from structured concurrency is to cancel all child tasks in the event of cancelling a parent task. This makes the structural concurrency guarantees much more reliable and makes cancellation in general easier to reason about. + + +## Practical structured concurrency with async Rust + + +Concurrency in Rust (whether async or using threads) is inherently unstructured. Tasks can be arbitrarily spawned, errors and panics on other tasks can be ignored, and cancellation is usually instantaneous and does not propagate to other tasks (see below for why these issues can't be easily solved). However, there are several ways you can get some of the benefits of structured concurrency in your programs: + +- Design your programs at a high level in accordance with structured concurrency. +- Stick to structured concurrency idioms where possible (and avoid unstructured idioms). +- Use crates to make structured concurrency more ergonomic and reliable. + +One of the trickiest issues with using structured concurrency with Rust is propagating cancellation to child futures/tasks. If you're using futures and [composing them concurrently](../part-guide/concurrency-primitives.md), then this happens naturally if abruptly (dropping a future drops any futures it owns, cancelling them). However, when a task is dropped, there is no opportunity to send a signal to tasks it has spawned (at least not with Tokio[^join_handle]). + +The implication of this is that you can only assume a weaker invariant than with 'real' structured concurrency: rather than being able to assume that a parent task is always alive, you can only assume that the parent is always alive unless it has been cancelled or it has panicked. While this is sub-optimal, it can still simplify programming because you never have to handle the case of having no parent to handle some result *under normal execution*. + +TODO + +- ownership/lifetimes naturally leading to sc +- reasoning about resources + + +[^join_handle]: The semantics of Tokio's `JoinHandle` is that if the handle is dropped, then the underlying task is 'released' (c.f., dropped), i.e., the result of the child task is not handled by any other task. + + +### Applying structured concurrency to the design of async programs + +In terms of designing programs, applying structured concurrency has a few implications: + +- Organising the concurrency of a program in a tree structure, i.e., thinking in terms of parent and child tasks. +- Temporal scope should follow lexical scope where possible, or in concrete terms a function shouldn't return (including early returns and panics) until any tasks launched in the function are complete. +- Data generally flows from child tasks to parent tasks. Of course, some data will flow from parents to children or in other ways, but primarily, tasks pass the results of their work to their parent tasks for further processing. This includes errors, so parent tasks should handle the errors of their children. + +If you're writing a library and want to use structured concurrency (or you want the library to be usable in a concurrent-structured program), then it is important that encapsulation of the library component includes temporal encapsulation. I.e., it doesn't start tasks which keep running beyond the API functions returning. + +Since Rust can't enforce the rules of structured concurrency, it's important to be aware of, and to document, in which ways the program (or component) is structured and where it violates the structured concurrency discipline. + +One useful compromise pattern is to only allow unstructured concurrency at the highest level of abstraction, and only for tasks spawned from the outer-most functions of the main task (ideally only from the `main` function, but programs often have some setup or configuration code which means that the logical 'top level' of a program is actually a few functions deep). Under such a pattern, a bunch of tasks are spawned from `main`, usually with distinct responsibilities and limited interaction between each other. These tasks might be restarted, new tasks started by any other task, or have a limited lifetime tied to clients or similar, i.e., they are concurrent-unstructured. Within each of these tasks, structured concurrency is rigorously applied. + +TODO why is this useful? + +TODO would be great to have a case study here. + + +### Structured and unstructured idioms + +This subsection covers a grab-bag of idioms which work well with a structured approach to concurrency, and a few which make structuring concurrency more difficult. + +The easiest way to follow structured concurrency is to use futures and [concurrent composition](../part-guide/concurrency-primitives.md) rather than tasks and spawning. If you need tasks for parallelism, then you will need to use `JoinHandle`s or `JoinSet`s. You must take care that child tasks can clean up properly if the parent task panics or is cancelled. Handles must be checked for errors to ensure errors in child tasks are properly handled. + +One way to work around the lack of cancellation propagation is to avoid abruptly cancelling (dropping) any task which may have children. Instead use a signal (e.g., a cancellation token) so that the task can cancel it's children before terminating. Unfortunately this is incompatible with `select`. + +To handle shutting down a program (or component), use an explicit shutdown method rather than dropping the component, so that the shutdown function can wait for child tasks to terminate or cancel them (since `drop` cannot be async). + +A few idioms do not play well with structured concurrency: + +- Spawning tasks without awaiting their completion via a join handle, or dropping those join handles. +- Select or race macros/functions. These are not inherently structured, but since they abruptly cancel futures, it's a common source of unstructured cancellation. +- Worker tasks or pools. For async tasks the overheads of starting/shutting down tasks is so low that there is likely to be very little benefit of using a pool of tasks rather than a pool of 'data', e.g., a connection pool. +- Data with no clear ownership structure - this isn't necessarily in contradiction with structured concurrency, but often leads to design issues. + + +### Crates for structured concurrency + +TODO + +- crates: [moro](https://github.com/nikomatsakis/moro), [async-nursery](https://github.com/najamelan/async_nursery) +- futures-concurrency + + +## Related topics + +This section is not necessary to know to use structured concurrency with async Rust, but is useful context included for the curious. + +### Scoped threads + +Structured concurrency with Rust threads works pretty well. Although you can't prevent spawning threads with unscoped lifetime, this is easy to avoid. Instead, restrict yourself to using scoped threads, see the [`scope`](https://doc.rust-lang.org/stable/std/thread/fn.scope.html) function docs for how. Using scoped threads limits child lifetimes and automatically propagates panics back to the parent thread. The parent thread must check the results of child threads to handle errors though. You can even pass around the [`Scope`](https://doc.rust-lang.org/stable/std/thread/struct.Scope.html) object like a Trio nursery. Cancellation is not usually an issue for Rust threads, but if you do make use of thread cancellation, you'll have to integrate that with scoped threads manually. + +Specific to Rust, scoped threads allow child threads to borrow data from the parent thread, something not possible with concurrent-unstructured threads. This can be very useful and shows how well structured concurrency and Rust-ownership-style resource management can work together. + + +### Async drop and scoped tasks + +In Rust, destructors (`drop`) are used to ensure resources are cleaned up when an object's lifetime ends. Since futures are just objects, their destructor would be an obvious place to ensure cancellation of child futures. However, in an async program it is very often desirable for cleanup actions to be asynchronous (not doing so can block other tasks). Unfortunately Rust does not currently support asynchronous destructors (async drop). There is ongoing work to support them, but it is difficult for a number of reasons, including that an object with an async destructor might be dropped from non-async context, and that since calling `drop` is implicit, there is nowhere to write an explicit `await`. + +Given how useful scoped threads are (both in general and for structured concurrency), another good question is why there is no similar construct for async programming ('scoped tasks')? TODO answer this + + +### References + +If you're interested, here are some good blog posts for further reading: + +- [Structured Concurrency](https://www.250bpm.com/p/structured-concurrency) +- [Tree-structured concurrency](https://blog.yoshuawuyts.com/tree-structured-concurrency/)