From 846ff2fefbb5812cefee260a0443e61513d057c8 Mon Sep 17 00:00:00 2001 From: Greg Johnston Date: Fri, 31 May 2024 11:39:32 -0400 Subject: [PATCH] feat: return an async guard from .await rather than cloning the value every time --- examples/fetch/src/lib.rs | 6 +- examples/hackernews/src/routes/stories.rs | 31 +++++---- examples/hackernews/src/routes/story.rs | 10 +-- examples/hackernews/src/routes/users.rs | 10 +-- examples/ssr_modes_axum/src/app.rs | 10 +-- examples/todo_app_sqlite_axum/Todos.db | Bin 16384 -> 16384 bytes examples/todo_app_sqlite_axum/src/todo.rs | 62 +++++++++-------- leptos_server/src/resource.rs | 7 +- .../async_derived/arc_async_derived.rs | 63 +++++++++++++++--- .../computed/async_derived/async_derived.rs | 4 +- .../computed/async_derived/future_impls.rs | 39 +++++++---- reactive_graph/src/signal/guards.rs | 58 +++++++++++++++- 12 files changed, 209 insertions(+), 91 deletions(-) diff --git a/examples/fetch/src/lib.rs b/examples/fetch/src/lib.rs index 79aa90efa..d648788eb 100644 --- a/examples/fetch/src/lib.rs +++ b/examples/fetch/src/lib.rs @@ -87,17 +87,19 @@ pub fn fetch_example() -> impl IntoView { diff --git a/examples/hackernews/src/routes/stories.rs b/examples/hackernews/src/routes/stories.rs index 6f6823e3c..5099f7b82 100644 --- a/examples/hackernews/src/routes/stories.rs +++ b/examples/hackernews/src/routes/stories.rs @@ -30,7 +30,6 @@ pub fn Stories() -> impl IntoView { params .read() .get("stories") - .map(ToOwned::to_owned) .unwrap_or_else(|| "top".to_string()) }; let stories = Resource::new_serde( @@ -44,7 +43,12 @@ pub fn Stories() -> impl IntoView { let hide_more_link = move || { Suspend(async move { - stories.await.unwrap_or_default().len() < 28 || pending.get() + stories + .await + .as_ref() + .map(|vec| vec.len() < 28) + .unwrap_or(true) + || pending.get() }) }; @@ -91,16 +95,19 @@ pub fn Stories() -> impl IntoView { // TODO set_pending on Transition //set_pending > - {move || Suspend(async move { match stories.await { - None => Either::Left(view! {

"Error loading stories."

}), - Some(stories) => { - Either::Right(view! { - - }) - } - }})} + + > +

"Error loading stories."

