mirror of
https://github.com/leptos-rs/leptos.git
synced 2025-12-27 07:34:35 -05:00
fix: prevent race condition in executor + docs, optimization and tests
This commit is contained in:
79
Cargo.lock
generated
79
Cargo.lock
generated
@@ -272,6 +272,7 @@ dependencies = [
|
||||
"tokio",
|
||||
"tracing",
|
||||
"wasm-bindgen-futures",
|
||||
"wasm-bindgen-test",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -831,9 +832,9 @@ checksum = "575f75dfd25738df5b91b8e43e14d44bda14637a58fae779fd2b064f8bf3e010"
|
||||
|
||||
[[package]]
|
||||
name = "deranged"
|
||||
version = "0.4.0"
|
||||
version = "0.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9c9e6a11ca8224451684bc0d7d5a7adbf8f2fd6887261a1cfc3c0432f9d4068e"
|
||||
checksum = "28cfac68e08048ae1883171632c2aef3ebc555621ae56fbccce1cbf22dd7f058"
|
||||
dependencies = [
|
||||
"powerfmt",
|
||||
]
|
||||
@@ -986,9 +987,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "event-listener-strategy"
|
||||
version = "0.5.3"
|
||||
version = "0.5.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3c3e4e0dd3673c1139bf041f3008816d9cf2946bbfac2945c09e523b8d7b05b2"
|
||||
checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93"
|
||||
dependencies = [
|
||||
"event-listener",
|
||||
"pin-project-lite",
|
||||
@@ -1588,9 +1589,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "icu_locid_transform_data"
|
||||
version = "1.5.0"
|
||||
version = "1.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fdc8ff3388f852bede6b579ad4e978ab004f139284d7b28715f773507b946f6e"
|
||||
checksum = "7515e6d781098bf9f7205ab3fc7e9709d34554ae0b21ddbcb5febfa4bc7df11d"
|
||||
|
||||
[[package]]
|
||||
name = "icu_normalizer"
|
||||
@@ -1612,9 +1613,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "icu_normalizer_data"
|
||||
version = "1.5.0"
|
||||
version = "1.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f8cafbf7aa791e9b22bec55a167906f9e1215fd475cd22adfcf660e03e989516"
|
||||
checksum = "c5e8338228bdc8ab83303f16b797e177953730f601a96c25d10cb3ab0daa0cb7"
|
||||
|
||||
[[package]]
|
||||
name = "icu_properties"
|
||||
@@ -1633,9 +1634,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "icu_properties_data"
|
||||
version = "1.5.0"
|
||||
version = "1.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "67a8effbc3dd3e4ba1afa8ad918d5684b8868b3b26500753effea8d2eed19569"
|
||||
checksum = "85fb8799753b75aee8d2a21d7c14d9f38921b54b3dbda10f5a3c7a7b82dba5e2"
|
||||
|
||||
[[package]]
|
||||
name = "icu_provider"
|
||||
@@ -2099,9 +2100,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "log"
|
||||
version = "0.4.26"
|
||||
version = "0.4.27"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "30bde2b3dc3671ae49d8e2e9f044c7c005836e7a023ee57cffa25ab82764bb9e"
|
||||
checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94"
|
||||
|
||||
[[package]]
|
||||
name = "manyhow"
|
||||
@@ -2165,6 +2166,16 @@ dependencies = [
|
||||
"syn 2.0.100",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "minicov"
|
||||
version = "0.3.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f27fe9f1cc3c22e1687f9446c2083c4c5fc7f0bcf1c7a86bdbded14985895b4b"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"walkdir",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "miniserde"
|
||||
version = "0.1.42"
|
||||
@@ -2300,9 +2311,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "once_cell"
|
||||
version = "1.21.1"
|
||||
version = "1.21.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d75b0bedcc4fe52caa0e03d9f1151a323e4aa5e2d78ba3580400cd3c9e2bc4bc"
|
||||
checksum = "c2806eaa3524762875e21c3dcd057bc4b7bfa01ce4da8d46be1cd43649e1cc6b"
|
||||
|
||||
[[package]]
|
||||
name = "openssl"
|
||||
@@ -2625,9 +2636,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "quinn-udp"
|
||||
version = "0.5.10"
|
||||
version = "0.5.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e46f3055866785f6b92bc6164b76be02ca8f2eb4b002c0354b28cf4c119e5944"
|
||||
checksum = "541d0f57c6ec747a90738a52741d3221f7960e8ac2f0ff4b1a63680e033b4ab5"
|
||||
dependencies = [
|
||||
"cfg_aliases",
|
||||
"libc",
|
||||
@@ -3051,9 +3062,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "rustls-webpki"
|
||||
version = "0.103.0"
|
||||
version = "0.103.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0aa4eeac2588ffff23e9d7a7e9b3f971c5fb5b7ebc9452745e0c232c64f83b2f"
|
||||
checksum = "fef8b8769aaccf73098557a87cd1816b4f9c7c16811c9c77142aa695c16f2c03"
|
||||
dependencies = [
|
||||
"ring",
|
||||
"rustls-pki-types",
|
||||
@@ -3643,9 +3654,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "time"
|
||||
version = "0.3.40"
|
||||
version = "0.3.41"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9d9c75b47bdff86fa3334a3db91356b8d7d86a9b839dab7d0bdc5c3d3a077618"
|
||||
checksum = "8a7619e19bc266e0f9c5e6686659d394bc57973859340060a69221e57dbc0c40"
|
||||
dependencies = [
|
||||
"deranged",
|
||||
"itoa",
|
||||
@@ -3664,9 +3675,9 @@ checksum = "c9e9a38711f559d9e3ce1cdb06dd7c5b8ea546bc90052da6d06bb76da74bb07c"
|
||||
|
||||
[[package]]
|
||||
name = "time-macros"
|
||||
version = "0.2.21"
|
||||
version = "0.2.22"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "29aa485584182073ed57fd5004aa09c371f021325014694e432313345865fd04"
|
||||
checksum = "3526739392ec93fd8b359c8e98514cb3e8e021beb4e5f597b00a0221f8ed8a49"
|
||||
dependencies = [
|
||||
"num-conv",
|
||||
"time-core",
|
||||
@@ -4182,6 +4193,30 @@ dependencies = [
|
||||
"unicode-ident",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "wasm-bindgen-test"
|
||||
version = "0.3.50"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "66c8d5e33ca3b6d9fa3b4676d774c5778031d27a578c2b007f905acf816152c3"
|
||||
dependencies = [
|
||||
"js-sys",
|
||||
"minicov",
|
||||
"wasm-bindgen",
|
||||
"wasm-bindgen-futures",
|
||||
"wasm-bindgen-test-macro",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "wasm-bindgen-test-macro"
|
||||
version = "0.3.50"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "17d5042cc5fa009658f9a7333ef24291b1291a25b6382dd68862a7f3b969f69b"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.100",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "wasm-streams"
|
||||
version = "0.4.2"
|
||||
|
||||
@@ -19,6 +19,14 @@ tokio = { version = "1.41", optional = true, default-features = false, features
|
||||
tracing = { version = "0.1.41", optional = true }
|
||||
wasm-bindgen-futures = { version = "0.4.50", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { version = "1.41", default-features = false, features = [
|
||||
"rt",
|
||||
"macros",
|
||||
"time",
|
||||
] }
|
||||
wasm-bindgen-test = { version = "0.3.50" }
|
||||
|
||||
[features]
|
||||
async-executor = ["dep:async-executor"]
|
||||
tracing = ["dep:tracing"]
|
||||
|
||||
@@ -1 +1,4 @@
|
||||
extend = { path = "../cargo-make/main.toml" }
|
||||
extend = [
|
||||
{ path = "../cargo-make/main.toml" },
|
||||
{ path = "../cargo-make/wasm-test.toml" },
|
||||
]
|
||||
|
||||
@@ -1,13 +1,12 @@
|
||||
//! This crate makes it easier to write asynchronous code that is executor-agnostic, by providing a
|
||||
//! utility that can be used to spawn tasks in a variety of executors.
|
||||
//!
|
||||
//! It only supports single executor per program, but that executor can be set at runtime, anywhere
|
||||
//! in your crate (or an application that depends on it).
|
||||
//! It supports a global executor, set once per program.
|
||||
//!
|
||||
//! This can be extended to support any executor or runtime that supports spawning [`Future`]s.
|
||||
//!
|
||||
//! This is a least common denominator implementation in many ways. Limitations include:
|
||||
//! - setting an executor is a one-time, global action
|
||||
//! - setting the *global* executor is a one-time action
|
||||
//! - no "join handle" or other result is returned from the spawn
|
||||
//! - the `Future` must output `()`
|
||||
//!
|
||||
@@ -15,13 +14,102 @@
|
||||
//! use any_spawner::Executor;
|
||||
//!
|
||||
//! // make sure an Executor has been initialized with one of the init_ functions
|
||||
//! // (e.g., Executor::init_tokio().unwrap(); )
|
||||
//!
|
||||
//! # if false {
|
||||
//! // spawn a thread-safe Future
|
||||
//! Executor::spawn(async { /* ... */ });
|
||||
//! # #[cfg(feature = "tokio")]
|
||||
//! # {
|
||||
//! # let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
|
||||
//! # rt.block_on(async {
|
||||
//! # any_spawner::Executor::init_tokio().unwrap();
|
||||
//! // spawn a thread-safe Future (uses the global executor)
|
||||
//! Executor::spawn(async { println!("Global spawn") });
|
||||
//!
|
||||
//! // spawn a Future that is !Send
|
||||
//! Executor::spawn_local(async { /* ... */ });
|
||||
//! // spawn a Future that is !Send (uses the global executor)
|
||||
//! Executor::spawn_local(async { println!("Global spawn_local") });
|
||||
//! # });
|
||||
//! # }
|
||||
//! ```
|
||||
//!
|
||||
//! ## Custom Executors
|
||||
//!
|
||||
//! You can define and globally set a custom executor implementation:
|
||||
//!
|
||||
//! ```rust
|
||||
//! # #[cfg(feature = "futures-executor")] // Example uses LocalPool
|
||||
//! # {
|
||||
//! use any_spawner::{
|
||||
//! CustomExecutor, Executor, PinnedFuture, PinnedLocalFuture,
|
||||
//! };
|
||||
//! use futures::{executor::LocalPool, task::LocalSpawnExt};
|
||||
//! use std::cell::RefCell;
|
||||
//! use std::sync::{Mutex, Arc};
|
||||
//!
|
||||
//! // Use Mutex for Send+Sync compliance if shared across threads globally
|
||||
//! struct MyGlobalExecutor {
|
||||
//! // For simplicity, using futures LocalPool (not Send/Sync) wrapped correctly
|
||||
//! // Note: A real global executor would likely use a Send+Sync pool like futures::ThreadPool
|
||||
//! // or manage thread-specific local pools internally.
|
||||
//! // This example demonstrates the interface, not a robust global executor.
|
||||
//! // We'll use a simple local pool here, assuming access is synchronized.
|
||||
//! // For a truly global, thread-safe example, consider futures::ThreadPool.
|
||||
//! pool: Arc<Mutex<RefCell<LocalPool>>>, // Simplistic example synchronization
|
||||
//! }
|
||||
//!
|
||||
//! impl MyGlobalExecutor {
|
||||
//! fn new() -> Self {
|
||||
//! Self {
|
||||
//! pool: Arc::new(Mutex::new(RefCell::new(LocalPool::new()))),
|
||||
//! }
|
||||
//! }
|
||||
//! }
|
||||
//!
|
||||
//! impl CustomExecutor for MyGlobalExecutor {
|
||||
//! // spawn needs to handle Send futures, potentially across threads
|
||||
//! fn spawn(&self, fut: PinnedFuture<()>) {
|
||||
//! println!("MyGlobalExecutor::spawn called - spawning locally for demo");
|
||||
//! // A real implementation might use a ThreadPool here.
|
||||
//! // For this demo, we delegate to spawn_local, assuming synchronized access.
|
||||
//! self.spawn_local(fut);
|
||||
//! }
|
||||
//!
|
||||
//! // spawn_local handles !Send futures, typically on the current thread's context
|
||||
//! fn spawn_local(&self, fut: PinnedLocalFuture<()>) {
|
||||
//! println!("MyGlobalExecutor::spawn_local called");
|
||||
//! // Lock mutex, then borrow RefCell
|
||||
//! let lock = self.pool.lock().unwrap();
|
||||
//! let pool_ref = lock.borrow();
|
||||
//! pool_ref
|
||||
//! .spawner()
|
||||
//! .spawn_local_obj(fut.into())
|
||||
//! .expect("Failed to spawn local");
|
||||
//! }
|
||||
//!
|
||||
//! // poll_local might need to drive the local part of the executor
|
||||
//! fn poll_local(&self) {
|
||||
//! println!("MyGlobalExecutor::poll_local called");
|
||||
//! // Lock mutex, then try_borrow_mut RefCell
|
||||
//! let lock = self.pool.lock().unwrap();
|
||||
//! if let Ok(mut pool_mut) = lock.try_borrow_mut() {
|
||||
//! pool_mut.run_until_stalled();
|
||||
//! } else {
|
||||
//! // Already borrowed, likely a nested poll, do nothing.
|
||||
//! println!("MyGlobalExecutor::poll_local - pool already borrowed");
|
||||
//! }
|
||||
//! }
|
||||
//! }
|
||||
//!
|
||||
//! let global_exec = MyGlobalExecutor::new();
|
||||
//!
|
||||
//! // Set the executor *globally*
|
||||
//! Executor::init_custom_executor(global_exec)
|
||||
//! .expect("Failed to set global executor");
|
||||
//!
|
||||
//! // These spawns will now use MyGlobalExecutor
|
||||
//! Executor::spawn(async { println!("Global custom spawn") });
|
||||
//! Executor::spawn_local(async { println!("Global custom spawn_local") });
|
||||
//!
|
||||
//! // Need to drive the executor (if it requires polling like LocalPool)
|
||||
//! Executor::poll_local(); // Runs the tasks spawned above
|
||||
//! # }
|
||||
//! ```
|
||||
|
||||
@@ -37,15 +125,67 @@ pub type PinnedFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
|
||||
/// A future that has been pinned.
|
||||
pub type PinnedLocalFuture<T> = Pin<Box<dyn Future<Output = T>>>;
|
||||
|
||||
static SPAWN: OnceLock<fn(PinnedFuture<()>)> = OnceLock::new();
|
||||
static SPAWN_LOCAL: OnceLock<fn(PinnedLocalFuture<()>)> = OnceLock::new();
|
||||
static POLL_LOCAL: OnceLock<fn()> = OnceLock::new();
|
||||
// Type alias for the spawn function pointer.
|
||||
type SpawnFn = fn(PinnedFuture<()>);
|
||||
// Type alias for the spawn_local function pointer.
|
||||
type SpawnLocalFn = fn(PinnedLocalFuture<()>);
|
||||
// Type alias for the poll_local function pointer.
|
||||
type PollLocalFn = fn();
|
||||
|
||||
/// Holds the function pointers for the current global executor.
|
||||
#[derive(Clone, Copy)]
|
||||
struct ExecutorFns {
|
||||
spawn: SpawnFn,
|
||||
spawn_local: SpawnLocalFn,
|
||||
poll_local: PollLocalFn,
|
||||
}
|
||||
|
||||
// Use a single OnceLock to ensure atomic initialization of all functions.
|
||||
static EXECUTOR_FNS: OnceLock<ExecutorFns> = OnceLock::new();
|
||||
|
||||
// No-op functions to use when an executor doesn't support a specific operation.
|
||||
#[cfg(any(feature = "tokio", feature = "wasm-bindgen", feature = "glib"))]
|
||||
#[cold]
|
||||
#[inline(never)]
|
||||
fn no_op_poll() {}
|
||||
|
||||
#[cfg(all(not(feature = "wasm-bindgen"), not(debug_assertions)))]
|
||||
#[cold]
|
||||
#[inline(never)]
|
||||
fn no_op_spawn(_: PinnedFuture<()>) {
|
||||
#[cfg(debug_assertions)]
|
||||
eprintln!(
|
||||
"Warning: Executor::spawn called, but no global 'spawn' function is \
|
||||
configured (perhaps only spawn_local is supported, e.g., on wasm \
|
||||
without threading?)."
|
||||
);
|
||||
}
|
||||
|
||||
// Wasm panics if you spawn without an executor
|
||||
#[cfg(feature = "wasm-bindgen")]
|
||||
#[cold]
|
||||
#[inline(never)]
|
||||
fn no_op_spawn(_: PinnedFuture<()>) {
|
||||
panic!(
|
||||
"Executor::spawn called, but no global 'spawn' function is configured."
|
||||
);
|
||||
}
|
||||
|
||||
#[cfg(not(debug_assertions))]
|
||||
#[cold]
|
||||
#[inline(never)]
|
||||
fn no_op_spawn_local(_: PinnedLocalFuture<()>) {
|
||||
panic!(
|
||||
"Executor::spawn_local called, but no global 'spawn_local' function \
|
||||
is configured."
|
||||
);
|
||||
}
|
||||
|
||||
/// Errors that can occur when using the executor.
|
||||
#[derive(Error, Debug)]
|
||||
pub enum ExecutorError {
|
||||
/// The executor has already been set.
|
||||
#[error("Executor has already been set.")]
|
||||
#[error("Global executor has already been set.")]
|
||||
AlreadySet,
|
||||
}
|
||||
|
||||
@@ -54,150 +194,142 @@ pub struct Executor;
|
||||
|
||||
impl Executor {
|
||||
/// Spawns a thread-safe [`Future`].
|
||||
/// ```rust
|
||||
/// use any_spawner::Executor;
|
||||
/// # if false {
|
||||
/// // spawn a thread-safe Future
|
||||
/// Executor::spawn(async { /* ... */ });
|
||||
/// # }
|
||||
/// ```
|
||||
///
|
||||
/// Uses the globally configured executor.
|
||||
/// Panics if no global executor has been initialized.
|
||||
#[inline(always)]
|
||||
#[track_caller]
|
||||
pub fn spawn(fut: impl Future<Output = ()> + Send + 'static) {
|
||||
if let Some(spawner) = SPAWN.get() {
|
||||
spawner(Box::pin(fut))
|
||||
let pinned_fut = Box::pin(fut);
|
||||
|
||||
if let Some(fns) = EXECUTOR_FNS.get() {
|
||||
(fns.spawn)(pinned_fut)
|
||||
} else {
|
||||
#[cfg(all(debug_assertions, feature = "tracing"))]
|
||||
tracing::error!(
|
||||
"At {}, tried to spawn a Future with Executor::spawn() before \
|
||||
the Executor had been set.",
|
||||
std::panic::Location::caller()
|
||||
);
|
||||
#[cfg(all(debug_assertions, not(feature = "tracing")))]
|
||||
panic!(
|
||||
"At {}, tried to spawn a Future with Executor::spawn() before \
|
||||
the Executor had been set.",
|
||||
std::panic::Location::caller()
|
||||
);
|
||||
// No global executor set.
|
||||
handle_uninitialized_spawn(pinned_fut);
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawns a [`Future`] that cannot be sent across threads.
|
||||
/// ```rust
|
||||
/// use any_spawner::Executor;
|
||||
///
|
||||
/// # if false {
|
||||
/// // spawn a thread-safe Future
|
||||
/// Executor::spawn_local(async { /* ... */ });
|
||||
/// # }
|
||||
/// ```
|
||||
/// Uses the globally configured executor.
|
||||
/// Panics if no global executor has been initialized.
|
||||
#[inline(always)]
|
||||
#[track_caller]
|
||||
pub fn spawn_local(fut: impl Future<Output = ()> + 'static) {
|
||||
if let Some(spawner) = SPAWN_LOCAL.get() {
|
||||
spawner(Box::pin(fut))
|
||||
let pinned_fut = Box::pin(fut);
|
||||
|
||||
if let Some(fns) = EXECUTOR_FNS.get() {
|
||||
(fns.spawn_local)(pinned_fut)
|
||||
} else {
|
||||
#[cfg(all(debug_assertions, feature = "tracing"))]
|
||||
tracing::error!(
|
||||
"At {}, tried to spawn a Future with Executor::spawn_local() \
|
||||
before the Executor had been set.",
|
||||
std::panic::Location::caller()
|
||||
);
|
||||
#[cfg(all(debug_assertions, not(feature = "tracing")))]
|
||||
panic!(
|
||||
"At {}, tried to spawn a Future with Executor::spawn_local() \
|
||||
before the Executor had been set.",
|
||||
std::panic::Location::caller()
|
||||
);
|
||||
// No global executor set.
|
||||
handle_uninitialized_spawn_local(pinned_fut);
|
||||
}
|
||||
}
|
||||
|
||||
/// Waits until the next "tick" of the current async executor.
|
||||
/// Uses `Executor::spawn`, respecting the global executor.
|
||||
pub async fn tick() {
|
||||
let (tx, rx) = futures::channel::oneshot::channel();
|
||||
#[cfg(not(feature = "wasm-bindgen"))]
|
||||
Executor::spawn(async move {
|
||||
_ = tx.send(());
|
||||
});
|
||||
#[cfg(feature = "wasm-bindgen")]
|
||||
Executor::spawn_local(async move {
|
||||
_ = tx.send(());
|
||||
});
|
||||
|
||||
_ = rx.await;
|
||||
}
|
||||
|
||||
/// Polls the current async executor.
|
||||
/// Not all async executors support polling, so this function may not do anything.
|
||||
/// Polls the global async executor.
|
||||
///
|
||||
/// Uses the globally configured executor.
|
||||
/// Does nothing if the global executor does not support polling.
|
||||
#[inline(always)]
|
||||
pub fn poll_local() {
|
||||
if let Some(poller) = POLL_LOCAL.get() {
|
||||
poller()
|
||||
if let Some(fns) = EXECUTOR_FNS.get() {
|
||||
(fns.poll_local)()
|
||||
}
|
||||
// If not initialized or doesn't support polling, do nothing gracefully.
|
||||
}
|
||||
}
|
||||
|
||||
impl Executor {
|
||||
/// Globally sets the [`tokio`] runtime as the executor used to spawn tasks.
|
||||
///
|
||||
/// Returns `Err(_)` if an executor has already been set.
|
||||
/// Returns `Err(_)` if a global executor has already been set.
|
||||
///
|
||||
/// Requires the `tokio` feature to be activated on this crate.
|
||||
#[cfg(feature = "tokio")]
|
||||
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
|
||||
pub fn init_tokio() -> Result<(), ExecutorError> {
|
||||
SPAWN
|
||||
.set(|fut| {
|
||||
let executor_impl = ExecutorFns {
|
||||
spawn: |fut| {
|
||||
tokio::spawn(fut);
|
||||
})
|
||||
.map_err(|_| ExecutorError::AlreadySet)?;
|
||||
SPAWN_LOCAL
|
||||
.set(|fut| {
|
||||
},
|
||||
spawn_local: |fut| {
|
||||
tokio::task::spawn_local(fut);
|
||||
})
|
||||
.map_err(|_| ExecutorError::AlreadySet)?;
|
||||
Ok(())
|
||||
},
|
||||
// Tokio doesn't have an explicit global poll function like LocalPool::run_until_stalled
|
||||
poll_local: no_op_poll,
|
||||
};
|
||||
EXECUTOR_FNS
|
||||
.set(executor_impl)
|
||||
.map_err(|_| ExecutorError::AlreadySet)
|
||||
}
|
||||
|
||||
/// Globally sets the [`wasm-bindgen-futures`] runtime as the executor used to spawn tasks.
|
||||
///
|
||||
/// Returns `Err(_)` if an executor has already been set.
|
||||
/// Returns `Err(_)` if a global executor has already been set.
|
||||
///
|
||||
/// Requires the `wasm-bindgen` feature to be activated on this crate.
|
||||
#[cfg(feature = "wasm-bindgen")]
|
||||
#[cfg_attr(docsrs, doc(cfg(feature = "wasm-bindgen")))]
|
||||
pub fn init_wasm_bindgen() -> Result<(), ExecutorError> {
|
||||
SPAWN
|
||||
.set(|fut| {
|
||||
let executor_impl = ExecutorFns {
|
||||
// wasm-bindgen-futures only supports spawn_local
|
||||
spawn: no_op_spawn,
|
||||
spawn_local: |fut| {
|
||||
wasm_bindgen_futures::spawn_local(fut);
|
||||
})
|
||||
.map_err(|_| ExecutorError::AlreadySet)?;
|
||||
SPAWN_LOCAL
|
||||
.set(|fut| {
|
||||
wasm_bindgen_futures::spawn_local(fut);
|
||||
})
|
||||
.map_err(|_| ExecutorError::AlreadySet)?;
|
||||
Ok(())
|
||||
},
|
||||
poll_local: no_op_poll,
|
||||
};
|
||||
EXECUTOR_FNS
|
||||
.set(executor_impl)
|
||||
.map_err(|_| ExecutorError::AlreadySet)
|
||||
}
|
||||
|
||||
/// Globally sets the [`glib`] runtime as the executor used to spawn tasks.
|
||||
///
|
||||
/// Returns `Err(_)` if an executor has already been set.
|
||||
/// Returns `Err(_)` if a global executor has already been set.
|
||||
///
|
||||
/// Requires the `glib` feature to be activated on this crate.
|
||||
#[cfg(feature = "glib")]
|
||||
#[cfg_attr(docsrs, doc(cfg(feature = "glib")))]
|
||||
pub fn init_glib() -> Result<(), ExecutorError> {
|
||||
SPAWN
|
||||
.set(|fut| {
|
||||
let executor_impl = ExecutorFns {
|
||||
spawn: |fut| {
|
||||
let main_context = glib::MainContext::default();
|
||||
main_context.spawn(fut);
|
||||
})
|
||||
.map_err(|_| ExecutorError::AlreadySet)?;
|
||||
SPAWN_LOCAL
|
||||
.set(|fut| {
|
||||
},
|
||||
spawn_local: |fut| {
|
||||
let main_context = glib::MainContext::default();
|
||||
main_context.spawn_local(fut);
|
||||
})
|
||||
.map_err(|_| ExecutorError::AlreadySet)?;
|
||||
Ok(())
|
||||
},
|
||||
// Glib needs event loop integration, explicit polling isn't the standard model here.
|
||||
poll_local: no_op_poll,
|
||||
};
|
||||
EXECUTOR_FNS
|
||||
.set(executor_impl)
|
||||
.map_err(|_| ExecutorError::AlreadySet)
|
||||
}
|
||||
|
||||
/// Globally sets the [`futures`] executor as the executor used to spawn tasks,
|
||||
/// lazily creating a thread pool to spawn tasks into.
|
||||
///
|
||||
/// Returns `Err(_)` if an executor has already been set.
|
||||
/// Returns `Err(_)` if a global executor has already been set.
|
||||
///
|
||||
/// Requires the `futures-executor` feature to be activated on this crate.
|
||||
#[cfg(feature = "futures-executor")]
|
||||
@@ -209,9 +341,11 @@ impl Executor {
|
||||
};
|
||||
use std::cell::RefCell;
|
||||
|
||||
// Keep the lazy-init ThreadPool and thread-local LocalPool for spawn_local impl
|
||||
static THREAD_POOL: OnceLock<ThreadPool> = OnceLock::new();
|
||||
thread_local! {
|
||||
static LOCAL_POOL: RefCell<LocalPool> = RefCell::new(LocalPool::new());
|
||||
// SPAWNER is derived from LOCAL_POOL, keep it for efficiency inside the closure
|
||||
static SPAWNER: LocalSpawner = LOCAL_POOL.with(|pool| pool.borrow().spawner());
|
||||
}
|
||||
|
||||
@@ -222,140 +356,250 @@ impl Executor {
|
||||
})
|
||||
}
|
||||
|
||||
SPAWN
|
||||
.set(|fut| {
|
||||
let executor_impl = ExecutorFns {
|
||||
spawn: |fut| {
|
||||
get_thread_pool()
|
||||
.spawn(fut)
|
||||
.expect("failed to spawn future");
|
||||
})
|
||||
.map_err(|_| ExecutorError::AlreadySet)?;
|
||||
SPAWN_LOCAL
|
||||
.set(|fut| {
|
||||
.expect("failed to spawn future on ThreadPool");
|
||||
},
|
||||
spawn_local: |fut| {
|
||||
// Use the thread_local SPAWNER derived from LOCAL_POOL
|
||||
SPAWNER.with(|spawner| {
|
||||
spawner.spawn_local(fut).expect("failed to spawn future");
|
||||
spawner
|
||||
.spawn_local(fut)
|
||||
.expect("failed to spawn local future");
|
||||
});
|
||||
})
|
||||
.map_err(|_| ExecutorError::AlreadySet)?;
|
||||
POLL_LOCAL
|
||||
.set(|| {
|
||||
},
|
||||
poll_local: || {
|
||||
// Use the thread_local LOCAL_POOL
|
||||
LOCAL_POOL.with(|pool| {
|
||||
// Use try_borrow_mut to prevent panic during re-entrant calls
|
||||
if let Ok(mut pool) = pool.try_borrow_mut() {
|
||||
pool.run_until_stalled();
|
||||
}
|
||||
// If we couldn't borrow_mut, we're in a nested call to poll, so we don't need to do anything.
|
||||
// If already borrowed, we're likely in a nested poll, so do nothing.
|
||||
});
|
||||
})
|
||||
.map_err(|_| ExecutorError::AlreadySet)?;
|
||||
Ok(())
|
||||
},
|
||||
};
|
||||
|
||||
EXECUTOR_FNS
|
||||
.set(executor_impl)
|
||||
.map_err(|_| ExecutorError::AlreadySet)
|
||||
}
|
||||
|
||||
/// Globally sets the [`async_executor`] executor as the executor used to spawn tasks,
|
||||
/// lazily creating a thread pool to spawn tasks into.
|
||||
///
|
||||
/// Returns `Err(_)` if an executor has already been set.
|
||||
/// Returns `Err(_)` if a global executor has already been set.
|
||||
///
|
||||
/// Requires the `async-executor` feature to be activated on this crate.
|
||||
#[cfg(feature = "async-executor")]
|
||||
#[cfg_attr(docsrs, doc(cfg(feature = "async-executor")))]
|
||||
pub fn init_async_executor() -> Result<(), ExecutorError> {
|
||||
use async_executor::{Executor, LocalExecutor};
|
||||
use async_executor::{Executor as AsyncExecutor, LocalExecutor};
|
||||
|
||||
static THREAD_POOL: OnceLock<Executor> = OnceLock::new();
|
||||
// Keep the lazy-init global Executor and thread-local LocalExecutor for spawn_local impl
|
||||
static ASYNC_EXECUTOR: OnceLock<AsyncExecutor<'static>> =
|
||||
OnceLock::new();
|
||||
thread_local! {
|
||||
static LOCAL_POOL: LocalExecutor<'static> = const { LocalExecutor::new() };
|
||||
static LOCAL_EXECUTOR_POOL: LocalExecutor<'static> = const { LocalExecutor::new() };
|
||||
}
|
||||
|
||||
fn get_thread_pool() -> &'static Executor<'static> {
|
||||
THREAD_POOL.get_or_init(Executor::new)
|
||||
fn get_async_executor() -> &'static AsyncExecutor<'static> {
|
||||
ASYNC_EXECUTOR.get_or_init(AsyncExecutor::new)
|
||||
}
|
||||
|
||||
SPAWN
|
||||
.set(|fut| {
|
||||
get_thread_pool().spawn(fut).detach();
|
||||
})
|
||||
.map_err(|_| ExecutorError::AlreadySet)?;
|
||||
SPAWN_LOCAL
|
||||
.set(|fut| {
|
||||
LOCAL_POOL.with(|pool| pool.spawn(fut).detach());
|
||||
})
|
||||
.map_err(|_| ExecutorError::AlreadySet)?;
|
||||
POLL_LOCAL
|
||||
.set(|| {
|
||||
LOCAL_POOL.with(|pool| pool.try_tick());
|
||||
})
|
||||
.map_err(|_| ExecutorError::AlreadySet)?;
|
||||
Ok(())
|
||||
let executor_impl = ExecutorFns {
|
||||
spawn: |fut| {
|
||||
get_async_executor().spawn(fut).detach();
|
||||
},
|
||||
spawn_local: |fut| {
|
||||
LOCAL_EXECUTOR_POOL.with(|pool| pool.spawn(fut).detach());
|
||||
},
|
||||
poll_local: || {
|
||||
LOCAL_EXECUTOR_POOL.with(|pool| {
|
||||
// try_tick polls the local executor without blocking
|
||||
// This prevents issues if called recursively or from within a task.
|
||||
pool.try_tick();
|
||||
});
|
||||
},
|
||||
};
|
||||
EXECUTOR_FNS
|
||||
.set(executor_impl)
|
||||
.map_err(|_| ExecutorError::AlreadySet)
|
||||
}
|
||||
|
||||
/// Globally sets a custom executor as the executor used to spawn tasks.
|
||||
///
|
||||
/// Returns `Err(_)` if an executor has already been set.
|
||||
/// Requires the custom executor to be `Send + Sync` as it will be stored statically.
|
||||
///
|
||||
/// Returns `Err(_)` if a global executor has already been set.
|
||||
pub fn init_custom_executor(
|
||||
custom_executor: impl CustomExecutor + Send + Sync + 'static,
|
||||
) -> Result<(), ExecutorError> {
|
||||
static EXECUTOR: OnceLock<Box<dyn CustomExecutor + Send + Sync>> =
|
||||
OnceLock::new();
|
||||
EXECUTOR
|
||||
// Store the custom executor instance itself to call its methods.
|
||||
// Use Box for dynamic dispatch.
|
||||
static CUSTOM_EXECUTOR_INSTANCE: OnceLock<
|
||||
Box<dyn CustomExecutor + Send + Sync>,
|
||||
> = OnceLock::new();
|
||||
|
||||
CUSTOM_EXECUTOR_INSTANCE
|
||||
.set(Box::new(custom_executor))
|
||||
.map_err(|_| ExecutorError::AlreadySet)?;
|
||||
|
||||
SPAWN
|
||||
.set(|fut| {
|
||||
EXECUTOR.get().unwrap().spawn(fut);
|
||||
})
|
||||
.map_err(|_| ExecutorError::AlreadySet)?;
|
||||
SPAWN_LOCAL
|
||||
.set(|fut| EXECUTOR.get().unwrap().spawn_local(fut))
|
||||
.map_err(|_| ExecutorError::AlreadySet)?;
|
||||
POLL_LOCAL
|
||||
.set(|| EXECUTOR.get().unwrap().poll_local())
|
||||
.map_err(|_| ExecutorError::AlreadySet)?;
|
||||
Ok(())
|
||||
// Now set the ExecutorFns using the stored instance
|
||||
let executor_impl = ExecutorFns {
|
||||
spawn: |fut| {
|
||||
// Unwrap is safe because we just set it successfully or returned Err.
|
||||
CUSTOM_EXECUTOR_INSTANCE.get().unwrap().spawn(fut);
|
||||
},
|
||||
spawn_local: |fut| {
|
||||
CUSTOM_EXECUTOR_INSTANCE.get().unwrap().spawn_local(fut);
|
||||
},
|
||||
poll_local: || {
|
||||
CUSTOM_EXECUTOR_INSTANCE.get().unwrap().poll_local();
|
||||
},
|
||||
};
|
||||
|
||||
EXECUTOR_FNS
|
||||
.set(executor_impl)
|
||||
.map_err(|_| ExecutorError::AlreadySet)
|
||||
// If setting EXECUTOR_FNS fails (extremely unlikely race if called *concurrently*
|
||||
// with another init_* after CUSTOM_EXECUTOR_INSTANCE was set), we technically
|
||||
// leave CUSTOM_EXECUTOR_INSTANCE set but EXECUTOR_FNS not. This is an edge case,
|
||||
// but the primary race condition is solved.
|
||||
}
|
||||
|
||||
/// Locally sets a custom executor as the executor used to spawn tasks
|
||||
/// in the current thread.
|
||||
/// Sets a custom executor *for the current thread only*.
|
||||
///
|
||||
/// Returns `Err(_)` if an executor has already been set.
|
||||
/// This overrides the global executor for calls to `spawn`, `spawn_local`, and `poll_local`
|
||||
/// made *from the current thread*. It does not affect other threads or the global state.
|
||||
///
|
||||
/// The provided `custom_executor` must implement [`CustomExecutor`] and `'static`, but does
|
||||
/// **not** need to be `Send` or `Sync`.
|
||||
///
|
||||
/// Returns `Err(ExecutorError::AlreadySet)` if a *local* executor has already been set
|
||||
/// *for this thread*.
|
||||
pub fn init_local_custom_executor(
|
||||
custom_executor: impl CustomExecutor + 'static,
|
||||
) -> Result<(), ExecutorError> {
|
||||
// Store the custom executor instance itself to call its methods.
|
||||
// Use Box for dynamic dispatch.
|
||||
thread_local! {
|
||||
static EXECUTOR: OnceLock<Box<dyn CustomExecutor>> = OnceLock::new();
|
||||
}
|
||||
EXECUTOR.with(|this| {
|
||||
static CUSTOM_EXECUTOR_INSTANCE: OnceLock<
|
||||
Box<dyn CustomExecutor>,
|
||||
> = OnceLock::new();
|
||||
};
|
||||
|
||||
CUSTOM_EXECUTOR_INSTANCE.with(|this| {
|
||||
this.set(Box::new(custom_executor))
|
||||
.map_err(|_| ExecutorError::AlreadySet)
|
||||
})?;
|
||||
|
||||
SPAWN
|
||||
.set(|fut| {
|
||||
EXECUTOR.with(|this| this.get().unwrap().spawn(fut));
|
||||
})
|
||||
.map_err(|_| ExecutorError::AlreadySet)?;
|
||||
SPAWN_LOCAL
|
||||
.set(|fut| {
|
||||
EXECUTOR.with(|this| this.get().unwrap().spawn_local(fut));
|
||||
})
|
||||
.map_err(|_| ExecutorError::AlreadySet)?;
|
||||
POLL_LOCAL
|
||||
.set(|| {
|
||||
EXECUTOR.with(|this| this.get().unwrap().poll_local());
|
||||
})
|
||||
.map_err(|_| ExecutorError::AlreadySet)?;
|
||||
Ok(())
|
||||
// Now set the ExecutorFns using the stored instance
|
||||
let executor_impl = ExecutorFns {
|
||||
spawn: |fut| {
|
||||
// Unwrap is safe because we just set it successfully or returned Err.
|
||||
CUSTOM_EXECUTOR_INSTANCE
|
||||
.with(|this| this.get().unwrap().spawn(fut));
|
||||
},
|
||||
spawn_local: |fut| {
|
||||
CUSTOM_EXECUTOR_INSTANCE
|
||||
.with(|this| this.get().unwrap().spawn_local(fut));
|
||||
},
|
||||
poll_local: || {
|
||||
CUSTOM_EXECUTOR_INSTANCE
|
||||
.with(|this| this.get().unwrap().poll_local());
|
||||
},
|
||||
};
|
||||
|
||||
EXECUTOR_FNS
|
||||
.set(executor_impl)
|
||||
.map_err(|_| ExecutorError::AlreadySet)
|
||||
}
|
||||
}
|
||||
|
||||
/// A trait for custom executors.
|
||||
/// Custom executors can be used to integrate with any executor that supports spawning futures.
|
||||
///
|
||||
/// All methods can be called recursively.
|
||||
///
|
||||
/// If used with `init_custom_executor`, the implementation must be `Send + Sync + 'static`.
|
||||
///
|
||||
/// All methods can be called recursively. Implementors should be mindful of potential
|
||||
/// deadlocks or excessive resource consumption if recursive calls are not handled carefully
|
||||
/// (e.g., using `try_borrow_mut` or non-blocking polls within implementations).
|
||||
pub trait CustomExecutor {
|
||||
/// Spawns a future, usually on a thread pool.
|
||||
fn spawn(&self, fut: PinnedFuture<()>);
|
||||
/// Spawns a local future. May require calling `poll_local` to make progress.
|
||||
fn spawn_local(&self, fut: PinnedLocalFuture<()>);
|
||||
/// Polls the executor, if it supports polling.
|
||||
/// Polls the executor, if it supports polling. Implementations should ideally be
|
||||
/// non-blocking or use mechanisms like `try_tick` or `try_borrow_mut` to handle
|
||||
/// re-entrant calls safely.
|
||||
fn poll_local(&self);
|
||||
}
|
||||
|
||||
// Ensure CustomExecutor is object-safe
|
||||
#[allow(dead_code)]
|
||||
fn test_object_safety(_: Box<dyn CustomExecutor + Send + Sync>) {} // Added Send + Sync constraint here for global usage
|
||||
|
||||
/// Handles the case where `Executor::spawn` is called without an initialized executor.
|
||||
#[cold] // Less likely path
|
||||
#[inline(never)]
|
||||
#[track_caller]
|
||||
fn handle_uninitialized_spawn(_fut: PinnedFuture<()>) {
|
||||
let caller = std::panic::Location::caller();
|
||||
#[cfg(all(debug_assertions, feature = "tracing"))]
|
||||
{
|
||||
tracing::error!(
|
||||
target: "any_spawner",
|
||||
spawn_caller=%caller,
|
||||
"Executor::spawn called before a global executor was initialized. Task dropped."
|
||||
);
|
||||
// Drop the future implicitly after logging
|
||||
drop(_fut);
|
||||
}
|
||||
#[cfg(all(debug_assertions, not(feature = "tracing")))]
|
||||
{
|
||||
panic!(
|
||||
"At {}, tried to spawn a Future with Executor::spawn() before a \
|
||||
global executor was initialized.",
|
||||
caller
|
||||
);
|
||||
}
|
||||
// In release builds (without tracing), call the specific no-op function.
|
||||
#[cfg(not(debug_assertions))]
|
||||
{
|
||||
no_op_spawn(_fut);
|
||||
}
|
||||
}
|
||||
|
||||
/// Handles the case where `Executor::spawn_local` is called without an initialized executor.
|
||||
#[cold] // Less likely path
|
||||
#[inline(never)]
|
||||
#[track_caller]
|
||||
fn handle_uninitialized_spawn_local(_fut: PinnedLocalFuture<()>) {
|
||||
let caller = std::panic::Location::caller();
|
||||
#[cfg(all(debug_assertions, feature = "tracing"))]
|
||||
{
|
||||
tracing::error!(
|
||||
target: "any_spawner",
|
||||
spawn_caller=%caller,
|
||||
"Executor::spawn_local called before a global executor was initialized. \
|
||||
Task likely dropped or panicked."
|
||||
);
|
||||
// Fall through to panic or no-op depending on build/target
|
||||
}
|
||||
#[cfg(all(debug_assertions, not(feature = "tracing")))]
|
||||
{
|
||||
panic!(
|
||||
"At {}, tried to spawn a Future with Executor::spawn_local() \
|
||||
before a global executor was initialized.",
|
||||
caller
|
||||
);
|
||||
}
|
||||
// In release builds (without tracing), call the specific no-op function (which usually panics).
|
||||
#[cfg(not(debug_assertions))]
|
||||
{
|
||||
no_op_spawn_local(_fut);
|
||||
}
|
||||
}
|
||||
|
||||
24
any_spawner/tests/already_set_error.rs
Normal file
24
any_spawner/tests/already_set_error.rs
Normal file
@@ -0,0 +1,24 @@
|
||||
use any_spawner::{Executor, ExecutorError};
|
||||
|
||||
#[test]
|
||||
fn test_already_set_error() {
|
||||
struct SimpleExecutor;
|
||||
|
||||
impl any_spawner::CustomExecutor for SimpleExecutor {
|
||||
fn spawn(&self, _fut: any_spawner::PinnedFuture<()>) {}
|
||||
fn spawn_local(&self, _fut: any_spawner::PinnedLocalFuture<()>) {}
|
||||
fn poll_local(&self) {}
|
||||
}
|
||||
|
||||
// First initialization should succeed
|
||||
Executor::init_custom_executor(SimpleExecutor)
|
||||
.expect("First initialization failed");
|
||||
|
||||
// Second initialization should fail with AlreadySet error
|
||||
let result = Executor::init_custom_executor(SimpleExecutor);
|
||||
assert!(matches!(result, Err(ExecutorError::AlreadySet)));
|
||||
|
||||
// First local initialization should fail
|
||||
let result = Executor::init_local_custom_executor(SimpleExecutor);
|
||||
assert!(matches!(result, Err(ExecutorError::AlreadySet)));
|
||||
}
|
||||
74
any_spawner/tests/async_executor.rs
Normal file
74
any_spawner/tests/async_executor.rs
Normal file
@@ -0,0 +1,74 @@
|
||||
#![cfg(feature = "async-executor")]
|
||||
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
|
||||
// A simple async executor for testing
|
||||
struct TestExecutor {
|
||||
tasks: Mutex<Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>>,
|
||||
}
|
||||
|
||||
impl TestExecutor {
|
||||
fn new() -> Self {
|
||||
TestExecutor {
|
||||
tasks: Mutex::new(Vec::new()),
|
||||
}
|
||||
}
|
||||
|
||||
fn spawn<F>(&self, future: F)
|
||||
where
|
||||
F: Future<Output = ()> + Send + 'static,
|
||||
{
|
||||
self.tasks.lock().unwrap().push(Box::pin(future));
|
||||
}
|
||||
|
||||
fn run_all(&self) {
|
||||
// Take all tasks out to process them
|
||||
let tasks = self.tasks.lock().unwrap().drain(..).collect::<Vec<_>>();
|
||||
|
||||
// Use a basic future executor to run each task to completion
|
||||
for mut task in tasks {
|
||||
// Use futures-lite's block_on to complete the future
|
||||
futures::executor::block_on(async {
|
||||
unsafe {
|
||||
let task_mut = Pin::new_unchecked(&mut task);
|
||||
let _ = std::future::Future::poll(
|
||||
task_mut,
|
||||
&mut std::task::Context::from_waker(
|
||||
futures::task::noop_waker_ref(),
|
||||
),
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_async_executor() {
|
||||
let executor = Arc::new(TestExecutor::new());
|
||||
let executor_clone = executor.clone();
|
||||
|
||||
// Create a spawner function that will use our test executor
|
||||
let spawner = move |future| {
|
||||
executor_clone.spawn(future);
|
||||
};
|
||||
|
||||
// Prepare test data
|
||||
let counter = Arc::new(Mutex::new(0));
|
||||
let counter_clone = counter.clone();
|
||||
|
||||
// Use the spawner to spawn a task
|
||||
spawner(async move {
|
||||
*counter_clone.lock().unwrap() += 1;
|
||||
});
|
||||
|
||||
// Run all tasks
|
||||
executor.run_all();
|
||||
|
||||
// Check if the task completed correctly
|
||||
assert_eq!(*counter.lock().unwrap(), 1);
|
||||
}
|
||||
63
any_spawner/tests/custom_executor.rs
Normal file
63
any_spawner/tests/custom_executor.rs
Normal file
@@ -0,0 +1,63 @@
|
||||
use any_spawner::Executor;
|
||||
use std::sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
};
|
||||
|
||||
#[test]
|
||||
fn test_custom_executor() {
|
||||
// Define a simple custom executor
|
||||
struct TestExecutor {
|
||||
spawn_called: Arc<AtomicBool>,
|
||||
spawn_local_called: Arc<AtomicBool>,
|
||||
poll_local_called: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl any_spawner::CustomExecutor for TestExecutor {
|
||||
fn spawn(&self, fut: any_spawner::PinnedFuture<()>) {
|
||||
self.spawn_called.store(true, Ordering::SeqCst);
|
||||
// Execute the future immediately (this works for simple test futures)
|
||||
futures::executor::block_on(fut);
|
||||
}
|
||||
|
||||
fn spawn_local(&self, fut: any_spawner::PinnedLocalFuture<()>) {
|
||||
self.spawn_local_called.store(true, Ordering::SeqCst);
|
||||
// Execute the future immediately
|
||||
futures::executor::block_on(fut);
|
||||
}
|
||||
|
||||
fn poll_local(&self) {
|
||||
self.poll_local_called.store(true, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
|
||||
let spawn_called = Arc::new(AtomicBool::new(false));
|
||||
let spawn_local_called = Arc::new(AtomicBool::new(false));
|
||||
let poll_local_called = Arc::new(AtomicBool::new(false));
|
||||
|
||||
let executor = TestExecutor {
|
||||
spawn_called: spawn_called.clone(),
|
||||
spawn_local_called: spawn_local_called.clone(),
|
||||
poll_local_called: poll_local_called.clone(),
|
||||
};
|
||||
|
||||
// Initialize with our custom executor
|
||||
Executor::init_custom_executor(executor)
|
||||
.expect("Failed to initialize custom executor");
|
||||
|
||||
// Test spawn
|
||||
Executor::spawn(async {
|
||||
// Simple task
|
||||
});
|
||||
assert!(spawn_called.load(Ordering::SeqCst));
|
||||
|
||||
// Test spawn_local
|
||||
Executor::spawn_local(async {
|
||||
// Simple local task
|
||||
});
|
||||
assert!(spawn_local_called.load(Ordering::SeqCst));
|
||||
|
||||
// Test poll_local
|
||||
Executor::poll_local();
|
||||
assert!(poll_local_called.load(Ordering::SeqCst));
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
#[cfg(feature = "futures-executor")]
|
||||
#![cfg(feature = "futures-executor")]
|
||||
|
||||
use any_spawner::{CustomExecutor, Executor, PinnedFuture, PinnedLocalFuture};
|
||||
#[cfg(feature = "futures-executor")]
|
||||
|
||||
#[test]
|
||||
fn can_create_custom_executor() {
|
||||
use futures::{
|
||||
|
||||
28
any_spawner/tests/executor_tick.rs
Normal file
28
any_spawner/tests/executor_tick.rs
Normal file
@@ -0,0 +1,28 @@
|
||||
#![cfg(feature = "tokio")]
|
||||
|
||||
use any_spawner::Executor;
|
||||
use std::{
|
||||
sync::{Arc, Mutex},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_executor_tick() {
|
||||
// Initialize the tokio executor
|
||||
Executor::init_tokio().expect("Failed to initialize tokio executor");
|
||||
|
||||
let value = Arc::new(Mutex::new(false));
|
||||
let value_clone = value.clone();
|
||||
|
||||
// Spawn a task that sets the value after a tick
|
||||
Executor::spawn(async move {
|
||||
Executor::tick().await;
|
||||
*value_clone.lock().unwrap() = true;
|
||||
});
|
||||
|
||||
// Allow some time for the task to complete
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
|
||||
// Check that the value was set
|
||||
assert!(*value.lock().unwrap());
|
||||
}
|
||||
44
any_spawner/tests/futures_executor.rs
Normal file
44
any_spawner/tests/futures_executor.rs
Normal file
@@ -0,0 +1,44 @@
|
||||
#![cfg(feature = "futures-executor")]
|
||||
|
||||
use any_spawner::Executor;
|
||||
use futures::channel::oneshot;
|
||||
use std::{
|
||||
sync::{Arc, Mutex},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
#[test]
|
||||
fn test_futures_executor() {
|
||||
// Initialize the futures executor
|
||||
Executor::init_futures_executor()
|
||||
.expect("Failed to initialize futures executor");
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let result = Arc::new(Mutex::new(None));
|
||||
let result_clone = result.clone();
|
||||
|
||||
// Spawn a task
|
||||
Executor::spawn(async move {
|
||||
tx.send(84).expect("Failed to send value");
|
||||
});
|
||||
|
||||
// Spawn a task that waits for the result
|
||||
Executor::spawn(async move {
|
||||
match rx.await {
|
||||
Ok(val) => *result_clone.lock().unwrap() = Some(val),
|
||||
Err(_) => panic!("Failed to receive value"),
|
||||
}
|
||||
});
|
||||
|
||||
// Poll a few times to ensure the task completes
|
||||
for _ in 0..10 {
|
||||
Executor::poll_local();
|
||||
std::thread::sleep(Duration::from_millis(10));
|
||||
|
||||
if result.lock().unwrap().is_some() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
assert_eq!(*result.lock().unwrap(), Some(84));
|
||||
}
|
||||
@@ -1,8 +1,8 @@
|
||||
#[cfg(feature = "futures-executor")]
|
||||
#![cfg(feature = "futures-executor")]
|
||||
|
||||
use any_spawner::Executor;
|
||||
// All tests in this file use the same executor.
|
||||
|
||||
#[cfg(feature = "futures-executor")]
|
||||
#[test]
|
||||
fn can_spawn_local_future() {
|
||||
use std::rc::Rc;
|
||||
@@ -15,7 +15,6 @@ fn can_spawn_local_future() {
|
||||
Executor::spawn(async {});
|
||||
}
|
||||
|
||||
#[cfg(feature = "futures-executor")]
|
||||
#[test]
|
||||
fn can_make_local_progress() {
|
||||
use std::sync::{
|
||||
|
||||
179
any_spawner/tests/glib.rs
Normal file
179
any_spawner/tests/glib.rs
Normal file
@@ -0,0 +1,179 @@
|
||||
#![cfg(feature = "glib")]
|
||||
|
||||
use any_spawner::Executor;
|
||||
use glib::{MainContext, MainLoop};
|
||||
use std::{
|
||||
cell::Cell,
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
rc::Rc,
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc, Mutex,
|
||||
},
|
||||
task::{Context, Poll},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
// Helper to run a future to completion on a dedicated glib MainContext.
|
||||
// Returns true if the future completed within the timeout, false otherwise.
|
||||
fn run_on_glib_context<F>(fut: F)
|
||||
where
|
||||
F: Future<Output = ()> + Send + 'static,
|
||||
{
|
||||
let _ = Executor::init_glib();
|
||||
|
||||
let context = MainContext::default();
|
||||
let main_loop = MainLoop::new(Some(&context), false);
|
||||
let main_loop_clone = main_loop.clone();
|
||||
|
||||
Executor::spawn(async move {
|
||||
fut.await;
|
||||
main_loop_clone.quit();
|
||||
});
|
||||
|
||||
main_loop.run();
|
||||
}
|
||||
|
||||
// Helper to run a local (!Send) future on the glib context.
|
||||
fn run_local_on_glib_context<F>(fut: F)
|
||||
where
|
||||
F: Future<Output = ()> + 'static,
|
||||
{
|
||||
let _ = Executor::init_glib();
|
||||
|
||||
let context = MainContext::default();
|
||||
let main_loop = MainLoop::new(Some(&context), false);
|
||||
let main_loop_clone = main_loop.clone();
|
||||
|
||||
Executor::spawn_local(async move {
|
||||
fut.await;
|
||||
main_loop_clone.quit();
|
||||
});
|
||||
|
||||
main_loop.run();
|
||||
}
|
||||
|
||||
// This test must run after a test that successfully initializes glib,
|
||||
// or within its own process.
|
||||
#[test]
|
||||
fn test_glib_spawn() {
|
||||
let success_flag = Arc::new(AtomicBool::new(false));
|
||||
let flag_clone = success_flag.clone();
|
||||
|
||||
run_on_glib_context(async move {
|
||||
// Simulate async work
|
||||
YieldFuture::new().await;
|
||||
flag_clone.store(true, Ordering::SeqCst);
|
||||
|
||||
// We need to give the spawned task time to run.
|
||||
// The run_on_glib_context handles the main loop.
|
||||
// We just need to ensure spawn happened correctly.
|
||||
// Let's wait a tiny bit within the driving future to ensure spawn gets processed.
|
||||
glib::timeout_future(Duration::from_millis(10)).await;
|
||||
});
|
||||
|
||||
assert!(
|
||||
success_flag.load(Ordering::SeqCst),
|
||||
"Spawned future did not complete successfully"
|
||||
);
|
||||
}
|
||||
|
||||
// Similar conditions as test_glib_spawn regarding initialization state.
|
||||
#[test]
|
||||
fn test_glib_spawn_local() {
|
||||
let success_flag = Rc::new(Cell::new(false));
|
||||
let flag_clone = success_flag.clone();
|
||||
|
||||
run_local_on_glib_context(async move {
|
||||
// Use Rc to make the future !Send
|
||||
let non_send_data = Rc::new(Cell::new(10));
|
||||
|
||||
let data = non_send_data.get();
|
||||
assert_eq!(data, 10, "Rc data should be accessible");
|
||||
non_send_data.set(20); // Modify non-Send data
|
||||
|
||||
// Simulate async work
|
||||
YieldFuture::new().await;
|
||||
|
||||
assert_eq!(
|
||||
non_send_data.get(),
|
||||
20,
|
||||
"Rc data should persist modification"
|
||||
);
|
||||
flag_clone.set(true);
|
||||
|
||||
// Wait a tiny bit
|
||||
glib::timeout_future(Duration::from_millis(10)).await;
|
||||
});
|
||||
|
||||
assert!(
|
||||
success_flag.get(),
|
||||
"Spawned local future did not complete successfully"
|
||||
);
|
||||
}
|
||||
|
||||
// Test Executor::tick with glib backend
|
||||
#[test]
|
||||
fn test_glib_tick() {
|
||||
run_on_glib_context(async {
|
||||
let value = Arc::new(Mutex::new(false));
|
||||
let value_clone = value.clone();
|
||||
|
||||
// Spawn a task that sets the value after a tick
|
||||
Executor::spawn(async move {
|
||||
Executor::tick().await;
|
||||
*value_clone.lock().unwrap() = true;
|
||||
});
|
||||
|
||||
// Allow some time for the task to complete
|
||||
glib::timeout_future(Duration::from_millis(10)).await;
|
||||
|
||||
// Check that the value was set
|
||||
assert!(*value.lock().unwrap());
|
||||
});
|
||||
}
|
||||
|
||||
// Test Executor::poll_local with glib backend (should be a no-op)
|
||||
#[test]
|
||||
fn test_glib_poll_local_is_no_op() {
|
||||
// Ensure glib executor is initialized
|
||||
let _ = Executor::init_glib();
|
||||
// poll_local for glib is configured as a no-op
|
||||
// Calling it should not panic or cause issues.
|
||||
Executor::poll_local();
|
||||
Executor::poll_local();
|
||||
|
||||
println!("Executor::poll_local called successfully (expected no-op).");
|
||||
}
|
||||
|
||||
// --- Helper Future ---
|
||||
|
||||
// A simple future that yields once and then completes.
|
||||
struct YieldFuture {
|
||||
yielded: bool,
|
||||
}
|
||||
|
||||
impl YieldFuture {
|
||||
fn new() -> Self {
|
||||
YieldFuture { yielded: false }
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for YieldFuture {
|
||||
type Output = ();
|
||||
|
||||
fn poll(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Self::Output> {
|
||||
if self.yielded {
|
||||
Poll::Ready(())
|
||||
} else {
|
||||
self.yielded = true;
|
||||
// Wake immediately to re-poll
|
||||
cx.waker().wake_by_ref();
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
}
|
||||
54
any_spawner/tests/local_custom_executor.rs
Normal file
54
any_spawner/tests/local_custom_executor.rs
Normal file
@@ -0,0 +1,54 @@
|
||||
use any_spawner::Executor;
|
||||
use std::sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
};
|
||||
|
||||
#[test]
|
||||
fn test_local_custom_executor() {
|
||||
// Define a thread-local custom executor
|
||||
struct LocalTestExecutor {
|
||||
spawn_called: Arc<AtomicBool>,
|
||||
spawn_local_called: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl any_spawner::CustomExecutor for LocalTestExecutor {
|
||||
fn spawn(&self, fut: any_spawner::PinnedFuture<()>) {
|
||||
self.spawn_called.store(true, Ordering::SeqCst);
|
||||
futures::executor::block_on(fut);
|
||||
}
|
||||
|
||||
fn spawn_local(&self, fut: any_spawner::PinnedLocalFuture<()>) {
|
||||
self.spawn_local_called.store(true, Ordering::SeqCst);
|
||||
futures::executor::block_on(fut);
|
||||
}
|
||||
|
||||
fn poll_local(&self) {
|
||||
// No-op for this test
|
||||
}
|
||||
}
|
||||
|
||||
let local_spawn_called = Arc::new(AtomicBool::new(false));
|
||||
let local_spawn_local_called = Arc::new(AtomicBool::new(false));
|
||||
|
||||
let local_executor = LocalTestExecutor {
|
||||
spawn_called: local_spawn_called.clone(),
|
||||
spawn_local_called: local_spawn_local_called.clone(),
|
||||
};
|
||||
|
||||
// Initialize a thread-local executor
|
||||
Executor::init_local_custom_executor(local_executor)
|
||||
.expect("Failed to initialize local custom executor");
|
||||
|
||||
// Test spawn - should use the thread-local executor
|
||||
Executor::spawn(async {
|
||||
// Simple task
|
||||
});
|
||||
assert!(local_spawn_called.load(Ordering::SeqCst));
|
||||
|
||||
// Test spawn_local - should use the thread-local executor
|
||||
Executor::spawn_local(async {
|
||||
// Simple local task
|
||||
});
|
||||
assert!(local_spawn_local_called.load(Ordering::SeqCst));
|
||||
}
|
||||
35
any_spawner/tests/multiple_tasks.rs
Normal file
35
any_spawner/tests/multiple_tasks.rs
Normal file
@@ -0,0 +1,35 @@
|
||||
#![cfg(feature = "tokio")]
|
||||
|
||||
use any_spawner::Executor;
|
||||
use futures::channel::oneshot;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_multiple_tasks() {
|
||||
Executor::init_tokio().expect("Failed to initialize tokio executor");
|
||||
|
||||
let counter = Arc::new(Mutex::new(0));
|
||||
let tasks = 10;
|
||||
let mut handles = Vec::new();
|
||||
|
||||
// Spawn multiple tasks that increment the counter
|
||||
for _ in 0..tasks {
|
||||
let counter_clone = counter.clone();
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
Executor::spawn(async move {
|
||||
*counter_clone.lock().unwrap() += 1;
|
||||
tx.send(()).expect("Failed to send completion signal");
|
||||
});
|
||||
|
||||
handles.push(rx);
|
||||
}
|
||||
|
||||
// Wait for all tasks to complete
|
||||
for handle in handles {
|
||||
handle.await.expect("Task failed");
|
||||
}
|
||||
|
||||
// Verify that all tasks incremented the counter
|
||||
assert_eq!(*counter.lock().unwrap(), tasks);
|
||||
}
|
||||
20
any_spawner/tests/tokio_executor.rs
Normal file
20
any_spawner/tests/tokio_executor.rs
Normal file
@@ -0,0 +1,20 @@
|
||||
#![cfg(feature = "tokio")]
|
||||
|
||||
use any_spawner::Executor;
|
||||
use futures::channel::oneshot;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_tokio_executor() {
|
||||
// Initialize the tokio executor
|
||||
Executor::init_tokio().expect("Failed to initialize tokio executor");
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
// Spawn a task that sends a value
|
||||
Executor::spawn(async move {
|
||||
tx.send(42).expect("Failed to send value");
|
||||
});
|
||||
|
||||
// Wait for the spawned task to complete
|
||||
assert_eq!(rx.await.unwrap(), 42);
|
||||
}
|
||||
96
any_spawner/tests/wasm_bindgen_tests.rs
Normal file
96
any_spawner/tests/wasm_bindgen_tests.rs
Normal file
@@ -0,0 +1,96 @@
|
||||
#![cfg(feature = "wasm-bindgen")]
|
||||
|
||||
use any_spawner::Executor;
|
||||
use futures::channel::oneshot;
|
||||
use std::sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
};
|
||||
use wasm_bindgen_test::*;
|
||||
|
||||
wasm_bindgen_test_configure!(run_in_browser);
|
||||
|
||||
#[wasm_bindgen_test]
|
||||
async fn test_wasm_bindgen_spawn_local() {
|
||||
// Initialize the wasm-bindgen executor
|
||||
let _ = Executor::init_wasm_bindgen();
|
||||
|
||||
// Create a channel to verify the task completes
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
// Spawn a local task (wasm doesn't support sending futures between threads)
|
||||
Executor::spawn_local(async move {
|
||||
// Simulate some async work
|
||||
Executor::tick().await;
|
||||
tx.send(42).expect("Failed to send result");
|
||||
});
|
||||
|
||||
// Wait for the task to complete
|
||||
let result = rx.await.expect("Failed to receive result");
|
||||
assert_eq!(result, 42);
|
||||
}
|
||||
|
||||
#[wasm_bindgen_test]
|
||||
async fn test_wasm_bindgen_tick() {
|
||||
// Initialize the wasm-bindgen executor if not already initialized
|
||||
let _ = Executor::init_wasm_bindgen();
|
||||
|
||||
let flag = Arc::new(AtomicBool::new(false));
|
||||
let flag_clone = flag.clone();
|
||||
|
||||
// Spawn a task that will set the flag
|
||||
Executor::spawn_local(async move {
|
||||
flag_clone.store(true, Ordering::SeqCst);
|
||||
});
|
||||
|
||||
// Wait for a tick, which should allow the spawned task to run
|
||||
Executor::tick().await;
|
||||
|
||||
// Verify the flag was set
|
||||
assert!(flag.load(Ordering::SeqCst));
|
||||
}
|
||||
|
||||
#[wasm_bindgen_test]
|
||||
async fn test_multiple_wasm_bindgen_tasks() {
|
||||
// Initialize once for all tests
|
||||
let _ = Executor::init_wasm_bindgen();
|
||||
|
||||
// Create channels for multiple tasks
|
||||
let (tx1, rx1) = oneshot::channel();
|
||||
let (tx2, rx2) = oneshot::channel();
|
||||
|
||||
// Spawn multiple tasks
|
||||
Executor::spawn_local(async move {
|
||||
tx1.send("task1").expect("Failed to send from task1");
|
||||
});
|
||||
|
||||
Executor::spawn_local(async move {
|
||||
tx2.send("task2").expect("Failed to send from task2");
|
||||
});
|
||||
|
||||
// Wait for both tasks to complete
|
||||
let (result1, result2) = futures::join!(rx1, rx2);
|
||||
|
||||
assert_eq!(result1.unwrap(), "task1");
|
||||
assert_eq!(result2.unwrap(), "task2");
|
||||
}
|
||||
|
||||
// This test verifies that spawn (not local) fails on wasm as expected
|
||||
#[wasm_bindgen_test]
|
||||
#[should_panic]
|
||||
fn test_wasm_bindgen_spawn_errors() {
|
||||
let _ = Executor::init_wasm_bindgen();
|
||||
|
||||
// Using should_panic to test that Executor::spawn panics in wasm
|
||||
// Note: We can't use #[should_panic] with async tests, so we test synchronously
|
||||
let result = std::panic::catch_unwind(|| {
|
||||
Executor::spawn(async {
|
||||
// This should panic since wasm-bindgen doesn't support Send futures
|
||||
});
|
||||
});
|
||||
|
||||
assert!(
|
||||
result.is_err(),
|
||||
"Expected Executor::spawn to panic in wasm environment"
|
||||
);
|
||||
}
|
||||
@@ -1,7 +1,7 @@
|
||||
extend = [
|
||||
{ path = "./check.toml" },
|
||||
{ path = "./lint.toml" },
|
||||
{ path = "./test.toml" },
|
||||
{ path = "./check.toml" },
|
||||
{ path = "./lint.toml" },
|
||||
{ path = "./test.toml" },
|
||||
]
|
||||
|
||||
[env]
|
||||
@@ -12,4 +12,4 @@ LEPTOS_OUTPUT_NAME = "ci" # allows examples to check/build without cargo-leptos
|
||||
RUSTFLAGS = "-D warnings"
|
||||
|
||||
[tasks.ci]
|
||||
dependencies = ["lint", "test"]
|
||||
dependencies = ["lint", "test-flow"]
|
||||
|
||||
7
cargo-make/wasm-test.toml
Normal file
7
cargo-make/wasm-test.toml
Normal file
@@ -0,0 +1,7 @@
|
||||
[tasks.post-test]
|
||||
dependencies = ["test-wasm"]
|
||||
|
||||
[tasks.test-wasm]
|
||||
env = { CARGO_MAKE_WASM_TEST_ARGS = "--headless --chrome --features=wasm-bindgen" }
|
||||
command = "cargo"
|
||||
args = ["make", "wasm-pack-test"]
|
||||
Reference in New Issue
Block a user