From bea85577081fefda53271593a793e59cd224f96a Mon Sep 17 00:00:00 2001 From: Chris Krycho Date: Tue, 25 Jun 2024 15:10:22 -0600 Subject: [PATCH] =?UTF-8?q?Ch.=2017=20=C2=A706:=20Start=20discussing=20fut?= =?UTF-8?q?ure/task/thread=20tradeoffs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Reintroduce accidentally-dropped content. I meant to simply carry this over in 74df84ea, but failed to, perhaps because I had `mdbook serve` running and it tries to be helpful about generating files which are referenced in `SUMMARY.md` but do not exist on disk. Either way, this is back now and we can use it to explain these concepts. - Start on an example showing how `thread::spawn` and `spawn_task` are basically interchangeable from an API POV, so that we can then see how they differ in terms of runtime consequences. --- .../ch17-async-await/listing-17-51/Cargo.lock | 292 ++++++++++++++++++ .../ch17-async-await/listing-17-51/Cargo.toml | 8 + .../listing-17-51/src/main.rs | 64 ++++ src/ch17-06-futures-tasks-threads.md | 155 +++++----- 4 files changed, 441 insertions(+), 78 deletions(-) create mode 100644 listings/ch17-async-await/listing-17-51/Cargo.lock create mode 100644 listings/ch17-async-await/listing-17-51/Cargo.toml create mode 100644 listings/ch17-async-await/listing-17-51/src/main.rs diff --git a/listings/ch17-async-await/listing-17-51/Cargo.lock b/listings/ch17-async-await/listing-17-51/Cargo.lock new file mode 100644 index 000000000..36905af42 --- /dev/null +++ b/listings/ch17-async-await/listing-17-51/Cargo.lock @@ -0,0 +1,292 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "addr2line" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e4503c46a5c0c7844e948c9a4d6acd9f50cccb4de1c48eb9e291ea17470c678" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + +[[package]] +name = "async_await" +version = "0.1.0" +dependencies = [ + "trpl", +] + +[[package]] +name = "autocfg" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" + +[[package]] +name = "backtrace" +version = "0.3.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cc23269a4f8976d0a4d2e7109211a419fe30e8d88d677cd60b6bc79c5732e0a" +dependencies = [ + "addr2line", + "cc", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + +[[package]] +name = "cc" +version = "1.0.99" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96c51067fd44124faa7f870b4b1c969379ad32b2ba805aa959430ceaa384f695" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" + +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" + +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" + +[[package]] +name = "futures-task" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" + +[[package]] +name = "futures-util" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + +[[package]] +name = "gimli" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd" + +[[package]] +name = "hermit-abi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" + +[[package]] +name = "libc" +version = "0.2.155" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" + +[[package]] +name = "memchr" +version = "2.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" + +[[package]] +name = "miniz_oxide" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8a240ddb74feaf34a79a7add65a741f3167852fba007066dcac1ca548d89c08" +dependencies = [ + "adler", +] + +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "object" +version = "0.36.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "576dfe1fc8f9df304abb159d767a29d0476f7750fbf8aa7ad07816004a207434" +dependencies = [ + "memchr", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "proc-macro2" +version = "1.0.85" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22244ce15aa966053a896d1accb3a6e68469b97c7f33f284b99f0d576879fc23" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "rustc-demangle" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" + +[[package]] +name = "slab" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + +[[package]] +name = "syn" +version = "2.0.66" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c42f3f41a2de00b01c0aaad383c5a45241efc8b2d1eda5661812fda5f3cdcff5" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "tokio" +version = "1.38.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba4f4a02a7a80d6f274636f0aa95c7e383b912d41fe721a31f29e29698585a4a" +dependencies = [ + "backtrace", + "num_cpus", + "pin-project-lite", +] + +[[package]] +name = "tokio-stream" +version = "0.1.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "trpl" +version = "0.1.0" +dependencies = [ + "futures", + "tokio", + "tokio-stream", +] + +[[package]] +name = "unicode-ident" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" diff --git a/listings/ch17-async-await/listing-17-51/Cargo.toml b/listings/ch17-async-await/listing-17-51/Cargo.toml new file mode 100644 index 000000000..e094f067f --- /dev/null +++ b/listings/ch17-async-await/listing-17-51/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "async_await" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[dependencies] +trpl = { path = "../../../packages/trpl" } diff --git a/listings/ch17-async-await/listing-17-51/src/main.rs b/listings/ch17-async-await/listing-17-51/src/main.rs new file mode 100644 index 000000000..729836252 --- /dev/null +++ b/listings/ch17-async-await/listing-17-51/src/main.rs @@ -0,0 +1,64 @@ +use std::{pin::pin, thread, time::Duration}; + +use trpl::{ReceiverStream, Stream, StreamExt}; + +fn main() { + trpl::block_on(async { + let messages = get_messages().timeout(Duration::from_millis(200)); + let intervals = get_intervals() + .map(|count| format!("Interval #{count}")) + .throttle(Duration::from_millis(500)) + .timeout(Duration::from_secs(10)); + + let mut merged = pin!(messages.merge(intervals).take(20)); + + while let Some(result) = merged.next().await { + match result { + Ok(item) => println!("{item}"), + Err(reason) => eprintln!("Problem: {reason:?}"), + } + } + }) +} + +// ANCHOR: thread +fn get_messages() -> impl Stream { + let (tx, rx) = trpl::channel(); + + thread::spawn(move || { + let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; + + for (index, message) in messages.into_iter().enumerate() { + let time_to_sleep = if index % 2 == 0 { 100 } else { 300 }; + thread::sleep(Duration::from_millis(time_to_sleep)); + + if let Err(send_error) = + tx.send(format!("Message: '{message}' after {time_to_sleep}ms")) + { + eprintln!("Cannot send message '{message}': {send_error}"); + break; + } + } + }); + + ReceiverStream::new(rx) +} + +fn get_intervals() -> impl Stream { + let (tx, rx) = trpl::channel(); + + thread::spawn(move || { + let mut count = 0; + loop { + thread::sleep(Duration::from_millis(1)); + count += 1; + if let Err(send_error) = tx.send(count) { + eprintln!("Could not send interval {count}: {send_error}"); + break; + }; + } + }); + + ReceiverStream::new(rx) +} +// ANCHOR_END: thread diff --git a/src/ch17-06-futures-tasks-threads.md b/src/ch17-06-futures-tasks-threads.md index 70baf0ce6..dbfbc2a46 100644 --- a/src/ch17-06-futures-tasks-threads.md +++ b/src/ch17-06-futures-tasks-threads.md @@ -1,87 +1,86 @@ # Futures, Tasks, and Threads -Working code from end of previous section: +As we saw in the previous chapter, threads provide one approach to concurrency, +and they let us solve some of these issues. However, they also have some +tradeoffs. On many operating systems, they use a fair bit of memory for each +thread, and they come with some overhead for starting up and shutting down. +Threads are also only an option when your operating system and hardware support +them! While mainstream desktop and mobile operating systems have all had +threading for many years, many embedded operating systems, like those used on +some microcontrollers, do not. + +The async model provides a different—and ultimately complementary—set of +tradeoffs. In the async model, concurrent operations do not require their own +threads. Instead, they can run on *tasks*. A task is a bit like a thread, but +instead of being managed by the operating system, it is managed by code that +lives at the level of libraries. + +In the previous section, we saw that we could build a `Stream` by using a +channel and spawning an async task which we could call from synchronous code. +We could do the exact same thing with a thread! We’ll use a simpler version of +the streams example so we can focus on the differences. Back in Listing 17-45, we used +`trpl::spawn_task` and `trpl::sleep`. In Listing 17-51, we replace those with +the `thread::spawn` and `thread::sleep` APIs from the standard library. + + + + ```rust -use std::{pin::pin, time::Duration}; - -use trpl::{interval, IntervalStream, ReceiverStream, Stream, StreamExt}; - -fn main() { - trpl::block_on(async { - let messages = pin!(get_messages()); - - let deciseconds = - pin!(IntervalStream::new(interval(Duration::from_millis(1))) - .throttle(Duration::from_millis(100)) - .map(|interval| { - let duration = interval.elapsed(); - format!("milliseconds elapsed: {}", duration.as_millis()) - })); - - let mut merged = messages.merge(deciseconds).take(10); - while let Some(alternative) = merged.next().await { - println!("Got: {alternative:?}"); - } - }) -} - -fn get_messages() -> impl Stream { - let (tx, rx) = trpl::channel(); - - trpl::spawn_task(async move { - for message in [String::from("Hello"), String::from("Goodbye")] { - tx.send(message).unwrap(); - trpl::sleep(Duration::from_millis(500)).await; - } - }); - - ReceiverStream::new(rx) -} +{{#rustdoc_include ../listings/ch17-async-await/listing-17-51/src/main.rs:thread}} ``` -We can also do this using `std::thread`: - -```rust -use std::{pin::pin, thread, time::Duration}; - -use trpl::{interval, IntervalStream, ReceiverStream, Stream, StreamExt}; - -fn main() { - trpl::block_on(async { - let messages = pin!(get_messages()); - - let deciseconds = - pin!(IntervalStream::new(interval(Duration::from_millis(1))) - .throttle(Duration::from_millis(100)) - .map(|interval| { - let duration = interval.elapsed(); - format!("milliseconds elapsed: {}", duration.as_millis()) - })); - - let mut merged = messages.merge(deciseconds).take(10); - while let Some(alternative) = merged.next().await { - println!("Got: {alternative:?}"); - } - }) -} - -fn get_messages() -> impl Stream { - let (tx, rx) = trpl::channel(); - - // ANCHOR: thread - thread::spawn(move || { - for message in [String::from("Hello"), String::from("Goodbye")] { - tx.send(message).unwrap(); - thread::sleep(Duration::from_millis(500)); - } - }); - // ANCHOR_END: thread - - ReceiverStream::new(rx) -} -``` + Notice that very little changes here from the perspective of the calling code! That is as we might expect, given that async tasks are kind of like lightweight, -runtime-managed threads. +runtime-managed threads. However, there is a meaningful difference in the way +this system behaves, though we might have a hard time measuring it in this very +simple example. A run + +### Parallelism and Concurrency + + + +First, though, we need to dig a little deeper into the differences between +parallelism and concurrency. In the previous chapter we treated them as mostly +interchangeable. Now we need to distinguish between the two a little more, +because the differences will show up as we start working: + +* *Parallelism* is when operations can happen simultaneously. + +* *Concurrency* is when operations can make progress without having to wait for + all other operations to complete. + +One common analogy for thinking about the difference between concurrency and +parallelism is cooking in a kitchen. Parallelism is like having two cooks: one +working on cooking eggs, and the other working on preparing fruit bowls. Those +can happen at the same time, without either affecting the other. Concurrency is +like having a single cook who can start cooking some eggs, start dicing up some +vegetables to use in the omelette, adding seasoning and whatever vegetables are +ready to the eggs at certain points, and switching back and forth between those +tasks. + +(This analogy breaks down if you think about it too hard. The eggs keep cooking +while the cook is chopping up the vegetables, after all. That is parallelism, +not just concurrency! The focus of the analogy is the *cook*, not the food, +though, and as long as you keep that in mind, it mostly works.) + +On a machine with multiple CPU cores, we can actually do work in parallel. One +core can be doing one thing while another core does something completely +unrelated, and those actually happen at the same time. On a machine with a +single CPU core, the CPU can only do one operation at a time, but we can still +have concurrency. Using tools like threads, processes, and async, the computer +can pause one activity and switch to others before eventually cycling back to +that first activity again. So all parallel operations are also concurrent, but +not all concurrent operations happen in parallel! + +When working with async in Rust, we are always dealing with concurrency. +Depending on the hardware, the operating system, and the async runtime we are +using, that concurrency may use some degree of parallelism under the hood, or it +may not. (More about async runtimes later!) + +A big difference between the cooking analogy and Rust’s async model for +concurrency is that in the cooking example, the cook makes the decision about +when to switch tasks. In Rust’s async model, the tasks are in control of that. +To see how, let’s look at how Rust actually uses async.