blocking resources

This commit is contained in:
Greg Johnston
2024-07-01 21:17:40 -04:00
parent fd48a61eef
commit d7ca969848
6 changed files with 221 additions and 60 deletions

View File

@@ -76,4 +76,12 @@ impl SharedContext for CsrSharedContext {
#[inline(always)]
fn hydration_complete(&self) {}
#[inline(always)]
fn defer_stream(&self, _wait_for: PinnedFuture<()>) {}
#[inline(always)]
fn await_deferred(&self) -> Option<PinnedFuture<()>> {
None
}
}

View File

@@ -158,4 +158,12 @@ impl SharedContext for HydrateSharedContext {
fn take_errors(&self) -> Vec<(SerializedDataId, ErrorId, Error)> {
self.errors.clone()
}
#[inline(always)]
fn defer_stream(&self, _wait_for: PinnedFuture<()>) {}
#[inline(always)]
fn await_deferred(&self) -> Option<PinnedFuture<()>> {
None
}
}

View File

@@ -126,4 +126,18 @@ pub trait SharedContext: Debug {
error_id: ErrorId,
error: Error,
);
/// Adds a `Future` to the set of “blocking resources” that should prevent the servers
/// response stream from beginning until all are resolved. The `Future` returned by
/// [`blocking_resources`](Self::blocking_resources) will not resolve until every `Future`
/// added by this method has resolved.
///
/// In browser implementations, this should be a no-op.
fn defer_stream(&self, wait_for: PinnedFuture<()>);
/// Returns a `Future` that will resolve when every `Future` added via
/// [`defer_stream`](Self::defer_stream) has resolved.
///
/// In browser implementations, this should be a no-op.
fn await_deferred(&self) -> Option<PinnedFuture<()>>;
}

View File

