From 6d5ab7359476cf9d3a5f8322489ca67557d5d91e Mon Sep 17 00:00:00 2001 From: Saber Haj Rabiee Date: Mon, 14 Apr 2025 23:52:02 -0700 Subject: [PATCH 1/7] chore: easing `Display` bound on `FromServerFnError`, #3811 follow-up --- server_fn/src/client.rs | 9 ++++++--- server_fn/src/error.rs | 4 +--- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/server_fn/src/client.rs b/server_fn/src/client.rs index 7571e20a4..a6c413895 100644 --- a/server_fn/src/client.rs +++ b/server_fn/src/client.rs @@ -201,14 +201,17 @@ pub mod browser { Ok(message) => Ok(Message::Bytes(message.into())), Err(err) => { let err = InputStreamError::de(err); + let formatted_err = format!("{:?}", err); web_sys::console::error_1( - &js_sys::JsString::from(err.to_string()), + &js_sys::JsString::from( + formatted_err.clone(), + ), ); const CLOSE_CODE_ERROR: u16 = 1011; Err(WebSocketError::ConnectionClose( CloseEvent { code: CLOSE_CODE_ERROR, - reason: err.to_string(), + reason: formatted_err, was_clean: true, }, )) @@ -303,7 +306,7 @@ pub mod reqwest { Err(err) => { let err = E::de(err); Err(tokio_tungstenite::tungstenite::Error::Io( - std::io::Error::other(err.to_string()), + std::io::Error::other(format!("{:?}", err)), )) } } diff --git a/server_fn/src/error.rs b/server_fn/src/error.rs index 527ba95c4..1b8491f5f 100644 --- a/server_fn/src/error.rs +++ b/server_fn/src/error.rs @@ -561,9 +561,7 @@ impl FromStr for ServerFnErrorWrapper { } /// A trait for types that can be returned from a server function. -pub trait FromServerFnError: - std::fmt::Debug + Sized + Display + 'static -{ +pub trait FromServerFnError: std::fmt::Debug + Sized + 'static { /// The encoding strategy used to serialize and deserialize this error type. Must implement the [`Encodes`](server_fn::Encodes) trait for references to the error type. type Encoder: Encodes + Decodes; From 30c445a419c6dce591854cbab076623964f17949 Mon Sep 17 00:00:00 2001 From: Saber Haj Rabiee Date: Tue, 15 Apr 2025 07:43:47 -0700 Subject: [PATCH 2/7] fix: send/receive websocket data --- server_fn/src/client.rs | 106 ++++++++++------------- server_fn/src/lib.rs | 141 +++++++++++++++++++++++++------ server_fn/src/request/actix.rs | 17 ++-- server_fn/src/request/axum.rs | 31 ++++--- server_fn/src/request/generic.rs | 4 +- server_fn/src/request/mod.rs | 6 +- 6 files changed, 187 insertions(+), 118 deletions(-) diff --git a/server_fn/src/client.rs b/server_fn/src/client.rs index a6c413895..34766aa86 100644 --- a/server_fn/src/client.rs +++ b/server_fn/src/client.rs @@ -42,7 +42,7 @@ pub trait Client { Output = Result< ( impl Stream> + Send + 'static, - impl Sink> + Send + 'static, + impl Sink + Send + 'static, ), Error, >, @@ -62,8 +62,8 @@ pub mod browser { response::browser::BrowserResponse, }; use bytes::Bytes; - use futures::{Sink, SinkExt, StreamExt, TryStreamExt}; - use gloo_net::websocket::{events::CloseEvent, Message, WebSocketError}; + use futures::{Sink, SinkExt, StreamExt}; + use gloo_net::websocket::{Message, WebSocketError}; use send_wrapper::SendWrapper; use std::future::Future; @@ -115,7 +115,7 @@ pub mod browser { impl futures::Stream> + Send + 'static, - impl futures::Sink> + Send + 'static, + impl futures::Sink + Send + 'static, ), Error, >, @@ -131,18 +131,23 @@ pub mod browser { })?; let (sink, stream) = websocket.split(); - let stream = stream - .map_err(|err| { + let stream = stream.map(|message| match message { + Ok(message) => { + crate::deserialize_result::( + match message { + Message::Text(text) => Bytes::from(text), + Message::Bytes(bytes) => Bytes::from(bytes), + }, + ) + } + Err(err) => { web_sys::console::error_1(&err.to_string().into()); - OutputStreamError::from_server_fn_error( + Err(OutputStreamError::from_server_fn_error( ServerFnErrorErr::Request(err.to_string()), ) - .ser() - }) - .map_ok(move |msg| match msg { - Message::Text(text) => Bytes::from(text), - Message::Bytes(bytes) => Bytes::from(bytes), - }); + .ser()) + } + }); let stream = SendWrapper::new(stream); struct SendWrapperSink { @@ -195,29 +200,11 @@ pub mod browser { } } - let sink = - sink.with(|message: Result| async move { - match message { - Ok(message) => Ok(Message::Bytes(message.into())), - Err(err) => { - let err = InputStreamError::de(err); - let formatted_err = format!("{:?}", err); - web_sys::console::error_1( - &js_sys::JsString::from( - formatted_err.clone(), - ), - ); - const CLOSE_CODE_ERROR: u16 = 1011; - Err(WebSocketError::ConnectionClose( - CloseEvent { - code: CLOSE_CODE_ERROR, - reason: formatted_err, - was_clean: true, - }, - )) - } - } - }); + let sink = sink.with(|message: Bytes| async move { + Ok::(Message::Bytes( + message.into(), + )) + }); let sink = SendWrapperSink::new(Box::pin(sink)); Ok((stream, sink)) @@ -246,13 +233,19 @@ pub mod reqwest { /// Implements [`Client`] for a request made by [`reqwest`]. pub struct ReqwestClient; - impl Client for ReqwestClient { + impl< + Error: FromServerFnError, + InputStreamError: FromServerFnError, + OutputStreamError: FromServerFnError, + > Client for ReqwestClient + { type Request = Request; type Response = Response; fn send( req: Self::Request, - ) -> impl Future> + Send { + ) -> impl Future> + Send + { CLIENT.execute(req).map_err(|e| { ServerFnErrorErr::Request(e.to_string()).into_app_error() }) @@ -262,12 +255,10 @@ pub mod reqwest { path: &str, ) -> Result< ( - impl futures::Stream> - + Send - + 'static, - impl futures::Sink> + Send + 'static, + impl futures::Stream> + Send + 'static, + impl futures::Sink + Send + 'static, ), - E, + Error, > { let mut websocket_server_url = get_server_url().to_string(); if let Some(postfix) = websocket_server_url.strip_prefix("http://") @@ -281,7 +272,7 @@ pub mod reqwest { let url = format!("{}{}", websocket_server_url, path); let (ws_stream, _) = tokio_tungstenite::connect_async(url).await.map_err(|e| { - E::from_server_fn_error(ServerFnErrorErr::Request( + Error::from_server_fn_error(ServerFnErrorErr::Request( e.to_string(), )) })?; @@ -290,26 +281,21 @@ pub mod reqwest { Ok(( read.map(|msg| match msg { - Ok(msg) => Ok(msg.into_data()), - Err(e) => Err(E::from_server_fn_error( + Ok(msg) => crate::deserialize_result::( + msg.into_data(), + ), + Err(e) => Err(OutputStreamError::from_server_fn_error( ServerFnErrorErr::Request(e.to_string()), ) .ser()), }), - write.with(|msg: Result| async move { - match msg { - Ok(msg) => { - Ok(tokio_tungstenite::tungstenite::Message::Binary( - msg, - )) - } - Err(err) => { - let err = E::de(err); - Err(tokio_tungstenite::tungstenite::Error::Io( - std::io::Error::other(format!("{:?}", err)), - )) - } - } + write.with(|msg: Bytes| async move { + Ok::< + tokio_tungstenite::tungstenite::Message, + tokio_tungstenite::tungstenite::Error, + >( + tokio_tungstenite::tungstenite::Message::Binary(msg) + ) }), )) } diff --git a/server_fn/src/lib.rs b/server_fn/src/lib.rs index f006e9f3f..705e26266 100644 --- a/server_fn/src/lib.rs +++ b/server_fn/src/lib.rs @@ -136,6 +136,7 @@ use base64::{engine::general_purpose::STANDARD_NO_PAD, DecodeError, Engine}; // re-exported to make it possible to implement a custom Client without adding a separate // dependency on `bytes` pub use bytes::Bytes; +use bytes::{BufMut, BytesMut}; use client::Client; use codec::{Encoding, FromReq, FromRes, IntoReq, IntoRes}; #[doc(hidden)] @@ -656,14 +657,17 @@ where let output = server_fn(input.into()).await?; - let output = output.stream.map(|output| match output { - Ok(output) => OutputEncoding::encode(&output).map_err(|e| { - OutputStreamError::from_server_fn_error( - ServerFnErrorErr::Serialization(e.to_string()), - ) - .ser() - }), - Err(err) => Err(err.ser()), + let output = output.stream.map(|output| { + let result = match output { + Ok(output) => OutputEncoding::encode(&output).map_err(|e| { + OutputStreamError::from_server_fn_error( + ServerFnErrorErr::Serialization(e.to_string()), + ) + .ser() + }), + Err(err) => Err(err.ser()), + }; + serialize_result(result) }); Server::spawn(async move { @@ -695,23 +699,21 @@ where pin_mut!(input); pin_mut!(sink); while let Some(input) = input.stream.next().await { - if sink - .send( - input - .and_then(|input| { - InputEncoding::encode(&input).map_err(|e| { - InputStreamError::from_server_fn_error( - ServerFnErrorErr::Serialization( - e.to_string(), - ), - ) - }) - }) - .map_err(|e| e.ser()), - ) - .await - .is_err() - { + let result = match input { + Ok(input) => { + InputEncoding::encode(&input).map_err(|e| { + InputStreamError::from_server_fn_error( + ServerFnErrorErr::Serialization( + e.to_string(), + ), + ) + .ser() + }) + } + Err(err) => Err(err.ser()), + }; + let result = serialize_result(result); + if sink.send(result).await.is_err() { break; } } @@ -740,6 +742,53 @@ where } } +/// Serializes a Result into a single Bytes instance. +/// Format: [tag: u8][content: Bytes] +/// - Tag 0: Ok variant +/// - Tag 1: Err variant +pub(crate) fn serialize_result(result: Result) -> Bytes { + match result { + Ok(bytes) => { + let mut buf = BytesMut::with_capacity(1 + bytes.len()); + buf.put_u8(0); // Tag for Ok variant + buf.extend_from_slice(&bytes); + buf.freeze() + } + Err(bytes) => { + let mut buf = BytesMut::with_capacity(1 + bytes.len()); + buf.put_u8(1); // Tag for Err variant + buf.extend_from_slice(&bytes); + buf.freeze() + } + } +} + +/// Deserializes a Bytes instance back into a Result. +pub(crate) fn deserialize_result( + bytes: Bytes, +) -> Result { + if bytes.is_empty() { + return Err(E::from_server_fn_error( + ServerFnErrorErr::Deserialization("Data is empty".into()), + ) + .ser()); + } + + let tag = bytes[0]; + let content = bytes.slice(1..); + + match tag { + 0 => Ok(content), + 1 => Err(content), + _ => { + return Err(E::from_server_fn_error( + ServerFnErrorErr::Deserialization("Invalid data tag".into()), + ) + .ser()) + } // Invalid tag + } +} + /// Encode format type pub enum Format { /// Binary representation @@ -1218,3 +1267,45 @@ pub mod mock { } } } + +#[cfg(test)] +mod tests { + + use super::*; + use crate::codec::JsonEncoding; + use serde::{Deserialize, Serialize}; + + #[derive(Debug, Serialize, Deserialize)] + enum TestError { + ServerFnError(ServerFnErrorErr), + } + + impl FromServerFnError for TestError { + type Encoder = JsonEncoding; + + fn from_server_fn_error(value: ServerFnErrorErr) -> Self { + Self::ServerFnError(value) + } + } + #[test] + fn test_result_serialization() { + // Test Ok variant + let ok_result: Result = + Ok(Bytes::from_static(b"success data")); + let serialized = serialize_result(ok_result); + let deserialized = deserialize_result::(serialized); + assert!(deserialized.is_ok()); + assert_eq!(deserialized.unwrap(), Bytes::from_static(b"success data")); + + // Test Err variant + let err_result: Result = + Err(Bytes::from_static(b"error details")); + let serialized = serialize_result(err_result); + let deserialized = deserialize_result::(serialized); + assert!(deserialized.is_err()); + assert_eq!( + deserialized.unwrap_err(), + Bytes::from_static(b"error details") + ); + } +} diff --git a/server_fn/src/request/actix.rs b/server_fn/src/request/actix.rs index 261ef7f16..a8f25eff7 100644 --- a/server_fn/src/request/actix.rs +++ b/server_fn/src/request/actix.rs @@ -117,7 +117,7 @@ where ) -> Result< ( impl Stream> + Send + 'static, - impl futures::Sink> + Send + 'static, + impl futures::Sink + Send + 'static, Self::WebsocketResponse, ), Error, @@ -133,7 +133,7 @@ where let (mut response_stream_tx, response_stream_rx) = futures::channel::mpsc::channel(2048); let (response_sink_tx, mut response_sink_rx) = - futures::channel::mpsc::channel::>(2048); + futures::channel::mpsc::channel::(2048); actix_web::rt::spawn(async move { loop { @@ -142,16 +142,9 @@ where let Some(incoming) = incoming else { break; }; - match incoming { - Ok(message) => { - if let Err(err) = session.binary(message).await { + if let Err(err) = session.binary(incoming).await { _ = response_stream_tx.start_send(Err(InputStreamError::from_server_fn_error(ServerFnErrorErr::Request(err.to_string())).ser())); } - } - Err(err) => { - _ = response_stream_tx.start_send(Err(err)); - } - } }, outgoing = msg_stream.next().fuse() => { let Some(outgoing) = outgoing else { @@ -166,11 +159,11 @@ where Ok(Message::Binary(bytes)) => { _ = response_stream_tx .start_send( - Ok(bytes), + crate::deserialize_result::(bytes), ); } Ok(Message::Text(text)) => { - _ = response_stream_tx.start_send(Ok(text.into_bytes())); + _ = response_stream_tx.start_send(crate::deserialize_result::(text.into_bytes())); } Ok(_other) => { } diff --git a/server_fn/src/request/axum.rs b/server_fn/src/request/axum.rs index 99f676666..5f2e4e64b 100644 --- a/server_fn/src/request/axum.rs +++ b/server_fn/src/request/axum.rs @@ -79,7 +79,7 @@ where ) -> Result< ( impl Stream> + Send + 'static, - impl Sink> + Send + 'static, + impl Sink + Send + 'static, Self::WebsocketResponse, ), Error, @@ -91,7 +91,7 @@ where futures::stream::Once< std::future::Ready>, >, - futures::sink::Drain>, + futures::sink::Drain, Self::WebsocketResponse, ), Error, @@ -117,9 +117,9 @@ where )) })?; let (mut outgoing_tx, outgoing_rx) = - futures::channel::mpsc::channel(2048); - let (incoming_tx, mut incoming_rx) = futures::channel::mpsc::channel::>(2048); + let (incoming_tx, mut incoming_rx) = + futures::channel::mpsc::channel::(2048); let response = upgrade .on_failed_upgrade({ let mut outgoing_tx = outgoing_tx.clone(); @@ -134,18 +134,11 @@ where let Some(incoming) = incoming else { break; }; - match incoming { - Ok(message) => { - if let Err(err) = session.send(Message::Binary(message)).await { - _ = outgoing_tx.start_send(Err(InputStreamError::from_server_fn_error(ServerFnErrorErr::Request(err.to_string())).ser())); - } - } - Err(err) => { - _ = outgoing_tx.start_send(Err(err)); - } + if let Err(err) = session.send(Message::Binary(incoming)).await { + _ = outgoing_tx.start_send(Err(InputStreamError::from_server_fn_error(ServerFnErrorErr::Request(err.to_string())).ser())); } }, - outgoing = session.recv().fuse() => { + outgoing = session.recv().fuse() => { let Some(outgoing) = outgoing else { break; }; @@ -153,14 +146,20 @@ where Ok(Message::Binary(bytes)) => { _ = outgoing_tx .start_send( - Ok(bytes), + crate::deserialize_result::(bytes), ); } Ok(Message::Text(text)) => { - _ = outgoing_tx.start_send(Ok(Bytes::from(text))); + _ = outgoing_tx.start_send(crate::deserialize_result::(Bytes::from(text))); + } + Ok(Message::Ping(bytes)) => { + if session.send(Message::Pong(bytes)).await.is_err() { + break; + } } Ok(_other) => {} Err(e) => { + println!("2"); _ = outgoing_tx.start_send(Err(InputStreamError::from_server_fn_error(ServerFnErrorErr::Response(e.to_string())).ser())); } } diff --git a/server_fn/src/request/generic.rs b/server_fn/src/request/generic.rs index 7da7beeb0..3e72adb27 100644 --- a/server_fn/src/request/generic.rs +++ b/server_fn/src/request/generic.rs @@ -79,7 +79,7 @@ where ) -> Result< ( impl Stream> + Send + 'static, - impl Sink> + Send + 'static, + impl Sink + Send + 'static, Self::WebsocketResponse, ), Error, @@ -87,7 +87,7 @@ where Err::< ( futures::stream::Once>>, - futures::sink::Drain>, + futures::sink::Drain, Self::WebsocketResponse, ), _, diff --git a/server_fn/src/request/mod.rs b/server_fn/src/request/mod.rs index a1c808f45..f10a1b361 100644 --- a/server_fn/src/request/mod.rs +++ b/server_fn/src/request/mod.rs @@ -360,7 +360,7 @@ where Output = Result< ( impl Stream> + Send + 'static, - impl Sink> + Send + 'static, + impl Sink + Send + 'static, Self::WebsocketResponse, ), Error, @@ -415,7 +415,7 @@ where ) -> Result< ( impl Stream> + Send + 'static, - impl Sink> + Send + 'static, + impl Sink + Send + 'static, Self::WebsocketResponse, ), Error, @@ -424,7 +424,7 @@ where Err::< ( futures::stream::Once>>, - futures::sink::Drain>, + futures::sink::Drain, Self::WebsocketResponse, ), _, From b95e827b8b8b32893695458e0c2d28e27aee4018 Mon Sep 17 00:00:00 2001 From: Saber Haj Rabiee Date: Tue, 15 Apr 2025 07:59:26 -0700 Subject: [PATCH 3/7] fix: clippy warnings --- server_fn/src/lib.rs | 12 ++++++------ server_fn/src/request/axum.rs | 1 - 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/server_fn/src/lib.rs b/server_fn/src/lib.rs index 705e26266..4f64e5b92 100644 --- a/server_fn/src/lib.rs +++ b/server_fn/src/lib.rs @@ -746,6 +746,7 @@ where /// Format: [tag: u8][content: Bytes] /// - Tag 0: Ok variant /// - Tag 1: Err variant +#[allow(dead_code)] pub(crate) fn serialize_result(result: Result) -> Bytes { match result { Ok(bytes) => { @@ -764,6 +765,7 @@ pub(crate) fn serialize_result(result: Result) -> Bytes { } /// Deserializes a Bytes instance back into a Result. +#[allow(dead_code)] pub(crate) fn deserialize_result( bytes: Bytes, ) -> Result { @@ -780,12 +782,10 @@ pub(crate) fn deserialize_result( match tag { 0 => Ok(content), 1 => Err(content), - _ => { - return Err(E::from_server_fn_error( - ServerFnErrorErr::Deserialization("Invalid data tag".into()), - ) - .ser()) - } // Invalid tag + _ => Err(E::from_server_fn_error(ServerFnErrorErr::Deserialization( + "Invalid data tag".into(), + )) + .ser()), // Invalid tag } } diff --git a/server_fn/src/request/axum.rs b/server_fn/src/request/axum.rs index 5f2e4e64b..f54de3b15 100644 --- a/server_fn/src/request/axum.rs +++ b/server_fn/src/request/axum.rs @@ -159,7 +159,6 @@ where } Ok(_other) => {} Err(e) => { - println!("2"); _ = outgoing_tx.start_send(Err(InputStreamError::from_server_fn_error(ServerFnErrorErr::Response(e.to_string())).ser())); } } From 4d2010576043297647fc3057834a949ca1e53af2 Mon Sep 17 00:00:00 2001 From: Saber Haj Rabiee Date: Tue, 15 Apr 2025 09:23:32 -0700 Subject: [PATCH 4/7] fix: server_fn_axum example --- examples/server_fns_axum/src/app.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/examples/server_fns_axum/src/app.rs b/examples/server_fns_axum/src/app.rs index e5561c2df..3c8050253 100644 --- a/examples/server_fns_axum/src/app.rs +++ b/examples/server_fns_axum/src/app.rs @@ -945,9 +945,7 @@ pub fn CustomClientExample() -> impl IntoView { Item = Result, > + Send + 'static, - impl Sink> - + Send - + 'static, + impl Sink + Send + 'static, ), E, >, From de3a55820326a1d45031421646407ada7236e4ed Mon Sep 17 00:00:00 2001 From: Saber Haj Rabiee Date: Tue, 15 Apr 2025 09:24:43 -0700 Subject: [PATCH 5/7] fix: make de/serialize_result functions public --- server_fn/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server_fn/src/lib.rs b/server_fn/src/lib.rs index 4f64e5b92..66f94932c 100644 --- a/server_fn/src/lib.rs +++ b/server_fn/src/lib.rs @@ -747,7 +747,7 @@ where /// - Tag 0: Ok variant /// - Tag 1: Err variant #[allow(dead_code)] -pub(crate) fn serialize_result(result: Result) -> Bytes { +pub fn serialize_result(result: Result) -> Bytes { match result { Ok(bytes) => { let mut buf = BytesMut::with_capacity(1 + bytes.len()); @@ -766,7 +766,7 @@ pub(crate) fn serialize_result(result: Result) -> Bytes { /// Deserializes a Bytes instance back into a Result. #[allow(dead_code)] -pub(crate) fn deserialize_result( +pub fn deserialize_result( bytes: Bytes, ) -> Result { if bytes.is_empty() { From 078c252e2eb0dcb7994eef6139c837f40e826f4d Mon Sep 17 00:00:00 2001 From: Saber Haj Rabiee Date: Thu, 17 Apr 2025 03:38:04 -0700 Subject: [PATCH 6/7] fix: make websocket result ser/de private --- server_fn/src/client.rs | 16 +++------- server_fn/src/lib.rs | 56 ++++++++++++++++++++-------------- server_fn/src/request/actix.rs | 4 +-- server_fn/src/request/axum.rs | 4 +-- 4 files changed, 42 insertions(+), 38 deletions(-) diff --git a/server_fn/src/client.rs b/server_fn/src/client.rs index 34766aa86..5360722fe 100644 --- a/server_fn/src/client.rs +++ b/server_fn/src/client.rs @@ -132,14 +132,10 @@ pub mod browser { let (sink, stream) = websocket.split(); let stream = stream.map(|message| match message { - Ok(message) => { - crate::deserialize_result::( - match message { - Message::Text(text) => Bytes::from(text), - Message::Bytes(bytes) => Bytes::from(bytes), - }, - ) - } + Ok(message) => Ok(match message { + Message::Text(text) => Bytes::from(text), + Message::Bytes(bytes) => Bytes::from(bytes), + }), Err(err) => { web_sys::console::error_1(&err.to_string().into()); Err(OutputStreamError::from_server_fn_error( @@ -281,9 +277,7 @@ pub mod reqwest { Ok(( read.map(|msg| match msg { - Ok(msg) => crate::deserialize_result::( - msg.into_data(), - ), + Ok(msg) => Ok(msg.into_data()), Err(e) => Err(OutputStreamError::from_server_fn_error( ServerFnErrorErr::Request(e.to_string()), ) diff --git a/server_fn/src/lib.rs b/server_fn/src/lib.rs index 66f94932c..c1613c0c1 100644 --- a/server_fn/src/lib.rs +++ b/server_fn/src/lib.rs @@ -636,15 +636,19 @@ where { let (request_bytes, response_stream, response) = request.try_into_websocket().await?; - let input = request_bytes.map(|request_bytes| match request_bytes { - Ok(request_bytes) => { - InputEncoding::decode(request_bytes).map_err(|e| { - InputStreamError::from_server_fn_error( - ServerFnErrorErr::Deserialization(e.to_string()), - ) - }) + let input = request_bytes.map(|request_bytes| { + let request_bytes = request_bytes + .map(|bytes| deserialize_result::(bytes)) + .unwrap_or_else(Err); + match request_bytes { + Ok(request_bytes) => InputEncoding::decode(request_bytes) + .map_err(|e| { + InputStreamError::from_server_fn_error( + ServerFnErrorErr::Deserialization(e.to_string()), + ) + }), + Err(err) => Err(InputStreamError::de(err)), } - Err(err) => Err(InputStreamError::de(err)), }); let boxed = Box::pin(input) as Pin< @@ -720,14 +724,21 @@ where }); // Return the output stream - let stream = stream.map(|request_bytes| match request_bytes { - Ok(request_bytes) => OutputEncoding::decode(request_bytes) - .map_err(|e| { - OutputStreamError::from_server_fn_error( - ServerFnErrorErr::Deserialization(e.to_string()), - ) - }), - Err(err) => Err(OutputStreamError::de(err)), + let stream = stream.map(|request_bytes| { + let request_bytes = request_bytes + .map(|bytes| deserialize_result::(bytes)) + .unwrap_or_else(Err); + match request_bytes { + Ok(request_bytes) => OutputEncoding::decode(request_bytes) + .map_err(|e| { + OutputStreamError::from_server_fn_error( + ServerFnErrorErr::Deserialization( + e.to_string(), + ), + ) + }), + Err(err) => Err(OutputStreamError::de(err)), + } }); let boxed = Box::pin(stream) as Pin< @@ -742,12 +753,11 @@ where } } -/// Serializes a Result into a single Bytes instance. -/// Format: [tag: u8][content: Bytes] -/// - Tag 0: Ok variant -/// - Tag 1: Err variant -#[allow(dead_code)] -pub fn serialize_result(result: Result) -> Bytes { +// Serializes a Result into a single Bytes instance. +// Format: [tag: u8][content: Bytes] +// - Tag 0: Ok variant +// - Tag 1: Err variant +fn serialize_result(result: Result) -> Bytes { match result { Ok(bytes) => { let mut buf = BytesMut::with_capacity(1 + bytes.len()); @@ -766,7 +776,7 @@ pub fn serialize_result(result: Result) -> Bytes { /// Deserializes a Bytes instance back into a Result. #[allow(dead_code)] -pub fn deserialize_result( +fn deserialize_result( bytes: Bytes, ) -> Result { if bytes.is_empty() { diff --git a/server_fn/src/request/actix.rs b/server_fn/src/request/actix.rs index a8f25eff7..41764633d 100644 --- a/server_fn/src/request/actix.rs +++ b/server_fn/src/request/actix.rs @@ -159,11 +159,11 @@ where Ok(Message::Binary(bytes)) => { _ = response_stream_tx .start_send( - crate::deserialize_result::(bytes), + Ok(bytes), ); } Ok(Message::Text(text)) => { - _ = response_stream_tx.start_send(crate::deserialize_result::(text.into_bytes())); + _ = response_stream_tx.start_send(Ok(text.into_bytes())); } Ok(_other) => { } diff --git a/server_fn/src/request/axum.rs b/server_fn/src/request/axum.rs index f54de3b15..1e6471ff6 100644 --- a/server_fn/src/request/axum.rs +++ b/server_fn/src/request/axum.rs @@ -146,11 +146,11 @@ where Ok(Message::Binary(bytes)) => { _ = outgoing_tx .start_send( - crate::deserialize_result::(bytes), + Ok(bytes), ); } Ok(Message::Text(text)) => { - _ = outgoing_tx.start_send(crate::deserialize_result::(Bytes::from(text))); + _ = outgoing_tx.start_send(Ok(Bytes::from(text))); } Ok(Message::Ping(bytes)) => { if session.send(Message::Pong(bytes)).await.is_err() { From c743f0641ceb0e32b36044a1faf2a19d908e25b6 Mon Sep 17 00:00:00 2001 From: Saber Haj Rabiee Date: Thu, 17 Apr 2025 06:45:40 -0700 Subject: [PATCH 7/7] chore: make the doc a comment and remove allow dead_code --- server_fn/src/lib.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server_fn/src/lib.rs b/server_fn/src/lib.rs index c1613c0c1..c96b9a21c 100644 --- a/server_fn/src/lib.rs +++ b/server_fn/src/lib.rs @@ -774,8 +774,7 @@ fn serialize_result(result: Result) -> Bytes { } } -/// Deserializes a Bytes instance back into a Result. -#[allow(dead_code)] +// Deserializes a Bytes instance back into a Result. fn deserialize_result( bytes: Bytes, ) -> Result {