diff --git a/server_fn/src/codec/stream.rs b/server_fn/src/codec/stream.rs index 271d6985f..a6c418324 100644 --- a/server_fn/src/codec/stream.rs +++ b/server_fn/src/codec/stream.rs @@ -6,7 +6,7 @@ use crate::{ ContentType, IntoRes, ServerFnError, }; use bytes::Bytes; -use futures::{Stream, StreamExt}; +use futures::{Stream, StreamExt, TryStreamExt}; use http::Method; use std::{fmt::Debug, pin::Pin}; @@ -35,7 +35,8 @@ impl Encoding for Streaming { impl IntoReq for T where Request: ClientReq, - T: Stream + Send + Sync + 'static, + T: Stream + Send + 'static, + E: FromServerFnError, { fn into_req(self, path: &str, accepts: &str) -> Result { Request::try_new_post_streaming( @@ -50,11 +51,12 @@ where impl FromReq for T where Request: Req + Send + 'static, - T: From + 'static, + T: From> + 'static, + E: FromServerFnError, { async fn from_req(req: Request) -> Result { let data = req.try_into_stream()?; - let s = ByteStream::new(data); + let s = ByteStream::new(data.map_err(|e| E::de(e))); Ok(s.into()) } } @@ -71,37 +73,36 @@ where /// end before the output will begin. /// /// Streaming requests are only allowed over HTTP2 or HTTP3. -pub struct ByteStream(Pin> + Send>>); +pub struct ByteStream( + Pin> + Send>>, +); -impl ByteStream { +impl ByteStream { /// Consumes the wrapper, returning a stream of bytes. - pub fn into_inner(self) -> impl Stream> + Send { + pub fn into_inner(self) -> impl Stream> + Send { self.0 } } -impl Debug for ByteStream { +impl Debug for ByteStream { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_tuple("ByteStream").finish() } } -impl ByteStream { +impl ByteStream { /// Creates a new `ByteStream` from the given stream. - pub fn new( + pub fn new( value: impl Stream> + Send + 'static, ) -> Self where T: Into, - E: Into, { - Self(Box::pin( - value.map(|value| value.map(Into::into).map_err(Into::into)), - )) + Self(Box::pin(value.map(|value| value.map(Into::into)))) } } -impl From for ByteStream +impl From for ByteStream where S: Stream + Send + 'static, T: Into, @@ -111,23 +112,27 @@ where } } -impl IntoRes for ByteStream +impl IntoRes for ByteStream where Response: TryRes, - E: 'static, + E: FromServerFnError, { async fn into_res(self) -> Result { - Response::try_from_stream(Streaming::CONTENT_TYPE, self.into_inner()) + Response::try_from_stream( + Streaming::CONTENT_TYPE, + self.into_inner().map_err(|e| e.ser()), + ) } } -impl FromRes for ByteStream +impl FromRes for ByteStream where Response: ClientRes + Send, + E: FromServerFnError, { async fn from_res(res: Response) -> Result { let stream = res.try_into_stream()?; - Ok(ByteStream(Box::pin(stream))) + Ok(ByteStream::new(stream.map_err(|e| E::de(e)))) } } @@ -169,14 +174,14 @@ pub struct TextStream( Pin> + Send>>, ); -impl Debug for TextStream { +impl Debug for TextStream { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_tuple("TextStream").finish() } } -impl TextStream { - /// Creates a new `ByteStream` from the given stream. +impl TextStream { + /// Creates a new `TextStream` from the given stream. pub fn new( value: impl Stream> + Send + 'static, ) -> Self { @@ -184,7 +189,7 @@ impl TextStream { } } -impl TextStream { +impl TextStream { /// Consumes the wrapper, returning a stream of text. pub fn into_inner(self) -> impl Stream> + Send { self.0 @@ -195,7 +200,6 @@ impl From for TextStream where S: Stream + Send + 'static, T: Into, - E: FromServerFnError, { fn from(value: S) -> Self { Self(Box::pin(value.map(|data| Ok(data.into()))))