fix: custom Stream implementation for streaming resource data that supports nested data/multiple polled values, rather than taking it all at once at the beginning

This commit is contained in:
Greg Johnston
2024-06-14 18:01:35 -04:00
parent 202abd1d35
commit 6d93185478
2 changed files with 115 additions and 74 deletions

View File

@@ -17,6 +17,7 @@ serde = { version = "1", features = ["derive"] }
wasm-bindgen = { version = "0.2", optional = true }
js-sys = { version = "0.3", optional = true }
once_cell = "1.19.0"
pin-project-lite = "0.2.14"
[features]
browser = ["dep:wasm-bindgen", "dep:js-sys"]

View File

@@ -1,30 +1,36 @@
use super::{SerializedDataId, SharedContext};
use crate::{PinnedFuture, PinnedStream};
use futures::{
stream::{self, FuturesUnordered},
StreamExt,
stream::{self},
Stream, StreamExt,
};
use or_poisoned::OrPoisoned;
use std::{
collections::HashSet,
fmt::{Debug, Write},
mem,
pin::Pin,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, RwLock,
},
task::{Context, Poll},
};
use throw_error::{Error, ErrorId};
type AsyncDataBuf = Arc<RwLock<Vec<(SerializedDataId, PinnedFuture<String>)>>>;
type ErrorBuf = Arc<RwLock<Vec<(SerializedDataId, ErrorId, Error)>>>;
type SealedErrors = Arc<RwLock<HashSet<SerializedDataId>>>;
#[derive(Default)]
/// The shared context that should be used on the server side.
pub struct SsrSharedContext {
id: AtomicUsize,
is_hydrating: AtomicBool,
sync_buf: RwLock<Vec<ResolvedData>>,
async_buf: RwLock<Vec<(SerializedDataId, PinnedFuture<String>)>>,
errors: Arc<RwLock<Vec<(SerializedDataId, ErrorId, Error)>>>,
sealed_error_boundaries: Arc<RwLock<HashSet<SerializedDataId>>>,
async_buf: AsyncDataBuf,
errors: ErrorBuf,
sealed_error_boundaries: SealedErrors,
}
impl SsrSharedContext {
@@ -73,75 +79,6 @@ impl SharedContext for SsrSharedContext {
self.async_buf.write().or_poisoned().push((id, fut))
}
fn pending_data(&self) -> Option<PinnedStream<String>> {
let sync_data = mem::take(&mut *self.sync_buf.write().or_poisoned());
let async_data = mem::take(&mut *self.async_buf.write().or_poisoned());
// 1) initial, synchronous setup chunk
let mut initial_chunk = String::new();
// resolved synchronous resources and errors
initial_chunk.push_str("__RESOLVED_RESOURCES=[");
for resolved in sync_data {
resolved.write_to_buf(&mut initial_chunk);
initial_chunk.push(',');
}
initial_chunk.push_str("];");
initial_chunk.push_str("__SERIALIZED_ERRORS=[");
for error in mem::take(&mut *self.errors.write().or_poisoned()) {
_ = write!(
initial_chunk,
"[{}, {}, {:?}],",
error.0 .0,
error.1,
error.2.to_string()
);
}
initial_chunk.push_str("];");
// pending async resources
initial_chunk.push_str("__PENDING_RESOURCES=[");
for (id, _) in &async_data {
_ = write!(&mut initial_chunk, "{},", id.0);
}
initial_chunk.push_str("];");
// resolvers
initial_chunk.push_str("__RESOURCE_RESOLVERS=[];");
// 2) async resources as they resolve
let async_data = async_data
.into_iter()
.map(|(id, data)| {
let errors = Arc::clone(&self.errors);
let sealed = Arc::clone(&self.sealed_error_boundaries);
async move {
let data = data.await;
let data = data.replace('<', "\\u003c");
let mut val =
format!("__RESOLVED_RESOURCES[{}] = {:?};", id.0, data);
let sealed = sealed.read().or_poisoned();
for error in mem::take(&mut *errors.write().or_poisoned()) {
if !sealed.contains(&error.0) {
_ = write!(
val,
"__SERIALIZED_ERRORS.push([{}, {}, {:?}]);",
error.0 .0,
error.1,
error.2.to_string()
);
}
}
val
}
})
.collect::<FuturesUnordered<_>>();
let stream =
stream::once(async move { initial_chunk }).chain(async_data);
Some(Box::pin(stream))
}
fn read_data(&self, _id: &SerializedDataId) -> Option<String> {
None
}
@@ -196,6 +133,109 @@ impl SharedContext for SsrSharedContext {
.or_poisoned()
.insert(boundary_id.clone());
}
fn pending_data(&self) -> Option<PinnedStream<String>> {
let sync_data = mem::take(&mut *self.sync_buf.write().or_poisoned());
let async_data = self.async_buf.read().or_poisoned();
// 1) initial, synchronous setup chunk
let mut initial_chunk = String::new();
// resolved synchronous resources and errors
initial_chunk.push_str("__RESOLVED_RESOURCES=[");
for resolved in sync_data {
resolved.write_to_buf(&mut initial_chunk);
initial_chunk.push(',');
}
initial_chunk.push_str("];");
initial_chunk.push_str("__SERIALIZED_ERRORS=[");
for error in mem::take(&mut *self.errors.write().or_poisoned()) {
_ = write!(
initial_chunk,
"[{}, {}, {:?}],",
error.0 .0,
error.1,
error.2.to_string()
);
}
initial_chunk.push_str("];");
// pending async resources
initial_chunk.push_str("__PENDING_RESOURCES=[");
for (id, _) in async_data.iter() {
_ = write!(&mut initial_chunk, "{},", id.0);
}
initial_chunk.push_str("];");
// resolvers
initial_chunk.push_str("__RESOURCE_RESOLVERS=[];");
let async_data = AsyncDataStream {
async_buf: Arc::clone(&self.async_buf),
errors: Arc::clone(&self.errors),
sealed_error_boundaries: Arc::clone(&self.sealed_error_boundaries),
};
let stream =
stream::once(async move { initial_chunk }).chain(async_data);
Some(Box::pin(stream))
}
}
struct AsyncDataStream {
async_buf: AsyncDataBuf,
errors: ErrorBuf,
sealed_error_boundaries: SealedErrors,
}
impl Stream for AsyncDataStream {
type Item = String;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let mut resolved = String::new();
let mut async_buf = self.async_buf.write().or_poisoned();
let data = mem::take(&mut *async_buf);
for (id, mut fut) in data {
match fut.as_mut().poll(cx) {
// if it's not ready, put it back into the queue
Poll::Pending => {
async_buf.push((id, fut));
}
Poll::Ready(data) => {
let data = data.replace('<', "\\u003c");
_ = write!(
resolved,
"__RESOLVED_RESOURCES[{}] = {:?};",
id.0, data
);
}
}
}
let sealed = self.sealed_error_boundaries.read().or_poisoned();
for error in mem::take(&mut *self.errors.write().or_poisoned()) {
if !sealed.contains(&error.0) {
_ = write!(
resolved,
"__SERIALIZED_ERRORS.push([{}, {}, {:?}]);",
error.0 .0,
error.1,
error.2.to_string()
);
}
}
if async_buf.is_empty() && resolved.is_empty() {
return Poll::Ready(None);
}
if resolved.is_empty() {
return Poll::Pending;
}
Poll::Ready(Some(resolved))
}
}
#[derive(Debug)]