feat: add missing Resource::write() and similar functions (see #3959) (#3984)

This commit is contained in:
Greg Johnston
2025-05-16 09:23:28 -04:00
committed by GitHub
parent cd48a6ac8c
commit a42fa452fc
6 changed files with 160 additions and 17 deletions

View File

@@ -14,11 +14,13 @@ use reactive_graph::{
ArcRwSignal, RwSignal,
},
traits::{
DefinedAt, IsDisposed, ReadUntracked, Track, Update, With, Write,
DefinedAt, IsDisposed, Notify, ReadUntracked, Track, UntrackableGuard,
Update, With, Write,
},
};
use std::{
future::{pending, Future, IntoFuture},
ops::DerefMut,
panic::Location,
};
@@ -157,6 +159,32 @@ impl<T> DefinedAt for ArcLocalResource<T> {
}
}
impl<T> Notify for ArcLocalResource<T>
where
T: 'static,
{
fn notify(&self) {
self.data.notify()
}
}
impl<T> Write for ArcLocalResource<T>
where
T: 'static,
{
type Value = Option<T>;
fn try_write(&self) -> Option<impl UntrackableGuard<Target = Self::Value>> {
self.data.try_write()
}
fn try_write_untracked(
&self,
) -> Option<impl DerefMut<Target = Self::Value>> {
self.data.try_write_untracked()
}
}
impl<T> ReadUntracked for ArcLocalResource<T>
where
T: 'static,
@@ -364,6 +392,32 @@ impl<T> DefinedAt for LocalResource<T> {
}
}
impl<T> Notify for LocalResource<T>
where
T: 'static,
{
fn notify(&self) {
self.data.notify()
}
}
impl<T> Write for LocalResource<T>
where
T: 'static,
{
type Value = Option<T>;
fn try_write(&self) -> Option<impl UntrackableGuard<Target = Self::Value>> {
self.data.try_write()
}
fn try_write_untracked(
&self,
) -> Option<impl DerefMut<Target = Self::Value>> {
self.data.try_write_untracked()
}
}
impl<T> ReadUntracked for LocalResource<T>
where
T: 'static,

View File

@@ -26,7 +26,7 @@ use reactive_graph::{
};
use std::{
future::{pending, IntoFuture},
ops::Deref,
ops::{Deref, DerefMut},
panic::Location,
sync::{
atomic::{AtomicBool, Ordering},
@@ -162,6 +162,32 @@ where
}
}
impl<T, Ser> Notify for ArcResource<T, Ser>
where
T: 'static,
{
fn notify(&self) {
self.data.notify()
}
}
impl<T, Ser> Write for ArcResource<T, Ser>
where
T: 'static,
{
type Value = Option<T>;
fn try_write(&self) -> Option<impl UntrackableGuard<Target = Self::Value>> {
self.data.try_write()
}
fn try_write_untracked(
&self,
) -> Option<impl DerefMut<Target = Self::Value>> {
self.data.try_write_untracked()
}
}
impl<T, Ser> ReadUntracked for ArcResource<T, Ser>
where
T: 'static,
@@ -842,6 +868,32 @@ where
}
}
impl<T, Ser> Notify for Resource<T, Ser>
where
T: Send + Sync + 'static,
{
fn notify(&self) {
self.data.notify()
}
}
impl<T, Ser> Write for Resource<T, Ser>
where
T: Send + Sync + 'static,
{
type Value = Option<T>;
fn try_write(&self) -> Option<impl UntrackableGuard<Target = Self::Value>> {
self.data.try_write()
}
fn try_write_untracked(
&self,
) -> Option<impl DerefMut<Target = Self::Value>> {
self.data.try_write_untracked()
}
}
impl<T, Ser> ReadUntracked for Resource<T, Ser>
where
T: Send + Sync + 'static,

View File

