feat: effect::immediate::batch (#4344)

* feat: `ImmediateEffect::new_mut_scoped`

* fix: `ImmediateEffect` debug info

* feat: `effect::immediate::batch`
This commit is contained in:
Greg Johnston
2025-10-10 11:31:57 -04:00
committed by GitHub
2 changed files with 107 additions and 1 deletions

View File

@@ -65,6 +65,7 @@ impl Dispose for ImmediateEffect {
impl ImmediateEffect {
/// Creates a new effect which runs immediately, then again as soon as any tracked signal changes.
/// (Unless [batch] is used.)
///
/// NOTE: this requires a `Fn` function because it might recurse.
/// Use [Self::new_mut] to pass a `FnMut` function, it'll panic on recursion.
@@ -82,6 +83,7 @@ impl ImmediateEffect {
Self { inner: Some(inner) }
}
/// Creates a new effect which runs immediately, then again as soon as any tracked signal changes.
/// (Unless [batch] is used.)
///
/// # Panics
/// Panics on recursion or if triggered in parallel. Also see [Self::new]
@@ -93,8 +95,10 @@ impl ImmediateEffect {
Self::new(move || fun.try_lock().expect(MSG)())
}
/// Creates a new effect which runs immediately, then again as soon as any tracked signal changes.
/// (Unless [batch] is used.)
///
/// NOTE: this requires a `Fn` function because it might recurse.
/// Use [Self::new_mut_scoped] to pass a `FnMut` function, it'll panic on recursion.
/// NOTE: this effect is automatically cleaned up when the current owner is cleared or disposed.
#[track_caller]
pub fn new_scoped(fun: impl Fn() + Send + Sync + 'static) {
@@ -102,6 +106,19 @@ impl ImmediateEffect {
on_cleanup(move || effect.dispose());
}
/// Creates a new effect which runs immediately, then again as soon as any tracked signal changes.
/// (Unless [batch] is used.)
///
/// NOTE: this effect is automatically cleaned up when the current owner is cleared or disposed.
///
/// # Panics
/// Panics on recursion or if triggered in parallel. Also see [Self::new_scoped]
#[track_caller]
pub fn new_mut_scoped(fun: impl FnMut() + Send + Sync + 'static) {
let effect = Self::new_mut(fun);
on_cleanup(move || effect.dispose());
}
/// Creates a new effect which runs immediately, then again as soon as any tracked signal changes.
///
@@ -130,6 +147,41 @@ impl DefinedAt for ImmediateEffect {
}
}
/// Defers any [ImmediateEffect]s from running until the end of the function.
///
/// NOTE: this affects only [ImmediateEffect]s, not other effects.
///
/// NOTE: this is rarely needed, but it is useful for example when multiple signals
/// need to be updated atomically (for example a double-bound signal tree).
pub fn batch<T>(f: impl FnOnce() -> T) -> T {
struct ExecuteOnDrop;
impl Drop for ExecuteOnDrop {
fn drop(&mut self) {
let effects = {
let mut batch = inner::BATCH.write().or_poisoned();
batch.take().unwrap().into_inner().expect("lock poisoned")
};
// TODO: Should we skip the effects if it's panicking?
for effect in effects {
effect.update_if_necessary();
}
}
}
let mut execute_on_drop = None;
{
let mut batch = inner::BATCH.write().or_poisoned();
if batch.is_none() {
execute_on_drop = Some(ExecuteOnDrop);
} else {
// Nested batching has no effect.
}
*batch = Some(batch.take().unwrap_or_default());
}
let ret = f();
drop(execute_on_drop);
ret
}
mod inner {
use crate::{
graph::{
@@ -140,6 +192,7 @@ mod inner {
owner::Owner,
traits::DefinedAt,
};
use indexmap::IndexSet;
use or_poisoned::OrPoisoned;
use std::{
panic::Location,
@@ -147,6 +200,11 @@ mod inner {
thread::{self, ThreadId},
};
/// Only the [super::batch] function ever writes to the outer RwLock.
/// While the effects will write to the inner one.
pub(super) static BATCH: RwLock<Option<RwLock<IndexSet<AnySubscriber>>>> =
RwLock::new(None);
/// Handles subscription logic for effects.
///
/// To handle parallelism and recursion we assign ordered (1..) ids to each run.
@@ -202,6 +260,8 @@ mod inner {
fun: impl Fn() + Send + Sync + 'static,
) -> Arc<RwLock<EffectInner>> {
let owner = Owner::new();
#[cfg(any(debug_assertions, leptos_debuginfo))]
let defined_at = Location::caller();
Arc::new_cyclic(|weak| {
let any_subscriber = AnySubscriber(
@@ -211,7 +271,7 @@ mod inner {
RwLock::new(EffectInner {
#[cfg(any(debug_assertions, leptos_debuginfo))]
defined_at: Location::caller(),
defined_at,
owner,
state: ReactiveNodeState::Dirty,
run_count_start: 0,
@@ -260,6 +320,17 @@ mod inner {
ReactiveNodeState::Dirty => true,
};
{
if let Some(batch) = &*BATCH.read().or_poisoned() {
let mut batch = batch.write().or_poisoned();
let subscriber =
self.read().or_poisoned().any_subscriber.clone();
batch.insert(subscriber);
return needs_update;
}
}
if needs_update {
let mut guard = self.write().or_poisoned();

View File

@@ -225,3 +225,38 @@ fn threaded_chaos_effect() {
let values: Vec<_> = signals.iter().map(|s| s.get_untracked()).collect();
println!("FINAL: {values:?}");
}
#[cfg(feature = "effects")]
#[test]
fn test_batch() {
use imports::*;
use reactive_graph::{effect::batch, owner::StoredValue};
let owner = Owner::new();
owner.set();
let a = RwSignal::new(0);
let b = RwSignal::new(0);
let values = StoredValue::new(Vec::new());
ImmediateEffect::new_scoped(move || {
println!("{} = {}", a.get(), b.get());
values.write_value().push((a.get(), b.get()));
});
a.set(1);
b.set(1);
batch(move || {
a.set(2);
b.set(2);
batch(move || {
a.set(3);
b.set(3);
});
});
assert_eq!(values.get_value(), vec![(0, 0), (1, 0), (1, 1), (3, 3)]);
}