Compare commits

...

5 Commits

Author SHA1 Message Date
Greg Johnston
9f704f91fe Use local observers 2023-01-27 13:52:59 -05:00
Greg Johnston
4b0e212f1c Rc/RefCell => Arc/RwLock 2023-01-27 13:07:34 -05:00
Greg Johnston
8fa89cffef Make tests work on stable 2023-01-27 12:54:44 -05:00
Greg Johnston
18ceb4c4f8 Remove unnecessary log dependency 2023-01-27 12:54:11 -05:00
Greg Johnston
0d3e2baa9b remove unused base64 dependency 2023-01-27 12:52:08 -05:00
10 changed files with 198 additions and 176 deletions

View File

@@ -8,16 +8,16 @@ repository = "https://github.com/leptos-rs/leptos"
description = "Reactive system for the Leptos web framework."
[dependencies]
log = "0.4"
slotmap = { version = "1", features = ["serde"] }
serde = { version = "1", features = ["derive"] }
serde-lite = { version = "0.3", optional = true }
futures = { version = "0.3" }
js-sys = "0.3"
lazy_static = "1"
miniserde = { version = "0.1", optional = true }
parking_lot = "0.12"
serde-wasm-bindgen = "0.4"
serde_json = "1"
base64 = "0.21"
thiserror = "1"
tokio = { version = "1", features = ["rt"], optional = true }
tracing = "0.1"

View File

@@ -57,7 +57,7 @@ where
let id = value.type_id();
_ = with_runtime(cx.runtime, |runtime| {
let mut contexts = runtime.scope_contexts.borrow_mut();
let mut contexts = runtime.scope_contexts.write();
let context = contexts.entry(cx.id).unwrap().or_insert_with(HashMap::new);
context.insert(id, Box::new(value) as Box<dyn Any>);
});
@@ -116,7 +116,7 @@ where
let id = TypeId::of::<T>();
with_runtime(cx.runtime, |runtime| {
let local_value = {
let contexts = runtime.scope_contexts.borrow();
let contexts = runtime.scope_contexts.read();
let context = contexts.get(cx.id);
context
.and_then(|context| context.get(&id).and_then(|val| val.downcast_ref::<T>()))
@@ -124,16 +124,12 @@ where
};
match local_value {
Some(val) => Some(val),
None => runtime
.scope_parents
.borrow()
.get(cx.id)
.and_then(|parent| {
use_context::<T>(Scope {
runtime: cx.runtime,
id: *parent,
})
}),
None => runtime.scope_parents.read().get(cx.id).and_then(|parent| {
use_context::<T>(Scope {
runtime: cx.runtime,
id: *parent,
})
}),
}
})
.ok()

View File

