feat: enhancing ByteStream error handling (#3869)

* feat: enhancing `ByteStream` error handling

* fix: easing unnecessary trait bound over some `TextStream` methods
This commit is contained in:
Saber Haj Rabiee
2025-04-23 04:38:39 -07:00
committed by GitHub
parent 62dac6fb8a
commit f83b14d76c

View File

@@ -6,7 +6,7 @@ use crate::{
ContentType, IntoRes, ServerFnError,
};
use bytes::Bytes;
use futures::{Stream, StreamExt};
use futures::{Stream, StreamExt, TryStreamExt};
use http::Method;
use std::{fmt::Debug, pin::Pin};
@@ -35,7 +35,8 @@ impl Encoding for Streaming {
impl<E, T, Request> IntoReq<Streaming, Request, E> for T
where
Request: ClientReq<E>,
T: Stream<Item = Bytes> + Send + Sync + 'static,
T: Stream<Item = Bytes> + Send + 'static,
E: FromServerFnError,
{
fn into_req(self, path: &str, accepts: &str) -> Result<Request, E> {
Request::try_new_post_streaming(
@@ -50,11 +51,12 @@ where
impl<E, T, Request> FromReq<Streaming, Request, E> for T
where
Request: Req<E> + Send + 'static,
T: From<ByteStream> + 'static,
T: From<ByteStream<E>> + 'static,
E: FromServerFnError,
{
async fn from_req(req: Request) -> Result<Self, E> {
let data = req.try_into_stream()?;
let s = ByteStream::new(data);
let s = ByteStream::new(data.map_err(|e| E::de(e)));
Ok(s.into())
}
}
@@ -71,37 +73,36 @@ where
/// end before the output will begin.
///
/// Streaming requests are only allowed over HTTP2 or HTTP3.
pub struct ByteStream(Pin<Box<dyn Stream<Item = Result<Bytes, Bytes>> + Send>>);
pub struct ByteStream<E = ServerFnError>(
Pin<Box<dyn Stream<Item = Result<Bytes, E>> + Send>>,
);
impl ByteStream {
impl<E> ByteStream<E> {
/// Consumes the wrapper, returning a stream of bytes.
pub fn into_inner(self) -> impl Stream<Item = Result<Bytes, Bytes>> + Send {
pub fn into_inner(self) -> impl Stream<Item = Result<Bytes, E>> + Send {
self.0
}
}
impl Debug for ByteStream {
impl<E> Debug for ByteStream<E> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("ByteStream").finish()
}
}
impl ByteStream {
impl<E> ByteStream<E> {
/// Creates a new `ByteStream` from the given stream.
pub fn new<T, E>(
pub fn new<T>(
value: impl Stream<Item = Result<T, E>> + Send + 'static,
) -> Self
where
T: Into<Bytes>,
E: Into<Bytes>,
{
Self(Box::pin(
value.map(|value| value.map(Into::into).map_err(Into::into)),
))
Self(Box::pin(value.map(|value| value.map(Into::into))))
}
}
impl<S, T> From<S> for ByteStream
impl<E, S, T> From<S> for ByteStream<E>
where
S: Stream<Item = T> + Send + 'static,
T: Into<Bytes>,
@@ -111,23 +112,27 @@ where
}
}
impl<E, Response> IntoRes<Streaming, Response, E> for ByteStream
impl<E, Response> IntoRes<Streaming, Response, E> for ByteStream<E>
where
Response: TryRes<E>,
E: 'static,
E: FromServerFnError,
{
async fn into_res(self) -> Result<Response, E> {
Response::try_from_stream(Streaming::CONTENT_TYPE, self.into_inner())
Response::try_from_stream(
Streaming::CONTENT_TYPE,
self.into_inner().map_err(|e| e.ser()),
)
}
}
impl<E, Response> FromRes<Streaming, Response, E> for ByteStream
impl<E, Response> FromRes<Streaming, Response, E> for ByteStream<E>
where
Response: ClientRes<E> + Send,
E: FromServerFnError,
{
async fn from_res(res: Response) -> Result<Self, E> {
let stream = res.try_into_stream()?;
Ok(ByteStream(Box::pin(stream)))
Ok(ByteStream::new(stream.map_err(|e| E::de(e))))
}
}
@@ -169,14 +174,14 @@ pub struct TextStream<E = ServerFnError>(
Pin<Box<dyn Stream<Item = Result<String, E>> + Send>>,
);
impl<E: FromServerFnError> Debug for TextStream<E> {
impl<E> Debug for TextStream<E> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("TextStream").finish()
}
}
impl<E: FromServerFnError> TextStream<E> {
/// Creates a new `ByteStream` from the given stream.
impl<E> TextStream<E> {
/// Creates a new `TextStream` from the given stream.
pub fn new(
value: impl Stream<Item = Result<String, E>> + Send + 'static,
) -> Self {
@@ -184,7 +189,7 @@ impl<E: FromServerFnError> TextStream<E> {
}
}
impl<E: FromServerFnError> TextStream<E> {
impl<E> TextStream<E> {
/// Consumes the wrapper, returning a stream of text.
pub fn into_inner(self) -> impl Stream<Item = Result<String, E>> + Send {
self.0
@@ -195,7 +200,6 @@ impl<E, S, T> From<S> for TextStream<E>
where
S: Stream<Item = T> + Send + 'static,
T: Into<String>,
E: FromServerFnError,
{
fn from(value: S) -> Self {
Self(Box::pin(value.map(|data| Ok(data.into()))))