@@ -1,6 +1,7 @@
use super::{SerializedDataId, SharedContext};
use crate::{PinnedFuture, PinnedStream};
use futures::{
future::join_all,
stream::{self},
Stream, StreamExt,
};
@@ -12,7 +13,7 @@ use std::{
pin::Pin,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, RwLock,
Arc, Mutex, RwLock,
},
task::{Context, Poll},
};
@@ -31,6 +32,7 @@ pub struct SsrSharedContext {
async_buf: AsyncDataBuf,
errors: ErrorBuf,
sealed_error_boundaries: SealedErrors,
deferred: Mutex<Vec<PinnedFuture<()>>>,
}
impl SsrSharedContext {
@@ -186,6 +188,21 @@ impl SharedContext for SsrSharedContext {
}
fn hydration_complete(&self) {}
fn defer_stream(&self, wait_for: PinnedFuture<()>) {
self.deferred.lock().or_poisoned().push(wait_for);
}
fn await_deferred(&self) -> Option<PinnedFuture<()>> {
let deferred = mem::take(&mut *self.deferred.lock().or_poisoned());
if deferred.is_empty() {
None
} else {
Some(Box::pin(async move {
join_all(deferred).await;
}))
}
}
}
struct AsyncDataStream {

View File

@@ -36,13 +36,18 @@ pub trait ExtendResponse: Sized {
IV: IntoView + 'static,
{
async move {
let (owner, stream) = build_response(
app_fn,
meta_context,
additional_context,
stream_builder,
);
let mut stream = stream.await;
let (owner, stream) =
build_response(app_fn, additional_context, stream_builder);
let stream = stream.await.ready_chunks(32).map(|n| n.join(""));
let sc = owner.shared_context().unwrap();
while let Some(pending) = sc.await_deferred() {
pending.await;
}
let mut stream =
Box::pin(meta_context.inject_meta_context(stream).await);
// wait for the first chunk of the stream, then set the status and headers
let first_chunk = stream.next().await.unwrap_or_default();
@@ -71,7 +76,6 @@ pub trait ExtendResponse: Sized {
pub fn build_response<IV>(
app_fn: impl FnOnce() -> IV + Send + 'static,
meta_context: ServerMetaContext,
additional_context: impl FnOnce() + Send + 'static,
stream_builder: fn(
IV,
@@ -87,42 +91,43 @@ where
let stream = Box::pin(Sandboxed::new({
let owner = owner.clone();
async move {
let stream = owner
.with(|| {
additional_context();
let stream = owner.with(|| {
additional_context();
// run app
let app = app_fn();
// run app
let app = app_fn();
let nonce = use_nonce()
.as_ref()
.map(|nonce| format!(" nonce=\"{nonce}\""))
.unwrap_or_default();
let nonce = use_nonce()
.as_ref()
.map(|nonce| format!(" nonce=\"{nonce}\""))
.unwrap_or_default();
let shared_context =
Owner::current_shared_context().unwrap();
let chunks = Box::new(move || {
let shared_context = Owner::current_shared_context().unwrap();
let chunks = Box::new({
let shared_context = shared_context.clone();
move || {
Box::pin(shared_context.pending_data().unwrap().map(
move |chunk| {
format!("<script{nonce}>{chunk}</script>")
},
))
as Pin<Box<dyn Stream<Item = String> + Send>>
});
}
});
// convert app to appropriate response type
// and chain the app stream, followed by chunks
// in theory, we could select here, and intersperse them
// the problem is that during the DOM walk, that would be mean random <script> tags
// interspersed where we expect other children
//
// we also don't actually start hydrating until after the whole stream is complete,
// so it's not useful to send those scripts down earlier.
stream_builder(app, chunks)
})
.await;
Box::pin(meta_context.inject_meta_context(stream).await)
as PinnedStream<String>
// convert app to appropriate response type
// and chain the app stream, followed by chunks
// in theory, we could select here, and intersperse them
// the problem is that during the DOM walk, that would be mean random <script> tags
// interspersed where we expect other children
//
// we also don't actually start hydrating until after the whole stream is complete,
// so it's not useful to send those scripts down earlier.
stream_builder(app, chunks)
});
stream.await
}
}));
(owner, stream)

View File

@@ -58,7 +58,19 @@ where
T: Send + Sync + 'static,
Fut: Future<Output = T> + Send + 'static,
{
ArcResource::new_with_encoding(source, fetcher)
ArcResource::new_with_options(source, fetcher, false)
}
pub fn new_str_blocking<S, Fut>(
source: impl Fn() -> S + Send + Sync + 'static,
fetcher: impl Fn(S) -> Fut + Send + Sync + 'static,
) -> Self
where
S: PartialEq + Clone + Send + Sync + 'static,
T: Send + Sync + 'static,
Fut: Future<Output = T> + Send + 'static,
{
ArcResource::new_with_options(source, fetcher, true)
}
}
@@ -78,14 +90,11 @@ where
T: Send + Sync + 'static,
Fut: Future<Output = T> + Send + 'static,
{
ArcResource::new_with_encoding(source, fetcher)
ArcResource::new_with_options(source, fetcher, false)
}
#[track_caller]
#[deprecated = "Use ::new() instead; I'm going to switch the default to \
SerdeJson and keep the FromStr/ToString available as \
::new_str()."]
pub fn new_serde<S, Fut>(
pub fn new_blocking<S, Fut>(
source: impl Fn() -> S + Send + Sync + 'static,
fetcher: impl Fn(S) -> Fut + Send + Sync + 'static,
) -> Self
@@ -94,7 +103,7 @@ where
T: Send + Sync + 'static,
Fut: Future<Output = T> + Send + 'static,
{
ArcResource::new_with_encoding(source, fetcher)
ArcResource::new_with_options(source, fetcher, true)
}
}
@@ -114,7 +123,19 @@ where
T: Send + Sync + 'static,
Fut: Future<Output = T> + Send + 'static,
{
ArcResource::new_with_encoding(source, fetcher)
ArcResource::new_with_options(source, fetcher, false)
}
pub fn new_serde_wb_blocking<S, Fut>(
source: impl Fn() -> S + Send + Sync + 'static,
fetcher: impl Fn(S) -> Fut + Send + Sync + 'static,
) -> Self
where
S: PartialEq + Clone + Send + Sync + 'static,
T: Send + Sync + 'static,
Fut: Future<Output = T> + Send + 'static,
{
ArcResource::new_with_options(source, fetcher, true)
}
}
#[cfg(feature = "miniserde")]
@@ -133,7 +154,19 @@ where
T: Send + Sync + 'static,
Fut: Future<Output = T> + Send + 'static,
{
ArcResource::new_with_encoding(source, fetcher)
ArcResource::new_with_options(source, fetcher, false)
}
pub fn new_miniserde_blocking<S, Fut>(
source: impl Fn() -> S + Send + Sync + 'static,
fetcher: impl Fn(S) -> Fut + Send + Sync + 'static,
) -> Self
where
S: PartialEq + Clone + Send + Sync + 'static,
T: Send + Sync + 'static,
Fut: Future<Output = T> + Send + 'static,
{
ArcResource::new_with_options(source, fetcher, true)
}
}
@@ -153,7 +186,19 @@ where
T: Send + Sync + 'static,
Fut: Future<Output = T> + Send + 'static,
{
ArcResource::new_with_encoding(source, fetcher)
ArcResource::new_with_options(source, fetcher, false)
}
pub fn new_serde_lite_blocking<S, Fut>(
source: impl Fn() -> S + Send + Sync + 'static,
fetcher: impl Fn(S) -> Fut + Send + Sync + 'static,
) -> Self
where
S: PartialEq + Clone + Send + Sync + 'static,
T: Send + Sync + 'static,
Fut: Future<Output = T> + Send + 'static,
{
ArcResource::new_with_options(source, fetcher, true)
}
}
@@ -173,7 +218,19 @@ where
T: Send + Sync + 'static,
Fut: Future<Output = T> + Send + 'static,
{
ArcResource::new_with_encoding(source, fetcher)
ArcResource::new_with_options(source, fetcher, false)
}
pub fn new_rkyv_blocking<S, Fut>(
source: impl Fn() -> S + Send + Sync + 'static,
fetcher: impl Fn(S) -> Fut + Send + Sync + 'static,
) -> Self
where
S: PartialEq + Clone + Send + Sync + 'static,
T: Send + Sync + 'static,
Fut: Future<Output = T> + Send + 'static,
{
ArcResource::new_with_options(source, fetcher, true)
}
}
@@ -184,9 +241,10 @@ where
T::SerErr: Debug,
T::DeErr: Debug,
{
pub fn new_with_encoding<S, Fut>(
pub fn new_with_options<S, Fut>(
source: impl Fn() -> S + Send + Sync + 'static,
fetcher: impl Fn(S) -> Fut + Send + Sync + 'static,
blocking: bool,
) -> ArcResource<T, Ser>
where
S: PartialEq + Clone + Send + Sync + 'static,
@@ -219,6 +277,10 @@ where
let value = data.clone();
let ready_fut = data.ready();
if blocking {
shared_context.defer_stream(Box::pin(data.ready()));
}
shared_context.write_async(
id,
Box::pin(async move {
@@ -329,7 +391,20 @@ where
T: Send + Sync + 'static,
Fut: Future<Output = T> + Send + 'static,
{
Resource::new_with_encoding(source, fetcher)
Resource::new_with_options(source, fetcher, false)
}
#[track_caller]
pub fn new_str_blocking<S, Fut>(
source: impl Fn() -> S + Send + Sync + 'static,
fetcher: impl Fn(S) -> Fut + Send + Sync + 'static,
) -> Self
where
S: PartialEq + Clone + Send + Sync + 'static,
T: Send + Sync + 'static,
Fut: Future<Output = T> + Send + 'static,
{
Resource::new_with_options(source, fetcher, true)
}
}
@@ -349,14 +424,11 @@ where
T: Send + Sync + 'static,
Fut: Future<Output = T> + Send + 'static,
{
Resource::new_with_encoding(source, fetcher)
Resource::new_with_options(source, fetcher, false)
}
#[track_caller]
#[deprecated = "Use ::new() instead; I'm going to switch the default to \
SerdeJson and keep the FromStr/ToString available as \
::new_str()."]
pub fn new_serde<S, Fut>(
pub fn new_blocking<S, Fut>(
source: impl Fn() -> S + Send + Sync + 'static,
fetcher: impl Fn(S) -> Fut + Send + Sync + 'static,
) -> Self
@@ -365,7 +437,7 @@ where
T: Send + Sync + 'static,
Fut: Future<Output = T> + Send + 'static,
{
Resource::new_with_encoding(source, fetcher)
Resource::new_with_options(source, fetcher, true)
}
}
@@ -385,7 +457,19 @@ where
T: Send + Sync + 'static,
Fut: Future<Output = T> + Send + 'static,
{
Resource::new_with_encoding(source, fetcher)
Resource::new_with_options(source, fetcher, false)
}
pub fn new_serde_wb_blocking<S, Fut>(
source: impl Fn() -> S + Send + Sync + 'static,
fetcher: impl Fn(S) -> Fut + Send + Sync + 'static,
) -> Self
where
S: PartialEq + Clone + Send + Sync + 'static,
T: Send + Sync + 'static,
Fut: Future<Output = T> + Send + 'static,
{
Resource::new_with_options(source, fetcher, true)
}
}
@@ -405,7 +489,7 @@ where
T: Send + Sync + 'static,
Fut: Future<Output = T> + Send + 'static,
{
Resource::new_with_encoding(source, fetcher)
Resource::new_with_options(source, fetcher, false)
}
}
@@ -425,7 +509,19 @@ where
T: Send + Sync + 'static,
Fut: Future<Output = T> + Send + 'static,
{
Resource::new_with_encoding(source, fetcher)
Resource::new_with_options(source, fetcher, false)
}
pub fn new_serde_lite_blocking<S, Fut>(
source: impl Fn() -> S + Send + Sync + 'static,
fetcher: impl Fn(S) -> Fut + Send + Sync + 'static,
) -> Self
where
S: PartialEq + Clone + Send + Sync + 'static,
T: Send + Sync + 'static,
Fut: Future<Output = T> + Send + 'static,
{
Resource::new_with_options(source, fetcher, true)
}
}
@@ -445,7 +541,19 @@ where
T: Send + Sync + 'static,
Fut: Future<Output = T> + Send + 'static,
{
Resource::new_with_encoding(source, fetcher)
Resource::new_with_options(source, fetcher, false)
}
pub fn new_rkyv_blocking<S, Fut>(
source: impl Fn() -> S + Send + Sync + 'static,
fetcher: impl Fn(S) -> Fut + Send + Sync + 'static,
) -> Self
where
S: PartialEq + Clone + Send + Sync + 'static,
T: Send + Sync + 'static,
Fut: Future<Output = T> + Send + 'static,
{
Resource::new_with_options(source, fetcher, true)
}
}
@@ -456,9 +564,10 @@ where
T::SerErr: Debug,
T::DeErr: Debug,
{
pub fn new_with_encoding<S, Fut>(
pub fn new_with_options<S, Fut>(
source: impl Fn() -> S + Send + Sync + 'static,
fetcher: impl Fn(S) -> Fut + Send + Sync + 'static,
blocking: bool,
) -> Resource<T, Ser>
where
S: Send + Sync + Clone + PartialEq + 'static,
@@ -466,7 +575,7 @@ where
Fut: Future<Output = T> + Send + 'static,
{
let ArcResource { data, .. } =
ArcResource::new_with_encoding(source, fetcher);
ArcResource::new_with_options(source, fetcher, blocking);
Resource {
ser: PhantomData,
data: data.into(),