+
+ diff --git a/examples/hackernews/src/routes/story.rs b/examples/hackernews/src/routes/story.rs index 8cc1b7896..c530ca2e8 100644 --- a/examples/hackernews/src/routes/story.rs +++ b/examples/hackernews/src/routes/story.rs @@ -9,13 +9,7 @@ use leptos_router::hooks::use_params_map; pub fn Story() -> impl IntoView { let params = use_params_map(); let story = Resource::new_serde( - move || { - params - .get() - .get("id") - .map(ToOwned::to_owned) - .unwrap_or_default() - }, + move || params.read().get("id").unwrap_or_default(), move |id| async move { if id.is_empty() { None @@ -27,7 +21,7 @@ pub fn Story() -> impl IntoView { ); Suspense(SuspenseProps::builder().fallback(|| "Loading...").children(ToChildren::to_children(move || Suspend(async move { - match story.await { + match story.await.clone() { None => Either::Left("Story not found."), Some(story) => { Either::Right(view! { diff --git a/examples/hackernews/src/routes/users.rs b/examples/hackernews/src/routes/users.rs index 418f7b46c..7686938ce 100644 --- a/examples/hackernews/src/routes/users.rs +++ b/examples/hackernews/src/routes/users.rs @@ -7,13 +7,7 @@ use leptos_router::{hooks::use_params_map, *}; pub fn User() -> impl IntoView { let params = use_params_map(); let user = Resource::new_serde( - move || { - params - .read() - .get("id") - .map(ToOwned::to_owned) - .unwrap_or_default() - }, + move || params.read().get("id").unwrap_or_default(), move |id| async move { if id.is_empty() { None @@ -25,7 +19,7 @@ pub fn User() -> impl IntoView { view! {
- {move || Suspend(async move { match user.await { + {move || Suspend(async move { match user.await.clone() { None => Either::Left(view! {

"User not found."

}), Some(user) => Either::Right(view! {
diff --git a/examples/ssr_modes_axum/src/app.rs b/examples/ssr_modes_axum/src/app.rs index b17c5246c..65c90d879 100644 --- a/examples/ssr_modes_axum/src/app.rs +++ b/examples/ssr_modes_axum/src/app.rs @@ -60,7 +60,7 @@ fn HomePage() -> impl IntoView { let posts2 = Resource::new_serde(|| (), |_| list_post_metadata()); let posts2 = Resource::new( || (), - move |_| async move { posts2.await.unwrap_or_default().len() }, + move |_| async move { posts2.await.as_ref().map(Vec::len).unwrap_or(0) }, ); /*let posts_view = Suspend(async move { @@ -80,7 +80,7 @@ fn HomePage() -> impl IntoView { view! {

"My Great Blog"

"Loading posts..."

}> -

"number of posts: " {Suspend(posts2.into_future())}

+

"number of posts: " {Suspend(async move { *posts2.await })}

"Loading posts..."

}> //
    {posts_view}
@@ -119,7 +119,7 @@ fn Post() -> impl IntoView { }); let post_view = Suspend(async move { - match post_resource.await { + match post_resource.await.to_owned() { Ok(Ok(post)) => Ok(view! {

{post.title.clone()}

{post.content.clone()}

@@ -204,7 +204,7 @@ pub struct PostMetadata { #[server] pub async fn list_post_metadata() -> Result, ServerFnError> { - //tokio::time::sleep(std::time::Duration::from_secs(1)).await; + tokio::time::sleep(std::time::Duration::from_secs(1)).await; Ok(POSTS .iter() .map(|data| PostMetadata { @@ -216,6 +216,6 @@ pub async fn list_post_metadata() -> Result, ServerFnError> { #[server] pub async fn get_post(id: usize) -> Result, ServerFnError> { - //tokio::time::sleep(std::time::Duration::from_secs(1)).await; + tokio::time::sleep(std::time::Duration::from_secs(1)).await; Ok(POSTS.iter().find(|post| post.id == id).cloned()) } diff --git a/examples/todo_app_sqlite_axum/Todos.db b/examples/todo_app_sqlite_axum/Todos.db index 80928e45c04a376f435027ddd97631ea0e5a15c9..555f904895d35c606d799f8c76229d0578693186 100644 GIT binary patch delta 60 zcmZo@U~Fh$oFL7(V4{pOlI delta 60 zcmZo@U~Fh$oFL6OZ=#Gd impl IntoView { move |_| get_todos(), ); + let existing_todos = move || { + Suspend(async move { + todos + .await + .as_ref() + .map(|todos| { + if todos.is_empty() { + Either::Left(view! {

"No tasks were found."

}) + } else { + Either::Right( + todos + .iter() + .map(move |todo| { + let id = todo.id; + view! { +
  • + {todo.title.clone()} + + + + +
  • + } + }) + .collect::>(), + ) + } + }) + .map_err(Clone::clone) + }) + }; + view! { @@ -125,36 +157,8 @@ pub fn Todos() -> impl IntoView {
    "Loading..."

    }> }> - // {existing_todos}
      - {move || { - async move { - todos - .await - .map(|todos| { - if todos.is_empty() { - Either::Left(view! {

      "No tasks were found."

      }) - } else { - Either::Right( - todos - .into_iter() - .map(move |todo| { - view! { -
    • - {todo.title} - - - -
    • - } - }) - .collect::>(), - ) - } - }) - } - .wait() - }} + {existing_todos} {move || { submissions .get() diff --git a/leptos_server/src/resource.rs b/leptos_server/src/resource.rs index 10f822df8..aa2e71573 100644 --- a/leptos_server/src/resource.rs +++ b/leptos_server/src/resource.rs @@ -13,11 +13,12 @@ use hydration_context::SerializedDataId; use reactive_graph::{ computed::{ ArcAsyncDerived, ArcAsyncDerivedFuture, ArcMemo, AsyncDerived, - AsyncDerivedFuture, + AsyncDerivedFuture, AsyncDerivedGuard, }, graph::{Source, ToAnySource, ToAnySubscriber}, owner::Owner, prelude::*, + signal::guards::{AsyncPlain, Mapped, ReadGuard}, }; use std::{future::IntoFuture, ops::Deref}; @@ -242,7 +243,7 @@ impl IntoFuture for ArcResource where T: Clone + 'static, { - type Output = T; + type Output = AsyncDerivedGuard; type IntoFuture = ArcAsyncDerivedFuture; fn into_future(self) -> Self::IntoFuture { @@ -431,7 +432,7 @@ impl IntoFuture for Resource where T: Clone + Send + Sync + 'static, { - type Output = T; + type Output = AsyncDerivedGuard; type IntoFuture = AsyncDerivedFuture; #[track_caller] diff --git a/reactive_graph/src/computed/async_derived/arc_async_derived.rs b/reactive_graph/src/computed/async_derived/arc_async_derived.rs index 6cbacf15f..7c0517dba 100644 --- a/reactive_graph/src/computed/async_derived/arc_async_derived.rs +++ b/reactive_graph/src/computed/async_derived/arc_async_derived.rs @@ -12,11 +12,12 @@ use crate::{ SubscriberSet, ToAnySource, ToAnySubscriber, }, owner::{use_context, Owner}, - signal::guards::{Plain, ReadGuard}, + signal::guards::{AsyncPlain, Plain, ReadGuard}, traits::{DefinedAt, ReadUntracked}, transition::AsyncTransition, }; use any_spawner::Executor; +use async_lock::RwLock as AsyncRwLock; use core::fmt::Debug; use futures::{channel::oneshot, FutureExt, StreamExt}; use or_poisoned::OrPoisoned; @@ -35,13 +36,59 @@ pub struct ArcAsyncDerived { #[cfg(debug_assertions)] pub(crate) defined_at: &'static Location<'static>, // the current state of this signal - pub(crate) value: Arc>>, + pub(crate) value: Arc>>, // holds wakers generated when you .await this pub(crate) wakers: Arc>>, pub(crate) inner: Arc>, pub(crate) loading: Arc, } +pub(crate) trait BlockingLock { + fn blocking_read_arc(self: &Arc) + -> async_lock::RwLockReadGuardArc; + + fn blocking_read(&self) -> async_lock::RwLockReadGuard<'_, T>; + + fn blocking_write(&self) -> async_lock::RwLockWriteGuard<'_, T>; +} + +impl BlockingLock for AsyncRwLock { + fn blocking_read_arc( + self: &Arc, + ) -> async_lock::RwLockReadGuardArc { + #[cfg(not(target_family = "wasm"))] + { + self.read_arc_blocking() + } + #[cfg(target_family = "wasm")] + { + self.read_arc().now_or_never().unwrap() + } + } + + fn blocking_read(&self) -> async_lock::RwLockReadGuard<'_, T> { + #[cfg(not(target_family = "wasm"))] + { + self.read_blocking() + } + #[cfg(target_family = "wasm")] + { + self.read().now_or_never().unwrap() + } + } + + fn blocking_write(&self) -> async_lock::RwLockWriteGuard<'_, T> { + #[cfg(not(target_family = "wasm"))] + { + self.write_blocking() + } + #[cfg(target_family = "wasm")] + { + self.write().now_or_never().unwrap() + } + } +} + impl Clone for ArcAsyncDerived { fn clone(&self) -> Self { Self { @@ -96,7 +143,7 @@ macro_rules! spawn_derived { subscribers: SubscriberSet::new(), dirty: false })); - let value = Arc::new(RwLock::new($initial)); + let value = Arc::new(AsyncRwLock::new($initial)); let wakers = Arc::new(RwLock::new(Vec::new())); let this = ArcAsyncDerived { @@ -129,7 +176,7 @@ macro_rules! spawn_derived { let mut guard = this.inner.write().or_poisoned(); guard.dirty = false; - *value.write().or_poisoned() = Some(orig_value); + *value.blocking_write() = Some(orig_value); this.loading.store(false, Ordering::Relaxed); true } @@ -185,7 +232,7 @@ macro_rules! spawn_derived { // generate and assign new value let new_value = fut.await; loading.store(false, Ordering::Relaxed); - *value.write().or_poisoned() = Some(new_value); + *value.write().await = Some(new_value); inner.write().or_poisoned().dirty = true; ready_tx.send(()); @@ -271,11 +318,11 @@ impl ArcAsyncDerived { } impl ReadUntracked for ArcAsyncDerived { - type Value = ReadGuard, Plain>>; + type Value = ReadGuard, AsyncPlain>>; fn try_read_untracked(&self) -> Option { if let Some(suspense_context) = use_context::() { - if self.value.read().or_poisoned().is_none() { + if self.value.blocking_read().is_none() { let handle = suspense_context.task_id(); let ready = SpecialNonReactiveFuture::new(self.ready()); Executor::spawn(async move { @@ -284,7 +331,7 @@ impl ReadUntracked for ArcAsyncDerived { }); } } - Plain::try_new(Arc::clone(&self.value)).map(ReadGuard::new) + AsyncPlain::try_new(&self.value).map(ReadGuard::new) } } diff --git a/reactive_graph/src/computed/async_derived/async_derived.rs b/reactive_graph/src/computed/async_derived/async_derived.rs index 463c25dc5..285aa292b 100644 --- a/reactive_graph/src/computed/async_derived/async_derived.rs +++ b/reactive_graph/src/computed/async_derived/async_derived.rs @@ -7,7 +7,7 @@ use crate::{ ToAnySource, ToAnySubscriber, }, owner::StoredValue, - signal::guards::{Plain, ReadGuard}, + signal::guards::{AsyncPlain, Plain, ReadGuard}, traits::{DefinedAt, Dispose, ReadUntracked}, unwrap_signal, }; @@ -142,7 +142,7 @@ impl DefinedAt for AsyncDerived { } impl ReadUntracked for AsyncDerived { - type Value = ReadGuard, Plain>>; + type Value = ReadGuard, AsyncPlain>>; fn try_read_untracked(&self) -> Option { self.inner.get().map(|inner| inner.read_untracked()) diff --git a/reactive_graph/src/computed/async_derived/future_impls.rs b/reactive_graph/src/computed/async_derived/future_impls.rs index d11057db2..07b5ab41d 100644 --- a/reactive_graph/src/computed/async_derived/future_impls.rs +++ b/reactive_graph/src/computed/async_derived/future_impls.rs @@ -2,10 +2,11 @@ use super::{suspense::SuspenseContext, ArcAsyncDerived, AsyncDerived}; use crate::{ graph::{AnySource, ToAnySource}, owner::use_context, - signal::guards::Plain, + signal::guards::{AsyncPlain, Mapped, Plain, ReadGuard}, traits::{DefinedAt, Track}, unwrap_signal, }; +use futures::pin_mut; use or_poisoned::OrPoisoned; use pin_project_lite::pin_project; use std::{ @@ -18,6 +19,12 @@ use std::{ task::{Context, Poll, Waker}, }; +/// A read guard that holds access to an async derived resource. +/// +/// Implements [`Deref`](std::ops::Deref) to access the inner value. This should not be held longer +/// than it is needed, as it prevents updates to the inner value. +pub type AsyncDerivedGuard = ReadGuard>, T>>; + /// A [`Future`] that is ready when an [`ArcAsyncDerived`] is finished loading or reloading, /// but does not contain its value. pub struct ArcAsyncDerivedReadyFuture { @@ -45,7 +52,7 @@ impl Future for ArcAsyncDerivedReadyFuture { /// and contains its value. pub struct ArcAsyncDerivedFuture { source: AnySource, - value: Arc>>, + value: Arc>>, loading: Arc, wakers: Arc>>, } @@ -54,7 +61,7 @@ impl IntoFuture for ArcAsyncDerived where T: Clone + 'static, { - type Output = T; + type Output = AsyncDerivedGuard; type IntoFuture = ArcAsyncDerivedFuture; fn into_future(self) -> Self::IntoFuture { @@ -73,16 +80,24 @@ impl Future for ArcAsyncDerivedFuture where T: Clone + 'static, { - type Output = T; + type Output = AsyncDerivedGuard; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let waker = cx.waker(); self.source.track(); - if self.loading.load(Ordering::Relaxed) { - self.wakers.write().or_poisoned().push(waker.clone()); - Poll::Pending - } else { - Poll::Ready(self.value.read().or_poisoned().clone().unwrap()) + let value = self.value.read_arc(); + pin_mut!(value); + match (self.loading.load(Ordering::Relaxed), value.poll(cx)) { + (true, _) => { + self.wakers.write().or_poisoned().push(waker.clone()); + Poll::Pending + } + (_, Poll::Pending) => Poll::Pending, + (_, Poll::Ready(guard)) => Poll::Ready(ReadGuard::new( + Mapped::new_with_guard(AsyncPlain { guard }, |guard| { + guard.as_ref().unwrap() + }), + )), } } } @@ -101,7 +116,7 @@ impl IntoFuture for AsyncDerived where T: Send + Sync + Clone + 'static, { - type Output = T; + type Output = AsyncDerivedGuard; type IntoFuture = AsyncDerivedFuture; fn into_future(self) -> Self::IntoFuture { @@ -112,13 +127,11 @@ where } } -// this is implemented to output T by cloning it because read guards should not be held across -// .await points, and it's way too easy to trip up by doing that! impl Future for AsyncDerivedFuture where T: Send + Sync + Clone + 'static, { - type Output = T; + type Output = AsyncDerivedGuard; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.project(); diff --git a/reactive_graph/src/signal/guards.rs b/reactive_graph/src/signal/guards.rs index 94537f975..be846cf2d 100644 --- a/reactive_graph/src/signal/guards.rs +++ b/reactive_graph/src/signal/guards.rs @@ -1,4 +1,4 @@ -use crate::traits::Trigger; +use crate::{computed::BlockingLock, traits::Trigger}; use core::fmt::Debug; use guardian::ArcRwLockReadGuardian; use std::{ @@ -120,6 +120,50 @@ impl Display for Plain { } } +pub struct AsyncPlain { + pub(crate) guard: async_lock::RwLockReadGuardArc, +} + +impl Debug for AsyncPlain { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AsyncPlain").finish() + } +} + +impl AsyncPlain { + pub(crate) fn try_new(inner: &Arc>) -> Option { + Some(Self { + guard: inner.blocking_read_arc(), + }) + } +} + +impl Deref for AsyncPlain { + type Target = T; + + fn deref(&self) -> &Self::Target { + self.guard.deref() + } +} + +impl PartialEq for AsyncPlain { + fn eq(&self, other: &Self) -> bool { + **self == **other + } +} + +impl PartialEq for AsyncPlain { + fn eq(&self, other: &T) -> bool { + **self == *other + } +} + +impl Display for AsyncPlain { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + Display::fmt(&**self, f) + } +} + #[derive(Debug)] pub struct Mapped where @@ -139,6 +183,18 @@ impl Mapped, U> { } } +impl Mapped +where + Inner: Deref, +{ + pub(crate) fn new_with_guard( + inner: Inner, + map_fn: fn(&Inner::Target) -> &U, + ) -> Self { + Self { inner, map_fn } + } +} + impl Deref for Mapped where Inner: Deref,