@@ -1,8 +1,8 @@
#![forbid(unsafe_code)]
use crate::runtime::{with_runtime, RuntimeId};
use crate::runtime::{with_runtime, LocalObserver, RuntimeId};
use crate::{debug_warn, Runtime, Scope, ScopeProperty};
use cfg_if::cfg_if;
use std::cell::RefCell;
use parking_lot::RwLock;
use std::fmt::Debug;
/// Effects run a certain chunk of code whenever the signals they depend on change.
@@ -23,7 +23,6 @@ use std::fmt::Debug;
/// the server, use [create_isomorphic_effect].
/// ```
/// # use leptos_reactive::*;
/// # use log::*;
/// # create_scope(create_runtime(), |cx| {
/// let (a, set_a) = create_signal(cx, 0);
/// let (b, set_b) = create_signal(cx, 0);
@@ -31,7 +30,7 @@ use std::fmt::Debug;
/// // ✅ use effects to interact between reactive state and the outside world
/// create_effect(cx, move |_| {
/// // immediately prints "Value: 0" and subscribes to `a`
/// log::debug!("Value: {}", a());
/// println!("Value: {}", a());
/// });
///
/// set_a(1);
@@ -80,7 +79,6 @@ where
/// the server as well as the client.
/// ```
/// # use leptos_reactive::*;
/// # use log::*;
/// # create_scope(create_runtime(), |cx| {
/// let (a, set_a) = create_signal(cx, 0);
/// let (b, set_b) = create_signal(cx, 0);
@@ -88,7 +86,7 @@ where
/// // ✅ use effects to interact between reactive state and the outside world
/// create_isomorphic_effect(cx, move |_| {
/// // immediately prints "Value: 0" and subscribes to `a`
/// log::debug!("Value: {}", a());
/// println!("Value: {}", a());
/// });
///
/// set_a(1);
@@ -152,7 +150,7 @@ where
F: Fn(Option<T>) -> T,
{
pub(crate) f: F,
pub(crate) value: RefCell<Option<T>>,
pub(crate) value: RwLock<Option<T>>,
#[cfg(debug_assertions)]
pub(crate) defined_at: &'static std::panic::Location<'static>,
}
@@ -179,22 +177,22 @@ where
)
)
)]
fn run(&self, id: EffectId, runtime: RuntimeId) {
_ = with_runtime(runtime, |runtime| {
fn run(&self, id: EffectId, runtime_id: RuntimeId) {
_ = with_runtime(runtime_id, |runtime| {
// clear previous dependencies
id.cleanup(runtime);
// set this as the current observer
let prev_observer = runtime.observer.take();
runtime.observer.set(Some(id));
let prev_observer = LocalObserver::take(runtime_id);
LocalObserver::set(runtime_id, Some(id));
// run the effect
let value = self.value.take();
let value = std::mem::take(&mut *self.value.write());
let new_value = (self.f)(value);
*self.value.borrow_mut() = Some(new_value);
*self.value.write() = Some(new_value);
// restore the previous observer
runtime.observer.set(prev_observer);
LocalObserver::set(runtime_id, prev_observer);
})
}
}
@@ -203,7 +201,7 @@ impl EffectId {
pub(crate) fn run<T>(&self, runtime_id: RuntimeId) {
_ = with_runtime(runtime_id, |runtime| {
let effect = {
let effects = runtime.effects.borrow();
let effects = runtime.effects.read();
effects.get(*self).cloned()
};
if let Some(effect) = effect {
@@ -226,12 +224,12 @@ impl EffectId {
)
)]
pub(crate) fn cleanup(&self, runtime: &Runtime) {
let sources = runtime.effect_sources.borrow();
let sources = runtime.effect_sources.read();
if let Some(sources) = sources.get(*self) {
let subs = runtime.signal_subscribers.borrow();
for source in sources.borrow().iter() {
let subs = runtime.signal_subscribers.read();
for source in sources.read().iter() {
if let Some(source) = subs.get(*source) {
source.borrow_mut().remove(self);
source.write().remove(self);
}
}
}

View File

@@ -146,7 +146,7 @@ macro_rules! debug_warn {
{
#[cfg(debug_assertions)]
{
log::warn!($($x)*)
tracing::warn!($($x)*)
}
#[cfg(not(debug_assertions))]
{ }

View File

@@ -33,7 +33,7 @@ use std::fmt::Debug;
/// let expensive = move || really_expensive_computation(value()); // lazy: doesn't run until called
/// create_effect(cx, move |_| {
/// // 🆗 run #1: calls `really_expensive_computation` the first time
/// log::debug!("expensive = {}", expensive());
/// println!("expensive = {}", expensive());
/// });
/// create_effect(cx, move |_| {
/// // ❌ run #2: this calls `really_expensive_computation` a second time!
@@ -46,7 +46,7 @@ use std::fmt::Debug;
/// let memoized = create_memo(cx, move |_| really_expensive_computation(value()));
/// create_effect(cx, move |_| {
/// // 🆗 reads the current value of the memo
/// log::debug!("memoized = {}", memoized());
/// println!("memoized = {}", memoized());
/// });
/// create_effect(cx, move |_| {
/// // ✅ reads the current value **without re-running the calculation**
@@ -103,7 +103,7 @@ where
/// let expensive = move || really_expensive_computation(value()); // lazy: doesn't run until called
/// create_effect(cx, move |_| {
/// // 🆗 run #1: calls `really_expensive_computation` the first time
/// log::debug!("expensive = {}", expensive());
/// println!("expensive = {}", expensive());
/// });
/// create_effect(cx, move |_| {
/// // ❌ run #2: this calls `really_expensive_computation` a second time!
@@ -116,7 +116,7 @@ where
/// let memoized = create_memo(cx, move |_| really_expensive_computation(value()));
/// create_effect(cx, move |_| {
/// // 🆗 reads the current value of the memo
/// log::debug!("memoized = {}", memoized());
/// println!("memoized = {}", memoized());
/// });
/// create_effect(cx, move |_| {
/// // ✅ reads the current value **without re-running the calculation**

View File

@@ -6,15 +6,10 @@ use crate::{
spawn::spawn_local,
use_context, Memo, ReadSignal, Scope, ScopeProperty, SuspenseContext, WriteSignal,
};
use parking_lot::RwLock;
use std::{
any::Any,
cell::{Cell, RefCell},
collections::HashSet,
fmt::Debug,
future::Future,
marker::PhantomData,
pin::Pin,
rc::Rc,
any::Any, collections::HashSet, fmt::Debug, future::Future, marker::PhantomData, pin::Pin,
sync::Arc,
};
/// Creates [Resource](crate::Resource), which is a signal that reflects the
@@ -113,10 +108,10 @@ where
let (loading, set_loading) = create_signal(cx, false);
let fetcher = Rc::new(move |s| Box::pin(fetcher(s)) as Pin<Box<dyn Future<Output = T>>>);
let fetcher = Arc::new(move |s| Box::pin(fetcher(s)) as Pin<Box<dyn Future<Output = T>>>);
let source = create_memo(cx, move |_| source());
let r = Rc::new(ResourceState {
let r = Arc::new(ResourceState {
scope: cx,
value,
set_value,
@@ -124,18 +119,18 @@ where
set_loading,
source,
fetcher,
resolved: Rc::new(Cell::new(resolved)),
scheduled: Rc::new(Cell::new(false)),
resolved: Arc::new(RwLock::new(resolved)),
scheduled: Arc::new(RwLock::new(false)),
suspense_contexts: Default::default(),
});
let id = with_runtime(cx.runtime, |runtime| {
runtime.create_serializable_resource(Rc::clone(&r))
runtime.create_serializable_resource(Arc::clone(&r))
})
.expect("tried to create a Resource in a Runtime that has been disposed.");
create_isomorphic_effect(cx, {
let r = Rc::clone(&r);
let r = Arc::clone(&r);
move |_| {
load_resource(cx, id, r.clone());
}
@@ -233,10 +228,10 @@ where
let (loading, set_loading) = create_signal(cx, false);
let fetcher = Rc::new(move |s| Box::pin(fetcher(s)) as Pin<Box<dyn Future<Output = T>>>);
let fetcher = Arc::new(move |s| Box::pin(fetcher(s)) as Pin<Box<dyn Future<Output = T>>>);
let source = create_memo(cx, move |_| source());
let r = Rc::new(ResourceState {
let r = Arc::new(ResourceState {
scope: cx,
value,
set_value,
@@ -244,18 +239,18 @@ where
set_loading,
source,
fetcher,
resolved: Rc::new(Cell::new(resolved)),
scheduled: Rc::new(Cell::new(false)),
resolved: Arc::new(RwLock::new(resolved)),
scheduled: Arc::new(RwLock::new(false)),
suspense_contexts: Default::default(),
});
let id = with_runtime(cx.runtime, |runtime| {
runtime.create_unserializable_resource(Rc::clone(&r))
runtime.create_unserializable_resource(Arc::clone(&r))
})
.expect("tried to create a Resource in a runtime that has been disposed.");
create_effect(cx, {
let r = Rc::clone(&r);
let r = Arc::clone(&r);
// This is a local resource, so we're always going to handle it on the
// client
move |_| r.load(false)
@@ -274,7 +269,7 @@ where
}
#[cfg(not(feature = "hydrate"))]
fn load_resource<S, T>(_cx: Scope, _id: ResourceId, r: Rc<ResourceState<S, T>>)
fn load_resource<S, T>(_cx: Scope, _id: ResourceId, r: Arc<ResourceState<S, T>>)
where
S: PartialEq + Debug + Clone + 'static,
T: 'static,
@@ -283,7 +278,7 @@ where
}
#[cfg(feature = "hydrate")]
fn load_resource<S, T>(cx: Scope, id: ResourceId, r: Rc<ResourceState<S, T>>)
fn load_resource<S, T>(cx: Scope, id: ResourceId, r: Arc<ResourceState<S, T>>)
where
S: PartialEq + Debug + Clone + 'static,
T: Serializable + 'static,
@@ -291,12 +286,12 @@ where
use wasm_bindgen::{JsCast, UnwrapThrowExt};
_ = with_runtime(cx.runtime, |runtime| {
let mut context = runtime.shared_context.borrow_mut();
let mut context = runtime.shared_context.write();
if let Some(data) = context.resolved_resources.remove(&id) {
// The server already sent us the serialized resource value, so
// deserialize & set it now
context.pending_resources.remove(&id); // no longer pending
r.resolved.set(true);
*r.resolved.write() = true;
let res = T::from_json(&data).expect_throw("could not deserialize Resource JSON");
@@ -318,7 +313,7 @@ where
move |res: String| {
let res =
T::from_json(&res).expect_throw("could not deserialize Resource JSON");
resolved.set(true);
*resolved.write() = true;
set_value.update(|n| *n = Some(res));
set_loading.update(|n| *n = false);
}
@@ -551,10 +546,10 @@ where
set_loading: WriteSignal<bool>,
source: Memo<S>,
#[allow(clippy::type_complexity)]
fetcher: Rc<dyn Fn(S) -> Pin<Box<dyn Future<Output = T>>>>,
resolved: Rc<Cell<bool>>,
scheduled: Rc<Cell<bool>>,
suspense_contexts: Rc<RefCell<HashSet<SuspenseContext>>>,
fetcher: Arc<dyn Fn(S) -> Pin<Box<dyn Future<Output = T>>>>,
resolved: Arc<RwLock<bool>>,
scheduled: Arc<RwLock<bool>>,
suspense_contexts: Arc<RwLock<HashSet<SuspenseContext>>>,
}
impl<S, T> ResourceState<S, T>
@@ -583,7 +578,7 @@ where
let increment = move |_: Option<()>| {
if let Some(s) = &suspense_cx {
let mut contexts = suspense_contexts.borrow_mut();
let mut contexts = suspense_contexts.write();
if !contexts.contains(s) {
contexts.insert(*s);
@@ -607,21 +602,22 @@ where
fn load(&self, refetching: bool) {
// doesn't refetch if already refetching
if refetching && self.scheduled.get() {
let mut scheduled = self.scheduled.write();
if refetching && *scheduled {
return;
}
self.scheduled.set(false);
*scheduled = false;
_ = self.source.try_with(|source| {
let fut = (self.fetcher)(source.clone());
// `scheduled` is true for the rest of this code only
self.scheduled.set(true);
*self.scheduled.write() = true;
queue_microtask({
let scheduled = Rc::clone(&self.scheduled);
let scheduled = Arc::clone(&self.scheduled);
move || {
scheduled.set(false);
*scheduled.write() = false;
}
});
@@ -630,7 +626,7 @@ where
// increment counter everywhere it's read
let suspense_contexts = self.suspense_contexts.clone();
for suspense_context in suspense_contexts.borrow().iter() {
for suspense_context in suspense_contexts.read().iter() {
suspense_context.increment();
}
@@ -642,12 +638,12 @@ where
async move {
let res = fut.await;
resolved.set(true);
*resolved.write() = true;
set_value.update(|n| *n = Some(res));
set_loading.update(|n| *n = false);
for suspense_context in suspense_contexts.borrow().iter() {
for suspense_context in suspense_contexts.read().iter() {
suspense_context.decrement();
}
}
@@ -686,8 +682,8 @@ where
}
pub(crate) enum AnyResource {
Unserializable(Rc<dyn UnserializableResource>),
Serializable(Rc<dyn SerializableResource>),
Unserializable(Arc<dyn UnserializableResource + Send + Sync>),
Serializable(Arc<dyn SerializableResource + Send + Sync>),
}
pub(crate) trait SerializableResource {

View File

@@ -6,28 +6,30 @@ use crate::{
};
use cfg_if::cfg_if;
use futures::stream::FuturesUnordered;
use lazy_static::lazy_static;
use parking_lot::RwLock;
use slotmap::{SecondaryMap, SlotMap, SparseSecondaryMap};
use std::{
any::{Any, TypeId},
cell::{Cell, RefCell},
cell::RefCell,
collections::{HashMap, HashSet},
fmt::Debug,
future::Future,
marker::PhantomData,
pin::Pin,
rc::Rc,
sync::Arc,
};
pub(crate) type PinnedFuture<T> = Pin<Box<dyn Future<Output = T>>>;
cfg_if! {
if #[cfg(any(feature = "csr", feature = "hydrate"))] {
thread_local! {
pub(crate) static RUNTIME: Runtime = Runtime::new();
lazy_static! {
pub(crate) static ref RUNTIME: Runtime = Runtime::new();
}
} else {
thread_local! {
pub(crate) static RUNTIMES: RefCell<SlotMap<RuntimeId, Runtime>> = Default::default();
lazy_static! {
pub(crate) static ref RUNTIMES: RwLock<SlotMap<RuntimeId, Runtime>> = Default::default();
}
}
}
@@ -39,15 +41,13 @@ pub(crate) fn with_runtime<T>(id: RuntimeId, f: impl FnOnce(&Runtime) -> T) -> R
cfg_if! {
if #[cfg(any(feature = "csr", feature = "hydrate"))] {
_ = id;
Ok(RUNTIME.with(|runtime| f(runtime)))
Ok(f(&RUNTIME))
} else {
RUNTIMES.with(|runtimes| {
let runtimes = runtimes.borrow();
match runtimes.get(id) {
None => Err(()),
Some(runtime) => Ok(f(runtime))
}
})
let runtimes = RUNTIMES.read();
match runtimes.get(id) {
None => Err(()),
Some(runtime) => Ok(f(runtime))
}
}
}
}
@@ -60,7 +60,7 @@ pub fn create_runtime() -> RuntimeId {
if #[cfg(any(feature = "csr", feature = "hydrate"))] {
Default::default()
} else {
RUNTIMES.with(|runtimes| runtimes.borrow_mut().insert(Runtime::new()))
RUNTIMES.with(|runtimes| runtimes.write().insert(Runtime::new()))
}
}
}
@@ -75,7 +75,7 @@ impl RuntimeId {
pub fn dispose(self) {
cfg_if! {
if #[cfg(not(any(feature = "csr", feature = "hydrate")))] {
let runtime = RUNTIMES.with(move |runtimes| runtimes.borrow_mut().remove(self));
let runtime = RUNTIMES.with(move |runtimes| runtimes.write().remove(self));
drop(runtime);
}
}
@@ -83,7 +83,7 @@ impl RuntimeId {
pub(crate) fn raw_scope_and_disposer(self) -> (Scope, ScopeDisposer) {
with_runtime(self, |runtime| {
let id = { runtime.scopes.borrow_mut().insert(Default::default()) };
let id = { runtime.scopes.write().insert(Default::default()) };
let scope = Scope { runtime: self, id };
let disposer = ScopeDisposer(Box::new(move || scope.dispose()));
(scope, disposer)
@@ -97,9 +97,9 @@ impl RuntimeId {
parent: Option<Scope>,
) -> (T, ScopeId, ScopeDisposer) {
with_runtime(self, |runtime| {
let id = { runtime.scopes.borrow_mut().insert(Default::default()) };
let id = { runtime.scopes.write().insert(Default::default()) };
if let Some(parent) = parent {
runtime.scope_parents.borrow_mut().insert(id, parent.id);
runtime.scope_parents.write().insert(id, parent.id);
}
let scope = Scope { runtime: self, id };
let val = f(scope);
@@ -121,10 +121,7 @@ impl RuntimeId {
T: Any + 'static,
{
let id = with_runtime(self, |runtime| {
runtime
.signals
.borrow_mut()
.insert(Rc::new(RefCell::new(value)))
runtime.signals.write().insert(Arc::new(RwLock::new(value)))
})
.expect("tried to create a signal in a runtime that has been disposed");
(
@@ -150,10 +147,7 @@ impl RuntimeId {
T: Any + 'static,
{
let id = with_runtime(self, |runtime| {
runtime
.signals
.borrow_mut()
.insert(Rc::new(RefCell::new(value)))
runtime.signals.write().insert(Arc::new(RwLock::new(value)))
})
.expect("tried to create a signal in a runtime that has been disposed");
RwSignal {
@@ -176,11 +170,11 @@ impl RuntimeId {
with_runtime(self, |runtime| {
let effect = Effect {
f,
value: RefCell::new(None),
value: RwLock::new(None),
#[cfg(debug_assertions)]
defined_at,
};
let id = { runtime.effects.borrow_mut().insert(Rc::new(effect)) };
let id = { runtime.effects.write().insert(Arc::new(effect)) };
id.run::<T>(self);
id
})
@@ -219,33 +213,63 @@ impl RuntimeId {
#[derive(Default)]
pub(crate) struct Runtime {
pub shared_context: RefCell<SharedContext>,
pub observer: Cell<Option<EffectId>>,
pub scopes: RefCell<SlotMap<ScopeId, RefCell<Vec<ScopeProperty>>>>,
pub scope_parents: RefCell<SparseSecondaryMap<ScopeId, ScopeId>>,
pub scope_children: RefCell<SparseSecondaryMap<ScopeId, Vec<ScopeId>>>,
pub shared_context: RwLock<SharedContext>,
pub scopes: RwLock<SlotMap<ScopeId, RwLock<Vec<ScopeProperty>>>>,
pub scope_parents: RwLock<SparseSecondaryMap<ScopeId, ScopeId>>,
pub scope_children: RwLock<SparseSecondaryMap<ScopeId, Vec<ScopeId>>>,
#[allow(clippy::type_complexity)]
pub scope_contexts: RefCell<SparseSecondaryMap<ScopeId, HashMap<TypeId, Box<dyn Any>>>>,
pub scope_contexts: RwLock<SparseSecondaryMap<ScopeId, HashMap<TypeId, Box<dyn Any>>>>,
#[allow(clippy::type_complexity)]
pub scope_cleanups: RefCell<SparseSecondaryMap<ScopeId, Vec<Box<dyn FnOnce()>>>>,
pub signals: RefCell<SlotMap<SignalId, Rc<RefCell<dyn Any>>>>,
pub signal_subscribers: RefCell<SecondaryMap<SignalId, RefCell<HashSet<EffectId>>>>,
pub effects: RefCell<SlotMap<EffectId, Rc<dyn AnyEffect>>>,
pub effect_sources: RefCell<SecondaryMap<EffectId, RefCell<HashSet<SignalId>>>>,
pub resources: RefCell<SlotMap<ResourceId, AnyResource>>,
pub scope_cleanups: RwLock<SparseSecondaryMap<ScopeId, Vec<Box<dyn FnOnce()>>>>,
pub signals: RwLock<SlotMap<SignalId, Arc<RwLock<dyn Any>>>>,
pub signal_subscribers: RwLock<SecondaryMap<SignalId, RwLock<HashSet<EffectId>>>>,
pub effects: RwLock<SlotMap<EffectId, Arc<dyn AnyEffect>>>,
pub effect_sources: RwLock<SecondaryMap<EffectId, RwLock<HashSet<SignalId>>>>,
pub resources: RwLock<SlotMap<ResourceId, AnyResource>>,
}
// track current observer thread-locally
// because effects run synchronously, the current observer
// *in this thread* will not change during the execution of an effect.
// but if we track this across threads, it's possible for overlapping
// executions to cause the stack to be out of order
// so we store at most one current observer per runtime, per thread
thread_local! {
static OBSERVER: RefCell<HashMap<RuntimeId, EffectId>> = Default::default();
}
pub(crate) struct LocalObserver {}
impl LocalObserver {
pub fn take(runtime: RuntimeId) -> Option<EffectId> {
OBSERVER.with(|observer| observer.borrow_mut().remove(&runtime))
}
pub fn get(runtime: RuntimeId) -> Option<EffectId> {
OBSERVER.with(|observer| observer.borrow().get(&runtime).copied())
}
pub fn set(runtime: RuntimeId, effect: Option<EffectId>) {
OBSERVER.with(|observer| {
if let Some(value) = effect {
observer.borrow_mut().insert(runtime, value)
} else {
observer.borrow_mut().remove(&runtime)
}
});
}
}
impl Debug for Runtime {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Runtime")
.field("shared_context", &self.shared_context)
.field("observer", &self.observer)
.field("scopes", &self.scopes)
.field("scope_parents", &self.scope_parents)
.field("scope_children", &self.scope_children)
.field("signals", &self.signals)
.field("signal_subscribers", &self.signal_subscribers)
.field("effects", &self.effects.borrow().len())
.field("effects", &self.effects.read().len())
.field("effect_sources", &self.effect_sources)
.finish()
}
@@ -258,27 +282,27 @@ impl Runtime {
pub(crate) fn create_unserializable_resource<S, T>(
&self,
state: Rc<ResourceState<S, T>>,
state: Arc<ResourceState<S, T>>,
) -> ResourceId
where
S: Clone + 'static,
T: 'static,
{
self.resources
.borrow_mut()
.write()
.insert(AnyResource::Unserializable(state))
}
pub(crate) fn create_serializable_resource<S, T>(
&self,
state: Rc<ResourceState<S, T>>,
state: Arc<ResourceState<S, T>>,
) -> ResourceId
where
S: Clone + 'static,
T: Serializable + 'static,
{
self.resources
.borrow_mut()
.write()
.insert(AnyResource::Serializable(state))
}
@@ -291,7 +315,7 @@ impl Runtime {
S: 'static,
T: 'static,
{
let resources = self.resources.borrow();
let resources = self.resources.read();
let res = resources.get(id);
if let Some(res) = res {
let res_state = match res {
@@ -317,7 +341,7 @@ impl Runtime {
/// Returns IDs for all [Resource]s found on any scope.
pub(crate) fn all_resources(&self) -> Vec<ResourceId> {
self.resources
.borrow()
.read()
.iter()
.map(|(resource_id, _)| resource_id)
.collect()
@@ -326,7 +350,7 @@ impl Runtime {
/// Returns IDs for all [Resource]s found on any scope, pending from the server.
pub(crate) fn pending_resources(&self) -> Vec<ResourceId> {
self.resources
.borrow()
.read()
.iter()
.filter_map(|(resource_id, res)| {
if matches!(res, AnyResource::Serializable(_)) {
@@ -342,7 +366,7 @@ impl Runtime {
&self,
) -> FuturesUnordered<PinnedFuture<(ResourceId, String)>> {
let f = FuturesUnordered::new();
for (id, resource) in self.resources.borrow().iter() {
for (id, resource) in self.resources.read().iter() {
if let AnyResource::Serializable(resource) = resource {
f.push(resource.to_serialization_resolver(id));
}

View File

@@ -1,6 +1,6 @@
#![forbid(unsafe_code)]
use crate::{
runtime::{with_runtime, RuntimeId},
runtime::{with_runtime, LocalObserver, RuntimeId},
EffectId, PinnedFuture, ResourceId, SignalId, SuspenseContext,
};
use futures::stream::FuturesUnordered;
@@ -108,7 +108,7 @@ impl Scope {
pub fn run_child_scope<T>(self, f: impl FnOnce(Scope) -> T) -> (T, ScopeDisposer) {
let (res, child_id, disposer) = self.runtime.run_scope_undisposed(f, Some(self));
_ = with_runtime(self.runtime, |runtime| {
let mut children = runtime.scope_children.borrow_mut();
let mut children = runtime.scope_children.write();
children
.entry(self.id)
.expect("trying to add a child to a Scope that has already been disposed")
@@ -145,9 +145,9 @@ impl Scope {
/// ```
pub fn untrack<T>(&self, f: impl FnOnce() -> T) -> T {
with_runtime(self.runtime, |runtime| {
let prev_observer = runtime.observer.take();
let prev_observer = LocalObserver::take(self.runtime);
let untracked_result = f();
runtime.observer.set(prev_observer);
LocalObserver::set(self.runtime, prev_observer);
untracked_result
})
.expect("tried to run untracked function in a runtime that has been disposed")
@@ -167,7 +167,7 @@ impl Scope {
_ = with_runtime(self.runtime, |runtime| {
// dispose of all child scopes
let children = {
let mut children = runtime.scope_children.borrow_mut();
let mut children = runtime.scope_children.write();
children.remove(self.id)
};
@@ -182,7 +182,7 @@ impl Scope {
}
// run cleanups
if let Some(cleanups) = runtime.scope_cleanups.borrow_mut().remove(self.id) {
if let Some(cleanups) = runtime.scope_cleanups.write().remove(self.id) {
for cleanup in cleanups {
cleanup();
}
@@ -190,34 +190,34 @@ impl Scope {
// remove everything we own and run cleanups
let owned = {
let owned = runtime.scopes.borrow_mut().remove(self.id);
owned.map(|owned| owned.take())
let owned = runtime.scopes.write().remove(self.id);
owned.map(|owned| std::mem::take(&mut *owned.write()))
};
if let Some(owned) = owned {
for property in owned {
match property {
ScopeProperty::Signal(id) => {
// remove the signal
runtime.signals.borrow_mut().remove(id);
let subs = runtime.signal_subscribers.borrow_mut().remove(id);
runtime.signals.write().remove(id);
let subs = runtime.signal_subscribers.write().remove(id);
// each of the subs needs to remove the signal from its dependencies
// so that it doesn't try to read the (now disposed) signal
if let Some(subs) = subs {
let source_map = runtime.effect_sources.borrow();
for effect in subs.borrow().iter() {
let source_map = runtime.effect_sources.read();
for effect in subs.read().iter() {
if let Some(effect_sources) = source_map.get(*effect) {
effect_sources.borrow_mut().remove(&id);
effect_sources.write().remove(&id);
}
}
}
}
ScopeProperty::Effect(id) => {
runtime.effects.borrow_mut().remove(id);
runtime.effect_sources.borrow_mut().remove(id);
runtime.effects.write().remove(id);
runtime.effect_sources.write().remove(id);
}
ScopeProperty::Resource(id) => {
runtime.resources.borrow_mut().remove(id);
runtime.resources.write().remove(id);
}
}
}
@@ -227,18 +227,18 @@ impl Scope {
pub(crate) fn with_scope_property(&self, f: impl FnOnce(&mut Vec<ScopeProperty>)) {
_ = with_runtime(self.runtime, |runtime| {
let scopes = runtime.scopes.borrow();
let scopes = runtime.scopes.read();
let scope = scopes
.get(self.id)
.expect("tried to add property to a scope that has been disposed");
f(&mut scope.borrow_mut());
f(&mut scope.write());
})
}
/// Returns the the parent Scope, if any.
pub fn parent(&self) -> Option<Scope> {
with_runtime(self.runtime, |runtime| {
runtime.scope_parents.borrow().get(self.id).copied()
runtime.scope_parents.read().get(self.id).copied()
})
.ok()
.flatten()
@@ -255,7 +255,7 @@ impl Scope {
/// are invalidated.
pub fn on_cleanup(cx: Scope, cleanup_fn: impl FnOnce() + 'static) {
_ = with_runtime(cx.runtime, |runtime| {
let mut cleanups = runtime.scope_cleanups.borrow_mut();
let mut cleanups = runtime.scope_cleanups.write();
let cleanups = cleanups
.entry(cx.id)
.expect("trying to clean up a Scope that has already been disposed")
@@ -327,7 +327,7 @@ impl Scope {
use futures::StreamExt;
_ = with_runtime(self.runtime, |runtime| {
let mut shared_context = runtime.shared_context.borrow_mut();
let mut shared_context = runtime.shared_context.write();
let (tx, mut rx) = futures::channel::mpsc::unbounded();
create_isomorphic_effect(*self, move |_| {
@@ -355,7 +355,7 @@ impl Scope {
/// `<Suspense/>` HTML when all resources are resolved.
pub fn pending_fragments(&self) -> HashMap<String, (String, PinnedFuture<String>)> {
with_runtime(self.runtime, |runtime| {
let mut shared_context = runtime.shared_context.borrow_mut();
let mut shared_context = runtime.shared_context.write();
std::mem::take(&mut shared_context.pending_fragments)
})
.unwrap_or_default()

View File

@@ -1,7 +1,7 @@
#![forbid(unsafe_code)]
use crate::{
debug_warn,
runtime::{with_runtime, RuntimeId},
runtime::{with_runtime, LocalObserver, RuntimeId},
Runtime, Scope, ScopeProperty, UntrackedGettableSignal, UntrackedSettableSignal,
};
use cfg_if::cfg_if;
@@ -246,7 +246,9 @@ where
#[cfg(feature = "hydrate")]
pub(crate) fn subscribe(&self) {
_ = with_runtime(self.runtime, |runtime| self.id.subscribe(runtime))
_ = with_runtime(self.runtime, |runtime| {
self.id.subscribe(self.runtime, runtime)
})
}
/// Clones and returns the current value of the signal, and subscribes
@@ -286,7 +288,9 @@ where
/// Applies the function to the current Signal, if it exists, and subscribes
/// the running effect.
pub(crate) fn try_with<U>(&self, f: impl FnOnce(&T) -> U) -> Result<U, SignalError> {
match with_runtime(self.runtime, |runtime| self.id.try_with(runtime, f)) {
match with_runtime(self.runtime, |runtime| {
self.id.try_with(self.runtime, runtime, f)
}) {
Ok(Ok(v)) => Ok(v),
Ok(Err(e)) => Err(e),
Err(_) => Err(SignalError::RuntimeDisposed),
@@ -1169,20 +1173,20 @@ pub(crate) enum SignalError {
}
impl SignalId {
pub(crate) fn subscribe(&self, runtime: &Runtime) {
pub(crate) fn subscribe(&self, runtime_id: RuntimeId, runtime: &Runtime) {
// add subscriber
if let Some(observer) = runtime.observer.get() {
if let Some(observer) = LocalObserver::get(runtime_id) {
// add this observer to the signal's dependencies (to allow notification)
let mut subs = runtime.signal_subscribers.borrow_mut();
let mut subs = runtime.signal_subscribers.write();
if let Some(subs) = subs.entry(*self) {
subs.or_default().borrow_mut().insert(observer);
subs.or_default().write().insert(observer);
}
// add this signal to the effect's sources (to allow cleanup)
let mut effect_sources = runtime.effect_sources.borrow_mut();
let mut effect_sources = runtime.effect_sources.write();
if let Some(effect_sources) = effect_sources.entry(observer) {
let sources = effect_sources.or_default();
sources.borrow_mut().insert(*self);
sources.write().insert(*self);
}
}
}
@@ -1197,7 +1201,7 @@ impl SignalId {
{
// get the value
let value = {
let signals = runtime.signals.borrow();
let signals = runtime.signals.read();
match signals.get(*self).cloned().ok_or(SignalError::Disposed) {
Ok(s) => Ok(s),
Err(e) => {
@@ -1206,12 +1210,13 @@ impl SignalId {
}
}
}?;
let value = value.try_borrow().unwrap_or_else(|e| {
debug_warn!(
"Signal::try_with_no_subscription failed on Signal<{}>. It seems you're trying to read the value of a signal within an effect caused by updating the signal.",
let value = value.try_read().unwrap_or_else(|| {
panic!(
"Signal::try_with_no_subscription failed on Signal<{}>. \
It seems you're trying to read the value of a signal within an effect \
caused by updating the signal.",
std::any::type_name::<T>()
);
panic!("{e}");
});
let value = value
.downcast_ref::<T>()
@@ -1221,13 +1226,14 @@ impl SignalId {
pub(crate) fn try_with<T, U>(
&self,
runtime_id: RuntimeId,
runtime: &Runtime,
f: impl FnOnce(&T) -> U,
) -> Result<U, SignalError>
where
T: 'static,
{
self.subscribe(runtime);
self.subscribe(runtime_id, runtime);
self.try_with_no_subscription(runtime, f)
}
@@ -1246,12 +1252,14 @@ impl SignalId {
.expect("tried to access a signal in a runtime that has been disposed")
}
pub(crate) fn with<T, U>(&self, runtime: RuntimeId, f: impl FnOnce(&T) -> U) -> U
pub(crate) fn with<T, U>(&self, runtime_id: RuntimeId, f: impl FnOnce(&T) -> U) -> U
where
T: 'static,
{
with_runtime(runtime, |runtime| self.try_with(runtime, f).unwrap())
.expect("tried to access a signal in a runtime that has been disposed")
with_runtime(runtime_id, |runtime| {
self.try_with(runtime_id, runtime, f).unwrap()
})
.expect("tried to access a signal in a runtime that has been disposed")
}
fn update_value<T, U>(&self, runtime: RuntimeId, f: impl FnOnce(&mut T) -> U) -> Option<U>
@@ -1260,11 +1268,11 @@ impl SignalId {
{
with_runtime(runtime, |runtime| {
let value = {
let signals = runtime.signals.borrow();
let signals = runtime.signals.read();
signals.get(*self).cloned()
};
if let Some(value) = value {
let mut value = value.borrow_mut();
let mut value = value.write();
if let Some(value) = value.downcast_mut::<T>() {
Some(f(value))
} else {
@@ -1300,14 +1308,14 @@ impl SignalId {
// notify subscribers
if updated.is_some() {
let subs = {
let subs = runtime.signal_subscribers.borrow();
let subs = runtime.signal_subscribers.read();
let subs = subs.get(*self);
subs.map(|subs| subs.borrow().clone())
subs.map(|subs| subs.read().clone())
};
if let Some(subs) = subs {
for sub in subs {
let effect = {
let effects = runtime.effects.borrow();
let effects = runtime.effects.read();
effects.get(sub).cloned()
};
if let Some(effect) = effect {

View File

@@ -19,7 +19,7 @@ fn untracked_set_doesnt_trigger_effect() {
create_isomorphic_effect(cx, {
let b = b.clone();
move |_| {
let formatted = format!("Value is {}", a());
let formatted = format!("Value is {}", a.get());
*b.borrow_mut() = formatted;
}
});
@@ -52,7 +52,7 @@ fn untracked_get_doesnt_trigger_effect() {
create_isomorphic_effect(cx, {
let b = b.clone();
move |_| {
let formatted = format!("Values are {} and {}", a(), a2.get_untracked());
let formatted = format!("Values are {} and {}", a.get(), a2.get_untracked());
*b.borrow_mut() = formatted;
}
});