@@ -234,7 +234,8 @@ macro_rules! spawn_derived {
subscribers: SubscriberSet::new(),
state: AsyncDerivedState::Clean,
version: 0,
suspenses: Vec::new()
suspenses: Vec::new(),
pending_suspenses: Vec::new()
}));
let value = Arc::new(AsyncRwLock::new($initial));
let wakers = Arc::new(RwLock::new(Vec::new()));
@@ -364,7 +365,7 @@ macro_rules! spawn_derived {
// generate and assign new value
loading.store(true, Ordering::Relaxed);
let (this_version, suspense_ids) = {
let this_version = {
let mut guard = inner.write().or_poisoned();
guard.version += 1;
let version = guard.version;
@@ -372,14 +373,17 @@ macro_rules! spawn_derived {
.into_iter()
.map(|sc| sc.task_id())
.collect::<Vec<_>>();
(version, suspense_ids)
guard.pending_suspenses.extend(suspense_ids);
version
};
let new_value = fut.await;
drop(suspense_ids);
let latest_version = inner.read().or_poisoned().version;
let latest_version = {
let mut guard = inner.write().or_poisoned();
drop(mem::take(&mut guard.pending_suspenses));
guard.version
};
if latest_version == this_version {
Self::set_inner_value(new_value, value, wakers, inner, loading, Some(ready_tx)).await;
@@ -641,6 +645,14 @@ impl<T: 'static> Write for ArcAsyncDerived<T> {
type Value = Option<T>;
fn try_write(&self) -> Option<impl UntrackableGuard<Target = Self::Value>> {
// increment the version, such that a rerun triggered previously does not overwrite this
// new value
let mut guard = self.inner.write().or_poisoned();
guard.version += 1;
// tell any suspenses to stop waiting for this
drop(mem::take(&mut guard.pending_suspenses));
Some(MappedMut::new(
WriteGuard::new(self.clone(), self.value.blocking_write()),
|v| v.deref(),
@@ -651,6 +663,14 @@ impl<T: 'static> Write for ArcAsyncDerived<T> {
fn try_write_untracked(
&self,
) -> Option<impl DerefMut<Target = Self::Value>> {
// increment the version, such that a rerun triggered previously does not overwrite this
// new value
let mut guard = self.inner.write().or_poisoned();
guard.version += 1;
// tell any suspenses to stop waiting for this
drop(mem::take(&mut guard.pending_suspenses));
Some(MappedMut::new(
self.value.blocking_write(),
|v| v.deref(),

View File

@@ -14,8 +14,10 @@ use crate::{
unwrap_signal,
};
use core::fmt::Debug;
use or_poisoned::OrPoisoned;
use std::{
future::Future,
mem,
ops::{Deref, DerefMut},
panic::Location,
};
@@ -349,6 +351,17 @@ where
let guard = self
.inner
.try_with_value(|n| n.value.blocking_write_arc())?;
self.inner.try_with_value(|n| {
let mut guard = n.inner.write().or_poisoned();
// increment the version, such that a rerun triggered previously does not overwrite this
// new value
guard.version += 1;
// tell any suspenses to stop waiting for this
drop(mem::take(&mut guard.pending_suspenses));
});
Some(MappedMut::new(
WriteGuard::new(*self, guard),
|v| v.deref(),
@@ -359,6 +372,16 @@ where
fn try_write_untracked(
&self,
) -> Option<impl DerefMut<Target = Self::Value>> {
self.inner.try_with_value(|n| {
let mut guard = n.inner.write().or_poisoned();
// increment the version, such that a rerun triggered previously does not overwrite this
// new value
guard.version += 1;
// tell any suspenses to stop waiting for this
drop(mem::take(&mut guard.pending_suspenses));
});
self.inner
.try_with_value(|n| n.value.blocking_write_arc())
.map(|inner| {

View File

@@ -1,3 +1,4 @@
use super::suspense::TaskHandle;
use crate::{
channel::Sender,
computed::suspense::SuspenseContext,
@@ -22,6 +23,7 @@ pub(crate) struct ArcAsyncDerivedInner {
pub state: AsyncDerivedState,
pub version: usize,
pub suspenses: Vec<SuspenseContext>,
pub pending_suspenses: Vec<TaskHandle>,
}
#[derive(Debug, PartialEq, Eq)]

View File

@@ -104,15 +104,7 @@ pub mod prelude {
#[allow(unused)]
#[doc(hidden)]
pub fn log_warning(text: Arguments) {
#[cfg(feature = "tracing")]
{
tracing::warn!(text);
}
#[cfg(all(
not(feature = "tracing"),
target_arch = "wasm32",
target_os = "unknown"
))]
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
{
web_sys::console::warn_1(&text.to_string().into());
}