From a42fa452fc3079d2ce535010e70e37263180edfc Mon Sep 17 00:00:00 2001 From: Greg Johnston Date: Fri, 16 May 2025 09:23:28 -0400 Subject: [PATCH] feat: add missing `Resource::write()` and similar functions (see #3959) (#3984) --- leptos_server/src/local_resource.rs | 56 ++++++++++++++++++- leptos_server/src/resource.rs | 54 +++++++++++++++++- .../async_derived/arc_async_derived.rs | 32 +++++++++-- .../computed/async_derived/async_derived.rs | 23 ++++++++ .../src/computed/async_derived/inner.rs | 2 + reactive_graph/src/lib.rs | 10 +--- 6 files changed, 160 insertions(+), 17 deletions(-) diff --git a/leptos_server/src/local_resource.rs b/leptos_server/src/local_resource.rs index e518f3f27..4f9af28f6 100644 --- a/leptos_server/src/local_resource.rs +++ b/leptos_server/src/local_resource.rs @@ -14,11 +14,13 @@ use reactive_graph::{ ArcRwSignal, RwSignal, }, traits::{ - DefinedAt, IsDisposed, ReadUntracked, Track, Update, With, Write, + DefinedAt, IsDisposed, Notify, ReadUntracked, Track, UntrackableGuard, + Update, With, Write, }, }; use std::{ future::{pending, Future, IntoFuture}, + ops::DerefMut, panic::Location, }; @@ -157,6 +159,32 @@ impl DefinedAt for ArcLocalResource { } } +impl Notify for ArcLocalResource +where + T: 'static, +{ + fn notify(&self) { + self.data.notify() + } +} + +impl Write for ArcLocalResource +where + T: 'static, +{ + type Value = Option; + + fn try_write(&self) -> Option> { + self.data.try_write() + } + + fn try_write_untracked( + &self, + ) -> Option> { + self.data.try_write_untracked() + } +} + impl ReadUntracked for ArcLocalResource where T: 'static, @@ -364,6 +392,32 @@ impl DefinedAt for LocalResource { } } +impl Notify for LocalResource +where + T: 'static, +{ + fn notify(&self) { + self.data.notify() + } +} + +impl Write for LocalResource +where + T: 'static, +{ + type Value = Option; + + fn try_write(&self) -> Option> { + self.data.try_write() + } + + fn try_write_untracked( + &self, + ) -> Option> { + self.data.try_write_untracked() + } +} + impl ReadUntracked for LocalResource where T: 'static, diff --git a/leptos_server/src/resource.rs b/leptos_server/src/resource.rs index 9bedf8dd3..5dc4da144 100644 --- a/leptos_server/src/resource.rs +++ b/leptos_server/src/resource.rs @@ -26,7 +26,7 @@ use reactive_graph::{ }; use std::{ future::{pending, IntoFuture}, - ops::Deref, + ops::{Deref, DerefMut}, panic::Location, sync::{ atomic::{AtomicBool, Ordering}, @@ -162,6 +162,32 @@ where } } +impl Notify for ArcResource +where + T: 'static, +{ + fn notify(&self) { + self.data.notify() + } +} + +impl Write for ArcResource +where + T: 'static, +{ + type Value = Option; + + fn try_write(&self) -> Option> { + self.data.try_write() + } + + fn try_write_untracked( + &self, + ) -> Option> { + self.data.try_write_untracked() + } +} + impl ReadUntracked for ArcResource where T: 'static, @@ -842,6 +868,32 @@ where } } +impl Notify for Resource +where + T: Send + Sync + 'static, +{ + fn notify(&self) { + self.data.notify() + } +} + +impl Write for Resource +where + T: Send + Sync + 'static, +{ + type Value = Option; + + fn try_write(&self) -> Option> { + self.data.try_write() + } + + fn try_write_untracked( + &self, + ) -> Option> { + self.data.try_write_untracked() + } +} + impl ReadUntracked for Resource where T: Send + Sync + 'static, diff --git a/reactive_graph/src/computed/async_derived/arc_async_derived.rs b/reactive_graph/src/computed/async_derived/arc_async_derived.rs index 6501c7082..34f13494f 100644 --- a/reactive_graph/src/computed/async_derived/arc_async_derived.rs +++ b/reactive_graph/src/computed/async_derived/arc_async_derived.rs @@ -234,7 +234,8 @@ macro_rules! spawn_derived { subscribers: SubscriberSet::new(), state: AsyncDerivedState::Clean, version: 0, - suspenses: Vec::new() + suspenses: Vec::new(), + pending_suspenses: Vec::new() })); let value = Arc::new(AsyncRwLock::new($initial)); let wakers = Arc::new(RwLock::new(Vec::new())); @@ -364,7 +365,7 @@ macro_rules! spawn_derived { // generate and assign new value loading.store(true, Ordering::Relaxed); - let (this_version, suspense_ids) = { + let this_version = { let mut guard = inner.write().or_poisoned(); guard.version += 1; let version = guard.version; @@ -372,14 +373,17 @@ macro_rules! spawn_derived { .into_iter() .map(|sc| sc.task_id()) .collect::>(); - (version, suspense_ids) + guard.pending_suspenses.extend(suspense_ids); + version }; let new_value = fut.await; - drop(suspense_ids); - - let latest_version = inner.read().or_poisoned().version; + let latest_version = { + let mut guard = inner.write().or_poisoned(); + drop(mem::take(&mut guard.pending_suspenses)); + guard.version + }; if latest_version == this_version { Self::set_inner_value(new_value, value, wakers, inner, loading, Some(ready_tx)).await; @@ -641,6 +645,14 @@ impl Write for ArcAsyncDerived { type Value = Option; fn try_write(&self) -> Option> { + // increment the version, such that a rerun triggered previously does not overwrite this + // new value + let mut guard = self.inner.write().or_poisoned(); + guard.version += 1; + + // tell any suspenses to stop waiting for this + drop(mem::take(&mut guard.pending_suspenses)); + Some(MappedMut::new( WriteGuard::new(self.clone(), self.value.blocking_write()), |v| v.deref(), @@ -651,6 +663,14 @@ impl Write for ArcAsyncDerived { fn try_write_untracked( &self, ) -> Option> { + // increment the version, such that a rerun triggered previously does not overwrite this + // new value + let mut guard = self.inner.write().or_poisoned(); + guard.version += 1; + + // tell any suspenses to stop waiting for this + drop(mem::take(&mut guard.pending_suspenses)); + Some(MappedMut::new( self.value.blocking_write(), |v| v.deref(), diff --git a/reactive_graph/src/computed/async_derived/async_derived.rs b/reactive_graph/src/computed/async_derived/async_derived.rs index c0535dd3f..6b4d2cc32 100644 --- a/reactive_graph/src/computed/async_derived/async_derived.rs +++ b/reactive_graph/src/computed/async_derived/async_derived.rs @@ -14,8 +14,10 @@ use crate::{ unwrap_signal, }; use core::fmt::Debug; +use or_poisoned::OrPoisoned; use std::{ future::Future, + mem, ops::{Deref, DerefMut}, panic::Location, }; @@ -349,6 +351,17 @@ where let guard = self .inner .try_with_value(|n| n.value.blocking_write_arc())?; + + self.inner.try_with_value(|n| { + let mut guard = n.inner.write().or_poisoned(); + // increment the version, such that a rerun triggered previously does not overwrite this + // new value + guard.version += 1; + + // tell any suspenses to stop waiting for this + drop(mem::take(&mut guard.pending_suspenses)); + }); + Some(MappedMut::new( WriteGuard::new(*self, guard), |v| v.deref(), @@ -359,6 +372,16 @@ where fn try_write_untracked( &self, ) -> Option> { + self.inner.try_with_value(|n| { + let mut guard = n.inner.write().or_poisoned(); + // increment the version, such that a rerun triggered previously does not overwrite this + // new value + guard.version += 1; + + // tell any suspenses to stop waiting for this + drop(mem::take(&mut guard.pending_suspenses)); + }); + self.inner .try_with_value(|n| n.value.blocking_write_arc()) .map(|inner| { diff --git a/reactive_graph/src/computed/async_derived/inner.rs b/reactive_graph/src/computed/async_derived/inner.rs index c22640621..d25e36eb8 100644 --- a/reactive_graph/src/computed/async_derived/inner.rs +++ b/reactive_graph/src/computed/async_derived/inner.rs @@ -1,3 +1,4 @@ +use super::suspense::TaskHandle; use crate::{ channel::Sender, computed::suspense::SuspenseContext, @@ -22,6 +23,7 @@ pub(crate) struct ArcAsyncDerivedInner { pub state: AsyncDerivedState, pub version: usize, pub suspenses: Vec, + pub pending_suspenses: Vec, } #[derive(Debug, PartialEq, Eq)] diff --git a/reactive_graph/src/lib.rs b/reactive_graph/src/lib.rs index 039565534..fd1018961 100644 --- a/reactive_graph/src/lib.rs +++ b/reactive_graph/src/lib.rs @@ -104,15 +104,7 @@ pub mod prelude { #[allow(unused)] #[doc(hidden)] pub fn log_warning(text: Arguments) { - #[cfg(feature = "tracing")] - { - tracing::warn!(text); - } - #[cfg(all( - not(feature = "tracing"), - target_arch = "wasm32", - target_os = "unknown" - ))] + #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] { web_sys::console::warn_1(&text.to_string().into()); }