Compare commits

...

2 Commits

Author SHA1 Message Date
Greg Johnston
7376b76558 what I'm talking about 2023-07-19 21:32:22 -04:00
Greg Johnston
579abc586c wtf? 2023-07-19 21:31:16 -04:00
5 changed files with 28 additions and 3 deletions

View File

@@ -605,6 +605,8 @@ where
let res_options3 = default_res_options.clone();
let local_pool = get_leptos_pool();
let (tx, rx) = futures::channel::mpsc::channel(8);
let (complete_tx, complete_rx) =
futures::channel::oneshot::channel();
let current_span = tracing::Span::current();
local_pool.spawn_pinned(move || async move {
@@ -631,10 +633,14 @@ where
forward_stream(&options, res_options2, bundle, runtime, scope, tx).await;
complete_rx.await.expect("could not receive completion message");
eprintln!("\n\n\n\nreceived completion message");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
runtime.dispose();
}.instrument(current_span));
generate_response(res_options3, rx)
generate_response(res_options3, rx, complete_tx)
})
}
}
@@ -643,6 +649,7 @@ where
async fn generate_response(
res_options: ResponseOptions,
rx: Receiver<String>,
complete_tx: futures::channel::oneshot::Sender<()>,
) -> Response<StreamBody<PinnedHtmlStream>> {
let mut stream = Box::pin(rx.map(|html| Ok(Bytes::from(html))));
@@ -655,7 +662,13 @@ async fn generate_response(
let complete_stream =
futures::stream::iter([first_chunk.unwrap(), second_chunk.unwrap()])
.chain(stream);
.chain(stream)
.chain(futures::stream::once(async move {
complete_tx
.send(())
.expect("could not send completion message");
Ok(Bytes::from(String::new()))
}));
let mut res = Response::new(StreamBody::new(
Box::pin(complete_stream) as PinnedHtmlStream
@@ -770,6 +783,8 @@ where
let full_path = format!("http://leptos.dev{path}");
let (tx, rx) = futures::channel::mpsc::channel(8);
let (complete_tx, complete_rx) =
futures::channel::oneshot::channel();
let local_pool = get_leptos_pool();
let current_span = tracing::Span::current();
local_pool.spawn_pinned(|| async move {
@@ -791,10 +806,14 @@ where
forward_stream(&options, res_options2, bundle, runtime, scope, tx).await;
complete_rx.await.expect("could not receive completion message");
eprintln!("\n\n\nreceived completion message");
runtime.dispose();
}.instrument(current_span));
generate_response(res_options3, rx).await
generate_response(res_options3, rx, complete_tx).await
}
})
}

View File

@@ -189,6 +189,7 @@ pub fn render_to_stream_with_prefix_undisposed_with_context_and_block_replacemen
// create the runtime
let runtime = create_runtime();
eprintln!("\n\ncreated runtime {runtime:?}");
let ((shell, pending_resources, pending_fragments, serializers), scope, _) =
run_scope_undisposed(runtime, {

View File

@@ -417,6 +417,8 @@ impl RuntimeId {
pub fn dispose(self) {
#[cfg(not(any(feature = "csr", feature = "hydrate")))]
{
eprintln!("\n\ndisposing of {self:?}");
let runtime = RUNTIMES.with(move |runtimes| runtimes.borrow_mut().remove(self))
.expect("Attempted to dispose of a reactive runtime that was not found. This suggests \
a possible memory leak. Please open an issue with details at https://github.com/leptos-rs/leptos");

View File

@@ -137,6 +137,7 @@ impl<T> StoredValue<T> {
/// Same as [`StoredValue::with_value`] but returns [`Some(O)]` only if
/// the stored value has not yet been disposed. [`None`] otherwise.
pub fn try_with_value<O>(&self, f: impl FnOnce(&T) -> O) -> Option<O> {
eprintln!("\n\nlooking in {:?}", self.runtime);
with_runtime(self.runtime, |runtime| {
let value = {
let values = runtime.stored_values.borrow();
@@ -293,6 +294,7 @@ pub fn store_value<T>(cx: Scope, value: T) -> StoredValue<T>
where
T: 'static,
{
eprintln!("\nstoring in {:?}", cx.runtime);
let id = with_runtime(cx.runtime, |runtime| {
runtime
.stored_values

View File

@@ -135,6 +135,7 @@ where
/// The URL associated with the action (typically as part of a server function.)
/// This enables integration with the `ActionForm` component in `leptos_router`.
pub fn url(&self) -> Option<String> {
eprintln!("action = {:?}", self.0);
self.0.with_value(|a| a.url.as_ref().cloned())
}