Ch. 17 §06: Start discussing future/task/thread tradeoffs

- Reintroduce accidentally-dropped content. I meant to simply carry this
  over in 74df84ea, but failed to, perhaps because I had `mdbook serve`
  running and it tries to be helpful about generating files which are
  referenced in `SUMMARY.md` but do not exist on disk. Either way, this
  is back now and we can use it to explain these concepts.

- Start on an example showing how `thread::spawn` and `spawn_task` are
  basically interchangeable from an API POV, so that we can then see how
  they differ in terms of runtime consequences.
Chris Krycho
2024-06-25 15:10:22 -06:00
parent 3c6c32cbe3
commit bea8557708
4 changed files with 441 additions and 78 deletions


@@ -0,0 +1,292 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "addr2line"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e4503c46a5c0c7844e948c9a4d6acd9f50cccb4de1c48eb9e291ea17470c678"
dependencies = [
"gimli",
]
[[package]]
name = "adler"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "async_await"
version = "0.1.0"
dependencies = [
"trpl",
]
[[package]]
name = "autocfg"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0"
[[package]]
name = "backtrace"
version = "0.3.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5cc23269a4f8976d0a4d2e7109211a419fe30e8d88d677cd60b6bc79c5732e0a"
dependencies = [
"addr2line",
"cc",
"cfg-if",
"libc",
"miniz_oxide",
"object",
"rustc-demangle",
]
[[package]]
name = "cc"
version = "1.0.99"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96c51067fd44124faa7f870b4b1c969379ad32b2ba805aa959430ceaa384f695"
[[package]]
name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "futures"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
name = "futures-core"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
[[package]]
name = "futures-executor"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
[[package]]
name = "futures-macro"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5"
[[package]]
name = "futures-task"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004"
[[package]]
name = "futures-util"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"slab",
]
[[package]]
name = "gimli"
version = "0.29.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd"
[[package]]
name = "hermit-abi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024"
[[package]]
name = "libc"
version = "0.2.155"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c"
[[package]]
name = "memchr"
version = "2.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3"
[[package]]
name = "miniz_oxide"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8a240ddb74feaf34a79a7add65a741f3167852fba007066dcac1ca548d89c08"
dependencies = [
"adler",
]
[[package]]
name = "num_cpus"
version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
dependencies = [
"hermit-abi",
"libc",
]
[[package]]
name = "object"
version = "0.36.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "576dfe1fc8f9df304abb159d767a29d0476f7750fbf8aa7ad07816004a207434"
dependencies = [
"memchr",
]
[[package]]
name = "pin-project-lite"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02"
[[package]]
name = "pin-utils"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "proc-macro2"
version = "1.0.85"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22244ce15aa966053a896d1accb3a6e68469b97c7f33f284b99f0d576879fc23"
dependencies = [
"unicode-ident",
]
[[package]]
name = "quote"
version = "1.0.36"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7"
dependencies = [
"proc-macro2",
]
[[package]]
name = "rustc-demangle"
version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"
[[package]]
name = "slab"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67"
dependencies = [
"autocfg",
]
[[package]]
name = "syn"
version = "2.0.66"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c42f3f41a2de00b01c0aaad383c5a45241efc8b2d1eda5661812fda5f3cdcff5"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "tokio"
version = "1.38.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba4f4a02a7a80d6f274636f0aa95c7e383b912d41fe721a31f29e29698585a4a"
dependencies = [
"backtrace",
"num_cpus",
"pin-project-lite",
]
[[package]]
name = "tokio-stream"
version = "0.1.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]]
name = "trpl"
version = "0.1.0"
dependencies = [
"futures",
"tokio",
"tokio-stream",
]
[[package]]
name = "unicode-ident"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b"


@@ -0,0 +1,8 @@
[package]
name = "async_await"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
trpl = { path = "../../../packages/trpl" }


@@ -0,0 +1,64 @@
use std::{pin::pin, thread, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::block_on(async {
        let messages = get_messages().timeout(Duration::from_millis(200));
        let intervals = get_intervals()
            .map(|count| format!("Interval #{count}"))
            .throttle(Duration::from_millis(500))
            .timeout(Duration::from_secs(10));
        let mut merged = pin!(messages.merge(intervals).take(20));

        while let Some(result) = merged.next().await {
            match result {
                Ok(item) => println!("{item}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    })
}

// ANCHOR: thread
fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    thread::spawn(move || {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];

        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            thread::sleep(Duration::from_millis(time_to_sleep));

            if let Err(send_error) =
                tx.send(format!("Message: '{message}' after {time_to_sleep}ms"))
            {
                eprintln!("Cannot send message '{message}': {send_error}");
                break;
            }
        }
    });

    ReceiverStream::new(rx)
}

fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    thread::spawn(move || {
        let mut count = 0;
        loop {
            thread::sleep(Duration::from_millis(1));
            count += 1;

            if let Err(send_error) = tx.send(count) {
                eprintln!("Could not send interval {count}: {send_error}");
                break;
            };
        }
    });

    ReceiverStream::new(rx)
}
// ANCHOR_END: thread


@@ -1,87 +1,86 @@
# Futures, Tasks, and Threads
Working code from end of previous section:
As we saw in the previous chapter, threads provide one approach to concurrency,
and they let us solve some of these issues. However, they also have some
tradeoffs. On many operating systems, they use a fair bit of memory for each
thread, and they come with some overhead for starting up and shutting down.
Threads are also only an option when your operating system and hardware support
them! While mainstream desktop and mobile operating systems have all had
threading for many years, many embedded operating systems, like those used on
some microcontrollers, do not.
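To make the per-thread memory cost a little more concrete, here is a minimal sketch that uses only the standard library. It asks for an explicit stack size through `std::thread::Builder` instead of accepting the platform default. The helper name `run_with_stack` and the 256 KiB figure are assumptions chosen for illustration, not recommendations.

```rust
use std::thread;

/// Runs `work` on a new thread whose stack is (at least) `stack_bytes` large
/// and returns its result. `run_with_stack` is a hypothetical helper name.
fn run_with_stack<T, F>(stack_bytes: usize, work: F) -> T
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    thread::Builder::new()
        // Every thread needs its own stack; this is part of the memory
        // cost that comes with each thread we spawn.
        .stack_size(stack_bytes)
        .spawn(work)
        .expect("the operating system could not create a thread")
        .join()
        .expect("the worker thread panicked")
}

fn main() {
    let answer = run_with_stack(256 * 1024, || 6 * 7);
    println!("computed on a small-stack thread: {answer}");
}
```

Each spawned thread reserves a stack of its own whether or not it uses all of it, which is one reason spawning very large numbers of threads can become expensive.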
The async model provides a different—and ultimately complementary—set of
tradeoffs. In the async model, concurrent operations do not require their own
threads. Instead, they can run on *tasks*. A task is a bit like a thread, but
instead of being managed by the operating system, it is managed by code that
lives at the level of libraries.
In the previous section, we saw that we could build a `Stream` by using a
channel and spawning an async task which we could call from synchronous code.
We could do the exact same thing with a thread! We'll use a simpler version of
the streams example so we can focus on the differences. Back in Listing 17-45, we used
`trpl::spawn_task` and `trpl::sleep`. In Listing 17-51, we replace those with
the `thread::spawn` and `thread::sleep` APIs from the standard library.
<!-- TODO: Can we *just* show the get_intervals one? It is simpler. -->
<Listing number="17-51" caption="TODO" file-name="src/main.rs">
```rust
use std::{pin::pin, time::Duration};

use trpl::{interval, IntervalStream, ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::block_on(async {
        let messages = pin!(get_messages());

        let deciseconds =
            pin!(IntervalStream::new(interval(Duration::from_millis(1)))
                .throttle(Duration::from_millis(100))
                .map(|interval| {
                    let duration = interval.elapsed();
                    format!("milliseconds elapsed: {}", duration.as_millis())
                }));

        let mut merged = messages.merge(deciseconds).take(10);
        while let Some(alternative) = merged.next().await {
            println!("Got: {alternative:?}");
        }
    })
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        for message in [String::from("Hello"), String::from("Goodbye")] {
            tx.send(message).unwrap();
            trpl::sleep(Duration::from_millis(500)).await;
        }
    });

    ReceiverStream::new(rx)
}
{{#rustdoc_include ../listings/ch17-async-await/listing-17-51/src/main.rs:thread}}
```
We can also do this using `std::thread`:
```rust
use std::{pin::pin, thread, time::Duration};

use trpl::{interval, IntervalStream, ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::block_on(async {
        let messages = pin!(get_messages());

        let deciseconds =
            pin!(IntervalStream::new(interval(Duration::from_millis(1)))
                .throttle(Duration::from_millis(100))
                .map(|interval| {
                    let duration = interval.elapsed();
                    format!("milliseconds elapsed: {}", duration.as_millis())
                }));

        let mut merged = messages.merge(deciseconds).take(10);
        while let Some(alternative) = merged.next().await {
            println!("Got: {alternative:?}");
        }
    })
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    // ANCHOR: thread
    thread::spawn(move || {
        for message in [String::from("Hello"), String::from("Goodbye")] {
            tx.send(message).unwrap();
            thread::sleep(Duration::from_millis(500));
        }
    });
    // ANCHOR_END: thread

    ReceiverStream::new(rx)
}
```
</Listing>
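For readers who want to try the thread-plus-channel shape without the `trpl` crate, the following is a rough standard-library analogue. It swaps in `std::sync::mpsc` for `trpl::channel` and returns a plain `Receiver` rather than a `Stream`, so it is a sketch of the same pattern and not the listing itself. The message contents and the 10 ms delay are placeholders.

```rust
use std::{sync::mpsc, thread, time::Duration};

/// Spawns a thread that sends a few messages over a channel, sleeping
/// briefly before each one, and returns the receiving end.
fn get_messages() -> mpsc::Receiver<String> {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        for message in ["a", "b", "c"] {
            thread::sleep(Duration::from_millis(10));

            // `send` only fails once the receiver has been dropped.
            if tx.send(format!("Message: '{message}'")).is_err() {
                break;
            }
        }
        // `tx` is dropped when the closure ends, which closes the channel.
    });

    rx
}

fn main() {
    // Iterating over the receiver blocks this thread until the channel closes.
    for message in get_messages() {
        println!("{message}");
    }
}
```

The caller cannot tell from the return type how the messages are produced. The difference is on the consuming side: here the `for` loop blocks its whole thread while waiting, whereas awaiting a stream lets other async work proceed in the meantime.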
Notice that very little changes here from the perspective of the calling code!
That is as we might expect, given that async tasks are kind of like lightweight,
runtime-managed threads. However, there is a meaningful difference in the way
this system behaves, though we might have a hard time measuring it in this very
simple example. A run <!-- TODO: keep going from here -->
### Parallelism and Concurrency
<!-- TODO: phrasing makes less sense given new home here at end of chapter -->
First, though, we need to dig a little deeper into the differences between
parallelism and concurrency. In the previous chapter we treated them as mostly
interchangeable. Now we need to distinguish between the two a little more,
because the differences will show up as we start working:
* *Parallelism* is when operations can happen simultaneously.
* *Concurrency* is when operations can make progress without having to wait for
all other operations to complete.
One common analogy for thinking about the difference between concurrency and
parallelism is cooking in a kitchen. Parallelism is like having two cooks: one
working on cooking eggs, and the other working on preparing fruit bowls. Those
can happen at the same time, without either affecting the other. Concurrency is
like having a single cook who can start cooking some eggs, start dicing up some
vegetables to use in the omelette, adding seasoning and whatever vegetables are
ready to the eggs at certain points, and switching back and forth between those
tasks.
(This analogy breaks down if you think about it too hard. The eggs keep cooking
while the cook is chopping up the vegetables, after all. That is parallelism,
not just concurrency! The focus of the analogy is the *cook*, not the food,
though, and as long as you keep that in mind, it mostly works.)
On a machine with multiple CPU cores, we can actually do work in parallel. One
core can be doing one thing while another core does something completely
unrelated, and those actually happen at the same time. On a machine with a
single CPU core, the CPU can only do one operation at a time, but we can still
have concurrency. Using tools like threads, processes, and async, the computer
can pause one activity and switch to others before eventually cycling back to
that first activity again. So all parallel operations are also concurrent, but
not all concurrent operations happen in parallel!
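As a small illustration of the multi-core case, this standard-library sketch splits a sum across several threads. `parallel_sum` is a hypothetical helper written for this example. Whether the threads actually run at the same time depends on how many cores the machine has and how the operating system schedules them, but the result is the same either way.

```rust
use std::thread;

/// Sums `1..=n` by splitting the range across `workers` threads.
/// A `workers` value of zero is treated as one.
fn parallel_sum(n: u64, workers: u64) -> u64 {
    let workers = workers.max(1);
    // Size of each chunk, rounded up so the chunks cover the whole range.
    let chunk = (n + workers - 1) / workers;

    let handles: Vec<_> = (0..workers)
        .map(|i| {
            let start = i * chunk + 1;
            let end = ((i + 1) * chunk).min(n);
            // An empty range (start > end) simply sums to zero.
            thread::spawn(move || (start..=end).sum::<u64>())
        })
        .collect();

    handles
        .into_iter()
        .map(|handle| handle.join().expect("a worker thread panicked"))
        .sum()
}

fn main() {
    // Ask how much parallelism is available; fall back to 1 if unknown.
    let cores = thread::available_parallelism()
        .map(|count| count.get())
        .unwrap_or(1);

    println!("{cores} worker(s): {}", parallel_sum(1_000_000, cores as u64));
}
```

On a single-core machine the same program still works: the operating system switches between the threads, so the work is concurrent but not parallel.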
When working with async in Rust, we are always dealing with concurrency.
Depending on the hardware, the operating system, and the async runtime we are
using, that concurrency may use some degree of parallelism under the hood, or it
may not. (More about async runtimes later!)
A big difference between the cooking analogy and Rust's async model for
concurrency is that in the cooking example, the cook makes the decision about
when to switch tasks. In Rust's async model, the tasks are in control of that.
To see how, let's look at how Rust actually uses async.
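The idea that tasks decide when to hand back control can be sketched with a hand-written future and a tiny polling loop, using only the standard library. Everything named here (`Steps`, `NoopWaker`, `run_round_robin`, `interleaved_log`) is hypothetical and exists only for this illustration. A real runtime is far more careful about when it polls, so treat this as a toy model and not as how any particular executor works.

```rust
use std::{
    cell::RefCell,
    future::Future,
    pin::Pin,
    rc::Rc,
    sync::Arc,
    task::{Context, Poll, Wake, Waker},
};

type Log = Rc<RefCell<Vec<String>>>;

/// A waker that does nothing; enough for a loop that polls unconditionally.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// A toy task that does one step of work per poll, then yields.
struct Steps {
    name: &'static str,
    done: u32,
    total: u32,
    log: Log,
}

impl Future for Steps {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.done == self.total {
            return Poll::Ready(());
        }
        self.done += 1;
        let entry = format!("{} step {}", self.name, self.done);
        self.log.borrow_mut().push(entry);
        // The task itself chooses to hand control back here.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Polls every task in turn on the current thread until all are finished.
fn run_round_robin(mut tasks: Vec<Pin<Box<dyn Future<Output = ()>>>>) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    while !tasks.is_empty() {
        // Keep only the tasks that are still pending after this poll.
        tasks.retain_mut(|task| task.as_mut().poll(&mut cx).is_pending());
    }
}

fn interleaved_log() -> Vec<String> {
    let log: Log = Rc::new(RefCell::new(Vec::new()));
    let tasks: Vec<Pin<Box<dyn Future<Output = ()>>>> = vec![
        Box::pin(Steps { name: "eggs", done: 0, total: 2, log: log.clone() }),
        Box::pin(Steps { name: "fruit", done: 0, total: 2, log: log.clone() }),
    ];
    run_round_robin(tasks);
    let entries = log.borrow().to_vec();
    entries
}

fn main() {
    for entry in interleaved_log() {
        println!("{entry}");
    }
}
```

Both tasks make progress on a single thread, and the switch from one to the other happens only where a task returns `Poll::Pending`. If a task never yielded, the loop could not move on to the next one until that task finished.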