Add support for user-supplied executors (#3091)

This commit is contained in:
stefnotch
2024-10-16 15:24:07 +02:00
committed by GitHub
parent eba08ad592
commit ee66f6c395
3 changed files with 132 additions and 58 deletions

View File

@@ -32,8 +32,10 @@
use std::{future::Future, pin::Pin, sync::OnceLock};
use thiserror::Error;
pub(crate) type PinnedFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
pub(crate) type PinnedLocalFuture<T> = Pin<Box<dyn Future<Output = T>>>;
/// A future that has been pinned.
pub type PinnedFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
/// A future that has been pinned.
pub type PinnedLocalFuture<T> = Pin<Box<dyn Future<Output = T>>>;
static SPAWN: OnceLock<fn(PinnedFuture<()>)> = OnceLock::new();
static SPAWN_LOCAL: OnceLock<fn(PinnedLocalFuture<()>)> = OnceLock::new();
@@ -284,63 +286,42 @@ impl Executor {
.map_err(|_| ExecutorError::AlreadySet)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
#[cfg(feature = "futures-executor")]
#[test]
fn can_spawn_local_future() {
use crate::Executor;
use std::rc::Rc;
_ = Executor::init_futures_executor();
let rc = Rc::new(());
Executor::spawn_local(async {
_ = rc;
});
Executor::spawn(async {});
}
/// Globally sets a custom executor as the executor used to spawn tasks.
///
/// Returns `Err(_)` if an executor has already been set.
pub fn init_custom_executor(
custom_executor: impl CustomExecutor + 'static,
) -> Result<(), ExecutorError> {
static EXECUTOR: OnceLock<Box<dyn CustomExecutor>> = OnceLock::new();
EXECUTOR
.set(Box::new(custom_executor))
.map_err(|_| ExecutorError::AlreadySet)?;
#[cfg(feature = "futures-executor")]
#[test]
fn can_make_threaded_progress() {
use crate::Executor;
use std::sync::{atomic::AtomicUsize, Arc};
_ = Executor::init_futures_executor();
let counter = Arc::new(AtomicUsize::new(0));
Executor::spawn({
let counter = Arc::clone(&counter);
async move {
assert_eq!(
counter.fetch_add(1, std::sync::atomic::Ordering::AcqRel),
0
);
}
});
futures::executor::block_on(Executor::tick());
assert_eq!(counter.load(std::sync::atomic::Ordering::Acquire), 1);
}
#[cfg(feature = "futures-executor")]
#[test]
fn can_make_local_progress() {
use crate::Executor;
use std::sync::{atomic::AtomicUsize, Arc};
_ = Executor::init_futures_executor();
let counter = Arc::new(AtomicUsize::new(0));
Executor::spawn_local({
let counter = Arc::clone(&counter);
async move {
assert_eq!(
counter.fetch_add(1, std::sync::atomic::Ordering::AcqRel),
0
);
Executor::spawn_local(async {
// Should not crash
});
}
});
Executor::poll_local();
assert_eq!(counter.load(std::sync::atomic::Ordering::Acquire), 1);
SPAWN
.set(|fut| {
EXECUTOR.get().unwrap().spawn(fut);
})
.map_err(|_| ExecutorError::AlreadySet)?;
SPAWN_LOCAL
.set(|fut| EXECUTOR.get().unwrap().spawn_local(fut))
.map_err(|_| ExecutorError::AlreadySet)?;
POLL_LOCAL
.set(|| EXECUTOR.get().unwrap().poll_local())
.map_err(|_| ExecutorError::AlreadySet)?;
Ok(())
}
}
/// A trait for custom executors.
/// Custom executors can be used to integrate with any executor that supports spawning futures.
///
/// All methods can be called recursively.
pub trait CustomExecutor: Send + Sync {
/// Spawns a future, usually on a thread pool.
fn spawn(&self, fut: PinnedFuture<()>);
/// Spawns a local future. May require calling `poll_local` to make progress.
fn spawn_local(&self, fut: PinnedLocalFuture<()>);
/// Polls the executor, if it supports polling.
fn poll_local(&self);
}

View File

@@ -0,0 +1,55 @@
#[cfg(feature = "futures-executor")]
use any_spawner::{CustomExecutor, Executor, PinnedFuture, PinnedLocalFuture};
#[cfg(feature = "futures-executor")]
#[test]
fn can_create_custom_executor() {
use futures::{
executor::{LocalPool, LocalSpawner},
task::LocalSpawnExt,
};
use std::{
cell::RefCell,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
thread_local! {
static LOCAL_POOL: RefCell<LocalPool> = RefCell::new(LocalPool::new());
static SPAWNER: LocalSpawner = LOCAL_POOL.with(|pool| pool.borrow().spawner());
}
struct CustomFutureExecutor;
impl CustomExecutor for CustomFutureExecutor {
fn spawn(&self, _fut: PinnedFuture<()>) {
panic!("not supported in this test");
}
fn spawn_local(&self, fut: PinnedLocalFuture<()>) {
SPAWNER.with(|spawner| {
spawner.spawn_local(fut).expect("failed to spawn future");
});
}
fn poll_local(&self) {
LOCAL_POOL.with(|pool| {
if let Ok(mut pool) = pool.try_borrow_mut() {
pool.run_until_stalled();
}
// If we couldn't borrow_mut, we're in a nested call to poll, so we don't need to do anything.
});
}
}
Executor::init_custom_executor(CustomFutureExecutor)
.expect("couldn't set executor");
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = Arc::clone(&counter);
Executor::spawn_local(async move {
counter_clone.store(1, Ordering::Release);
});
Executor::poll_local();
assert_eq!(counter.load(Ordering::Acquire), 1);
}

View File

@@ -0,0 +1,38 @@
#[cfg(feature = "futures-executor")]
use any_spawner::Executor;
// All tests in this file use the same executor.
#[cfg(feature = "futures-executor")]
#[test]
fn can_spawn_local_future() {
use std::rc::Rc;
let _ = Executor::init_futures_executor();
let rc = Rc::new(());
Executor::spawn_local(async {
_ = rc;
});
Executor::spawn(async {});
}
#[cfg(feature = "futures-executor")]
#[test]
fn can_make_local_progress() {
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
let _ = Executor::init_futures_executor();
let counter = Arc::new(AtomicUsize::new(0));
Executor::spawn_local({
let counter = Arc::clone(&counter);
async move {
assert_eq!(counter.fetch_add(1, Ordering::AcqRel), 0);
Executor::spawn_local(async {
// Should not crash
});
}
});
Executor::poll_local();
assert_eq!(counter.load(Ordering::Acquire), 1);
}