feat: allow using different error types for req/resp with WebSockets, closes #3724 (#3766)

* feat: allow using different error types for request/response with WebSocket

* [autofix.ci] apply automated fixes

* chore: clean up merge issues

* chore: fix custom client example

* we can't use nightly features on stable

* update flake inputs

* include gcc and glib to flake dev shell

* update expected stderr outputs server_fn/tests/invalid

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Greg Johnston <greg.johnston@gmail.com>
This commit is contained in:
Mykyta
2025-04-09 20:03:47 +03:00
committed by GitHub
parent 8bc4fd4198
commit ba7bfd8bac
15 changed files with 544 additions and 189 deletions

View File

@@ -885,9 +885,11 @@ pub fn CustomClientExample() -> impl IntoView {
pub struct CustomClient; pub struct CustomClient;
// Implement the `Client` trait for it. // Implement the `Client` trait for it.
impl<E> Client<E> for CustomClient impl<E, IS, OS> Client<E, IS, OS> for CustomClient
where where
E: FromServerFnError, E: FromServerFnError,
IS: FromServerFnError,
OS: FromServerFnError,
{ {
// BrowserRequest and BrowserResponse are the defaults used by other server functions. // BrowserRequest and BrowserResponse are the defaults used by other server functions.
// They are wrappers for the underlying Web Fetch API types. // They are wrappers for the underlying Web Fetch API types.
@@ -904,7 +906,7 @@ pub fn CustomClientExample() -> impl IntoView {
// modify the headers by appending one // modify the headers by appending one
headers.append("X-Custom-Header", "foobar"); headers.append("X-Custom-Header", "foobar");
// delegate back out to BrowserClient to send the modified request // delegate back out to BrowserClient to send the modified request
BrowserClient::send(req) <BrowserClient as Client<E, IS, OS>>::send(req)
} }
fn open_websocket( fn open_websocket(
@@ -912,8 +914,10 @@ pub fn CustomClientExample() -> impl IntoView {
) -> impl Future< ) -> impl Future<
Output = Result< Output = Result<
( (
impl Stream<Item = Result<server_fn::Bytes, E>> + Send + 'static, impl Stream<Item = Result<server_fn::Bytes, OS>>
impl Sink<Result<server_fn::Bytes, E>> + Send + 'static, + Send
+ 'static,
impl Sink<Result<server_fn::Bytes, IS>> + Send + 'static,
), ),
E, E,
>, >,

24
flake.lock generated
View File

@@ -5,11 +5,11 @@
"systems": "systems" "systems": "systems"
}, },
"locked": { "locked": {
"lastModified": 1726560853, "lastModified": 1731533236,
"narHash": "sha256-X6rJYSESBVr3hBoH0WbKE5KvhPU5bloyZ2L4K60/fPQ=", "narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=",
"owner": "numtide", "owner": "numtide",
"repo": "flake-utils", "repo": "flake-utils",
"rev": "c1dfcf08411b08f6b8615f7d8971a2bfa81d5e8a", "rev": "11707dc2f618dd54ca8739b309ec4fc024de578b",
"type": "github" "type": "github"
}, },
"original": { "original": {
@@ -20,11 +20,11 @@
}, },
"nixpkgs": { "nixpkgs": {
"locked": { "locked": {
"lastModified": 1727634051, "lastModified": 1743583204,
"narHash": "sha256-S5kVU7U82LfpEukbn/ihcyNt2+EvG7Z5unsKW9H/yFA=", "narHash": "sha256-F7n4+KOIfWrwoQjXrL2wD9RhFYLs2/GGe/MQY1sSdlE=",
"owner": "NixOS", "owner": "NixOS",
"repo": "nixpkgs", "repo": "nixpkgs",
"rev": "06cf0e1da4208d3766d898b7fdab6513366d45b9", "rev": "2c8d3f48d33929642c1c12cd243df4cc7d2ce434",
"type": "github" "type": "github"
}, },
"original": { "original": {
@@ -36,11 +36,11 @@
}, },
"nixpkgs_2": { "nixpkgs_2": {
"locked": { "locked": {
"lastModified": 1718428119, "lastModified": 1736320768,
"narHash": "sha256-WdWDpNaq6u1IPtxtYHHWpl5BmabtpmLnMAx0RdJ/vo8=", "narHash": "sha256-nIYdTAiKIGnFNugbomgBJR+Xv5F1ZQU+HfaBqJKroC0=",
"owner": "NixOS", "owner": "NixOS",
"repo": "nixpkgs", "repo": "nixpkgs",
"rev": "e6cea36f83499eb4e9cd184c8a8e823296b50ad5", "rev": "4bc9c909d9ac828a039f288cf872d16d38185db8",
"type": "github" "type": "github"
}, },
"original": { "original": {
@@ -62,11 +62,11 @@
"nixpkgs": "nixpkgs_2" "nixpkgs": "nixpkgs_2"
}, },
"locked": { "locked": {
"lastModified": 1727749966, "lastModified": 1743820323,
"narHash": "sha256-DUS8ehzqB1DQzfZ4bRXVSollJhu+y7cvh1DJ9mbWebE=", "narHash": "sha256-UXxJogXhPhBFaX4uxmMudcD/x3sEGFtoSc4busTcftY=",
"owner": "oxalica", "owner": "oxalica",
"repo": "rust-overlay", "repo": "rust-overlay",
"rev": "00decf1b4f9886d25030b9ee4aed7bfddccb5f66", "rev": "b4734ce867252f92cdc7d25f8cc3b7cef153e703",
"type": "github" "type": "github"
}, },
"original": { "original": {

View File

@@ -7,8 +7,15 @@
flake-utils.url = "github:numtide/flake-utils"; flake-utils.url = "github:numtide/flake-utils";
}; };
outputs = { self, nixpkgs, rust-overlay, flake-utils, ... }: outputs =
flake-utils.lib.eachDefaultSystem (system: {
nixpkgs,
rust-overlay,
flake-utils,
...
}:
flake-utils.lib.eachDefaultSystem (
system:
let let
overlays = [ (import rust-overlay) ]; overlays = [ (import rust-overlay) ];
pkgs = import nixpkgs { pkgs = import nixpkgs {
@@ -18,22 +25,31 @@
with pkgs; with pkgs;
{ {
devShells.default = mkShell { devShells.default = mkShell {
buildInputs = [ buildInputs =
[
gcc
glib
openssl openssl
pkg-config pkg-config
cacert cacert
cargo-make cargo-make
trunk trunk
(rust-bin.selectLatestNightlyWith( toolchain: toolchain.default.override { (rust-bin.selectLatestNightlyWith (
extensions= [ "rust-src" "rust-analyzer" ]; toolchain:
toolchain.default.override {
extensions = [
"rust-src"
"rust-analyzer"
];
targets = [ "wasm32-unknown-unknown" ]; targets = [ "wasm32-unknown-unknown" ];
})) }
] ++ pkgs.lib.optionals pkg.stdenv.isDarwin [ ))
]
++ pkgs.lib.optionals pkg.stdenv.isDarwin [
darwin.apple_sdk.frameworks.SystemConfiguration darwin.apple_sdk.frameworks.SystemConfiguration
]; ];
shellHook = '' shellHook = '''';
'';
}; };
} }
); );

View File

@@ -97,6 +97,7 @@ where
ServFn: Send + Sync + 'static, ServFn: Send + Sync + 'static,
ServFn::Output: Send + Sync + 'static, ServFn::Output: Send + Sync + 'static,
ServFn::Error: Send + Sync + 'static, ServFn::Error: Send + Sync + 'static,
<ServFn as ServerFn>::Client: Client<<ServFn as ServerFn>::Error>,
{ {
// if redirect hook has not yet been set (by a router), defaults to a browser redirect // if redirect hook has not yet been set (by a router), defaults to a browser redirect
_ = server_fn::redirect::set_redirect_hook(|loc: &str| { _ = server_fn::redirect::set_redirect_hook(|loc: &str| {
@@ -172,6 +173,7 @@ where
ServFn::Error, ServFn::Error,
>>::FormData: From<FormData>, >>::FormData: From<FormData>,
ServFn::Error: Send + Sync + 'static, ServFn::Error: Send + Sync + 'static,
<ServFn as ServerFn>::Client: Client<<ServFn as ServerFn>::Error>,
{ {
// if redirect hook has not yet been set (by a router), defaults to a browser redirect // if redirect hook has not yet been set (by a router), defaults to a browser redirect
_ = server_fn::redirect::set_redirect_hook(|loc: &str| { _ = server_fn::redirect::set_redirect_hook(|loc: &str| {

View File

@@ -23,16 +23,16 @@ pub fn get_server_url() -> &'static str {
/// This trait is implemented for things like a browser `fetch` request or for /// This trait is implemented for things like a browser `fetch` request or for
/// the `reqwest` trait. It should almost never be necessary to implement it /// the `reqwest` trait. It should almost never be necessary to implement it
/// yourself, unless youre trying to use an alternative HTTP crate on the client side. /// yourself, unless youre trying to use an alternative HTTP crate on the client side.
pub trait Client<E> { pub trait Client<Error, InputStreamError = Error, OutputStreamError = Error> {
/// The type of a request sent by this client. /// The type of a request sent by this client.
type Request: ClientReq<E> + Send + 'static; type Request: ClientReq<Error> + Send + 'static;
/// The type of a response received by this client. /// The type of a response received by this client.
type Response: ClientRes<E> + Send + 'static; type Response: ClientRes<Error> + Send + 'static;
/// Sends the request and receives a response. /// Sends the request and receives a response.
fn send( fn send(
req: Self::Request, req: Self::Request,
) -> impl Future<Output = Result<Self::Response, E>> + Send; ) -> impl Future<Output = Result<Self::Response, Error>> + Send;
/// Opens a websocket connection to the server. /// Opens a websocket connection to the server.
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
@@ -41,10 +41,12 @@ pub trait Client<E> {
) -> impl Future< ) -> impl Future<
Output = Result< Output = Result<
( (
impl Stream<Item = Result<Bytes, E>> + Send + 'static, impl Stream<Item = Result<Bytes, OutputStreamError>>
impl Sink<Result<Bytes, E>> + Send + 'static, + Send
+ 'static,
impl Sink<Result<Bytes, InputStreamError>> + Send + 'static,
), ),
E, Error,
>, >,
> + Send; > + Send;
@@ -70,13 +72,19 @@ pub mod browser {
/// Implements [`Client`] for a `fetch` request in the browser. /// Implements [`Client`] for a `fetch` request in the browser.
pub struct BrowserClient; pub struct BrowserClient;
impl<E: FromServerFnError> Client<E> for BrowserClient { impl<
Error: FromServerFnError,
InputStreamError: FromServerFnError,
OutputStreamError: FromServerFnError,
> Client<Error, InputStreamError, OutputStreamError> for BrowserClient
{
type Request = BrowserRequest; type Request = BrowserRequest;
type Response = BrowserResponse; type Response = BrowserResponse;
fn send( fn send(
req: Self::Request, req: Self::Request,
) -> impl Future<Output = Result<Self::Response, E>> + Send { ) -> impl Future<Output = Result<Self::Response, Error>> + Send
{
SendWrapper::new(async move { SendWrapper::new(async move {
let req = req.0.take(); let req = req.0.take();
let RequestInner { let RequestInner {
@@ -106,10 +114,14 @@ pub mod browser {
) -> impl Future< ) -> impl Future<
Output = Result< Output = Result<
( (
impl futures::Stream<Item = Result<Bytes, E>> + Send + 'static, impl futures::Stream<Item = Result<Bytes, OutputStreamError>>
impl futures::Sink<Result<Bytes, E>> + Send + 'static, + Send
+ 'static,
impl futures::Sink<Result<Bytes, InputStreamError>>
+ Send
+ 'static,
), ),
E, Error,
>, >,
> + Send { > + Send {
SendWrapper::new(async move { SendWrapper::new(async move {
@@ -117,18 +129,18 @@ pub mod browser {
gloo_net::websocket::futures::WebSocket::open(url) gloo_net::websocket::futures::WebSocket::open(url)
.map_err(|err| { .map_err(|err| {
web_sys::console::error_1(&err.to_string().into()); web_sys::console::error_1(&err.to_string().into());
E::from_server_fn_error(ServerFnErrorErr::Request( Error::from_server_fn_error(
err.to_string(), ServerFnErrorErr::Request(err.to_string()),
)) )
})?; })?;
let (sink, stream) = websocket.split(); let (sink, stream) = websocket.split();
let stream = stream let stream = stream
.map_err(|err| { .map_err(|err| {
web_sys::console::error_1(&err.to_string().into()); web_sys::console::error_1(&err.to_string().into());
E::from_server_fn_error(ServerFnErrorErr::Request( OutputStreamError::from_server_fn_error(
err.to_string(), ServerFnErrorErr::Request(err.to_string()),
)) )
}) })
.map_ok(move |msg| match msg { .map_ok(move |msg| match msg {
Message::Text(text) => Bytes::from(text), Message::Text(text) => Bytes::from(text),
@@ -186,22 +198,26 @@ pub mod browser {
} }
} }
let sink = sink.with(|message: Result<Bytes, E>| async move { let sink = sink.with(
|message: Result<Bytes, InputStreamError>| async move {
match message { match message {
Ok(message) => Ok(Message::Bytes(message.into())), Ok(message) => Ok(Message::Bytes(message.into())),
Err(err) => { Err(err) => {
web_sys::console::error_1(&js_sys::JsString::from( web_sys::console::error_1(
err.ser(), &js_sys::JsString::from(err.ser()),
)); );
const CLOSE_CODE_ERROR: u16 = 1011; const CLOSE_CODE_ERROR: u16 = 1011;
Err(WebSocketError::ConnectionClose(CloseEvent { Err(WebSocketError::ConnectionClose(
CloseEvent {
code: CLOSE_CODE_ERROR, code: CLOSE_CODE_ERROR,
reason: err.ser(), reason: err.ser(),
was_clean: true, was_clean: true,
})) },
))
} }
} }
}); },
);
let sink = SendWrapperSink::new(Box::pin(sink)); let sink = SendWrapperSink::new(Box::pin(sink));
Ok((stream, sink)) Ok((stream, sink))

View File

@@ -174,9 +174,13 @@ pub use xxhash_rust;
type ServerFnServerRequest<Fn> = <<Fn as ServerFn>::Server as crate::Server< type ServerFnServerRequest<Fn> = <<Fn as ServerFn>::Server as crate::Server<
<Fn as ServerFn>::Error, <Fn as ServerFn>::Error,
<Fn as ServerFn>::InputStreamError,
<Fn as ServerFn>::OutputStreamError,
>>::Request; >>::Request;
type ServerFnServerResponse<Fn> = <<Fn as ServerFn>::Server as crate::Server< type ServerFnServerResponse<Fn> = <<Fn as ServerFn>::Server as crate::Server<
<Fn as ServerFn>::Error, <Fn as ServerFn>::Error,
<Fn as ServerFn>::InputStreamError,
<Fn as ServerFn>::OutputStreamError,
>>::Response; >>::Response;
/// Defines a function that runs only on the server, but can be called from the server or the client. /// Defines a function that runs only on the server, but can be called from the server or the client.
@@ -215,12 +219,20 @@ pub trait ServerFn: Send + Sized {
/// The type of the HTTP client that will send the request from the client side. /// The type of the HTTP client that will send the request from the client side.
/// ///
/// For example, this might be `gloo-net` in the browser, or `reqwest` for a desktop app. /// For example, this might be `gloo-net` in the browser, or `reqwest` for a desktop app.
type Client: Client<Self::Error>; type Client: Client<
Self::Error,
Self::InputStreamError,
Self::OutputStreamError,
>;
/// The type of the HTTP server that will send the response from the server side. /// The type of the HTTP server that will send the response from the server side.
/// ///
/// For example, this might be `axum` or `actix-web`. /// For example, this might be `axum` or `actix-web`.
type Server: Server<Self::Error>; type Server: Server<
Self::Error,
Self::InputStreamError,
Self::OutputStreamError,
>;
/// The protocol the server function uses to communicate with the client. /// The protocol the server function uses to communicate with the client.
type Protocol: Protocol< type Protocol: Protocol<
@@ -229,6 +241,8 @@ pub trait ServerFn: Send + Sized {
Self::Client, Self::Client,
Self::Server, Self::Server,
Self::Error, Self::Error,
Self::InputStreamError,
Self::OutputStreamError,
>; >;
/// The return type of the server function. /// The return type of the server function.
@@ -237,8 +251,15 @@ pub trait ServerFn: Send + Sized {
/// *from* `ClientResponse` when received by the client. /// *from* `ClientResponse` when received by the client.
type Output: Send; type Output: Send;
/// The type of the error on the server function. Typically [`ServerFnError`], but allowed to be any type that implements [`FromServerFnError`]. /// The type of error in the server function return.
/// Typically [`ServerFnError`], but allowed to be any type that implements [`FromServerFnError`].
type Error: FromServerFnError + Send + Sync; type Error: FromServerFnError + Send + Sync;
/// The type of error in the server function for stream items sent from the client to the server.
/// Typically [`ServerFnError`], but allowed to be any type that implements [`FromServerFnError`].
type InputStreamError: FromServerFnError + Send + Sync;
/// The type of error in the server function for stream items sent from the server to the client.
/// Typically [`ServerFnError`], but allowed to be any type that implements [`FromServerFnError`].
type OutputStreamError: FromServerFnError + Send + Sync;
/// Returns [`Self::PATH`]. /// Returns [`Self::PATH`].
fn url() -> &'static str { fn url() -> &'static str {
@@ -288,6 +309,8 @@ pub trait ServerFn: Send + Sized {
( (
<<Self as ServerFn>::Server as crate::Server< <<Self as ServerFn>::Server as crate::Server<
Self::Error, Self::Error,
Self::InputStreamError,
Self::OutputStreamError,
>>::Response::error_response( >>::Response::error_response(
Self::PATH, e.ser() Self::PATH, e.ser()
), ),
@@ -331,10 +354,17 @@ pub trait ServerFn: Send + Sized {
/// The protocol that a server function uses to communicate with the client. This trait handles /// The protocol that a server function uses to communicate with the client. This trait handles
/// the server and client side of running a server function. It is implemented for the [`Http`] and /// the server and client side of running a server function. It is implemented for the [`Http`] and
/// [`Websocket`] protocols and can be used to implement custom protocols. /// [`Websocket`] protocols and can be used to implement custom protocols.
pub trait Protocol<Input, Output, Client, Server, E> pub trait Protocol<
where Input,
Server: crate::Server<E>, Output,
Client: crate::Client<E>, Client,
Server,
Error,
InputStreamError = Error,
OutputStreamError = Error,
> where
Server: crate::Server<Error, InputStreamError, OutputStreamError>,
Client: crate::Client<Error, InputStreamError, OutputStreamError>,
{ {
/// The HTTP method used for requests. /// The HTTP method used for requests.
const METHOD: Method; const METHOD: Method;
@@ -344,17 +374,17 @@ where
fn run_server<F, Fut>( fn run_server<F, Fut>(
request: Server::Request, request: Server::Request,
server_fn: F, server_fn: F,
) -> impl Future<Output = Result<Server::Response, E>> + Send ) -> impl Future<Output = Result<Server::Response, Error>> + Send
where where
F: Fn(Input) -> Fut + Send, F: Fn(Input) -> Fut + Send,
Fut: Future<Output = Result<Output, E>> + Send; Fut: Future<Output = Result<Output, Error>> + Send;
/// Run the server function on the client. The implementation should handle serializing the /// Run the server function on the client. The implementation should handle serializing the
/// input, sending the request, and deserializing the output. /// input, sending the request, and deserializing the output.
fn run_client( fn run_client(
path: &str, path: &str,
input: Input, input: Input,
) -> impl Future<Output = Result<Output, E>> + Send; ) -> impl Future<Output = Result<Output, Error>> + Send;
} }
/// The http protocol with specific input and output encodings for the request and response. This is /// The http protocol with specific input and output encodings for the request and response. This is
@@ -561,18 +591,30 @@ impl<
OutputEncoding, OutputEncoding,
Client, Client,
Server, Server,
E, Error,
> Protocol<Input, BoxedStream<OutputItem, E>, Client, Server, E> InputStreamError,
for Websocket<InputEncoding, OutputEncoding> OutputStreamError,
>
Protocol<
Input,
BoxedStream<OutputItem, OutputStreamError>,
Client,
Server,
Error,
InputStreamError,
OutputStreamError,
> for Websocket<InputEncoding, OutputEncoding>
where where
Input: Deref<Target = BoxedStream<InputItem, E>> Input: Deref<Target = BoxedStream<InputItem, InputStreamError>>
+ Into<BoxedStream<InputItem, E>> + Into<BoxedStream<InputItem, InputStreamError>>
+ From<BoxedStream<InputItem, E>>, + From<BoxedStream<InputItem, InputStreamError>>,
InputEncoding: Encodes<InputItem> + Decodes<InputItem>, InputEncoding: Encodes<InputItem> + Decodes<InputItem>,
OutputEncoding: Encodes<OutputItem> + Decodes<OutputItem>, OutputEncoding: Encodes<OutputItem> + Decodes<OutputItem>,
Server: crate::Server<E>, InputStreamError: FromServerFnError + Send,
E: FromServerFnError + Send, OutputStreamError: FromServerFnError + Send,
Client: crate::Client<E>, Error: FromServerFnError + Send,
Server: crate::Server<Error, InputStreamError, OutputStreamError>,
Client: crate::Client<Error, InputStreamError, OutputStreamError>,
OutputItem: Send + 'static, OutputItem: Send + 'static,
InputItem: Send + 'static, InputItem: Send + 'static,
{ {
@@ -581,34 +623,44 @@ where
async fn run_server<F, Fut>( async fn run_server<F, Fut>(
request: Server::Request, request: Server::Request,
server_fn: F, server_fn: F,
) -> Result<Server::Response, E> ) -> Result<Server::Response, Error>
where where
F: Fn(Input) -> Fut + Send, F: Fn(Input) -> Fut + Send,
Fut: Future<Output = Result<BoxedStream<OutputItem, E>, E>> + Send, Fut: Future<
Output = Result<
BoxedStream<OutputItem, OutputStreamError>,
Error,
>,
> + Send,
{ {
let (request_bytes, response_stream, response) = let (request_bytes, response_stream, response) =
request.try_into_websocket().await?; request.try_into_websocket().await?;
let input = request_bytes.map(|request_bytes| match request_bytes { let input = request_bytes.map(|request_bytes| match request_bytes {
Ok(request_bytes) => { Ok(request_bytes) => {
InputEncoding::decode(request_bytes).map_err(|e| { InputEncoding::decode(request_bytes).map_err(|e| {
E::from_server_fn_error(ServerFnErrorErr::Deserialization( InputStreamError::from_server_fn_error(
e.to_string(), ServerFnErrorErr::Deserialization(e.to_string()),
)) )
}) })
} }
Err(err) => Err(err), Err(err) => Err(err),
}); });
let boxed = Box::pin(input) let boxed = Box::pin(input)
as Pin<Box<dyn Stream<Item = Result<InputItem, E>> + Send>>; as Pin<
Box<
dyn Stream<Item = Result<InputItem, InputStreamError>>
+ Send,
>,
>;
let input = BoxedStream { stream: boxed }; let input = BoxedStream { stream: boxed };
let output = server_fn(input.into()).await?; let output = server_fn(input.into()).await?;
let output = output.stream.map(|output| match output { let output = output.stream.map(|output| match output {
Ok(output) => OutputEncoding::encode(output).map_err(|e| { Ok(output) => OutputEncoding::encode(output).map_err(|e| {
E::from_server_fn_error(ServerFnErrorErr::Serialization( OutputStreamError::from_server_fn_error(
e.to_string(), ServerFnErrorErr::Serialization(e.to_string()),
)) )
}), }),
Err(err) => Err(err), Err(err) => Err(err),
}); });
@@ -629,8 +681,9 @@ where
fn run_client( fn run_client(
path: &str, path: &str,
input: Input, input: Input,
) -> impl Future<Output = Result<BoxedStream<OutputItem, E>, E>> + Send ) -> impl Future<
{ Output = Result<BoxedStream<OutputItem, OutputStreamError>, Error>,
> + Send {
let input = input.into(); let input = input.into();
async move { async move {
@@ -644,7 +697,7 @@ where
if sink if sink
.send(input.and_then(|input| { .send(input.and_then(|input| {
InputEncoding::encode(input).map_err(|e| { InputEncoding::encode(input).map_err(|e| {
E::from_server_fn_error( InputStreamError::from_server_fn_error(
ServerFnErrorErr::Serialization( ServerFnErrorErr::Serialization(
e.to_string(), e.to_string(),
), ),
@@ -663,14 +716,19 @@ where
let stream = stream.map(|request_bytes| match request_bytes { let stream = stream.map(|request_bytes| match request_bytes {
Ok(request_bytes) => OutputEncoding::decode(request_bytes) Ok(request_bytes) => OutputEncoding::decode(request_bytes)
.map_err(|e| { .map_err(|e| {
E::from_server_fn_error( OutputStreamError::from_server_fn_error(
ServerFnErrorErr::Deserialization(e.to_string()), ServerFnErrorErr::Deserialization(e.to_string()),
) )
}), }),
Err(err) => Err(err), Err(err) => Err(err),
}); });
let boxed = Box::pin(stream) let boxed = Box::pin(stream)
as Pin<Box<dyn Stream<Item = Result<OutputItem, E>> + Send>>; as Pin<
Box<
dyn Stream<Item = Result<OutputItem, OutputStreamError>>
+ Send,
>,
>;
let output = BoxedStream { stream: boxed }; let output = BoxedStream { stream: boxed };
Ok(output) Ok(output)
} }
@@ -738,13 +796,25 @@ impl<Req, Res> ServerFnTraitObj<Req, Res> {
/// Converts the relevant parts of a server function into a trait object. /// Converts the relevant parts of a server function into a trait object.
pub const fn new< pub const fn new<
S: ServerFn< S: ServerFn<
Server: crate::Server<S::Error, Request = Req, Response = Res>, Server: crate::Server<
S::Error,
S::InputStreamError,
S::OutputStreamError,
Request = Req,
Response = Res,
>,
>, >,
>( >(
handler: fn(Req) -> Pin<Box<dyn Future<Output = Res> + Send>>, handler: fn(Req) -> Pin<Box<dyn Future<Output = Res> + Send>>,
) -> Self ) -> Self
where where
Req: crate::Req<S::Error, WebsocketResponse = Res> + Send + 'static, Req: crate::Req<
S::Error,
S::InputStreamError,
S::OutputStreamError,
WebsocketResponse = Res,
> + Send
+ 'static,
Res: crate::TryRes<S::Error> + Send + 'static, Res: crate::TryRes<S::Error> + Send + 'static,
{ {
Self { Self {
@@ -848,14 +918,21 @@ pub mod axum {
/// The axum server function backend /// The axum server function backend
pub struct AxumServerFnBackend; pub struct AxumServerFnBackend;
impl<E: FromServerFnError + Send + Sync> Server<E> for AxumServerFnBackend { impl<Error, InputStreamError, OutputStreamError>
Server<Error, InputStreamError, OutputStreamError>
for AxumServerFnBackend
where
Error: FromServerFnError + Send + Sync,
InputStreamError: FromServerFnError + Send + Sync,
OutputStreamError: FromServerFnError + Send + Sync,
{
type Request = Request<Body>; type Request = Request<Body>;
type Response = Response<Body>; type Response = Response<Body>;
#[allow(unused_variables)] #[allow(unused_variables)]
fn spawn( fn spawn(
future: impl Future<Output = ()> + Send + 'static, future: impl Future<Output = ()> + Send + 'static,
) -> Result<(), E> { ) -> Result<(), Error> {
#[cfg(feature = "axum")] #[cfg(feature = "axum")]
{ {
tokio::spawn(future); tokio::spawn(future);
@@ -863,7 +940,7 @@ pub mod axum {
} }
#[cfg(not(feature = "axum"))] #[cfg(not(feature = "axum"))]
{ {
Err(E::from_server_fn_error( Err(Error::from_server_fn_error(
crate::error::ServerFnErrorErr::Request( crate::error::ServerFnErrorErr::Request(
"No async runtime available. You need to either \ "No async runtime available. You need to either \
enable the full axum feature to pull in tokio, or \ enable the full axum feature to pull in tokio, or \
@@ -884,6 +961,8 @@ pub mod axum {
T: ServerFn< T: ServerFn<
Server: crate::Server< Server: crate::Server<
T::Error, T::Error,
T::InputStreamError,
T::OutputStreamError,
Request = Request<Body>, Request = Request<Body>,
Response = Response<Body>, Response = Response<Body>,
>, >,
@@ -966,13 +1045,20 @@ pub mod actix {
/// The actix server function backend /// The actix server function backend
pub struct ActixServerFnBackend; pub struct ActixServerFnBackend;
impl<E: FromServerFnError + Send + Sync> Server<E> for ActixServerFnBackend { impl<Error, InputStreamError, OutputStreamError>
Server<Error, InputStreamError, OutputStreamError>
for ActixServerFnBackend
where
Error: FromServerFnError + Send + Sync,
InputStreamError: FromServerFnError + Send + Sync,
OutputStreamError: FromServerFnError + Send + Sync,
{
type Request = ActixRequest; type Request = ActixRequest;
type Response = ActixResponse; type Response = ActixResponse;
fn spawn( fn spawn(
future: impl Future<Output = ()> + Send + 'static, future: impl Future<Output = ()> + Send + 'static,
) -> Result<(), E> { ) -> Result<(), Error> {
actix_web::rt::spawn(future); actix_web::rt::spawn(future);
Ok(()) Ok(())
} }
@@ -986,6 +1072,8 @@ pub mod actix {
T: ServerFn< T: ServerFn<
Server: crate::Server< Server: crate::Server<
T::Error, T::Error,
T::InputStreamError,
T::OutputStreamError,
Request = ActixRequest, Request = ActixRequest,
Response = ActixResponse, Response = ActixResponse,
>, >,
@@ -1075,13 +1163,20 @@ pub mod mock {
/// server type when compiling for the client. /// server type when compiling for the client.
pub struct BrowserMockServer; pub struct BrowserMockServer;
impl<E: Send + 'static> crate::server::Server<E> for BrowserMockServer { impl<Error, InputStreamError, OutputStreamError>
crate::server::Server<Error, InputStreamError, OutputStreamError>
for BrowserMockServer
where
Error: Send + 'static,
InputStreamError: Send + 'static,
OutputStreamError: Send + 'static,
{
type Request = crate::request::BrowserMockReq; type Request = crate::request::BrowserMockReq;
type Response = crate::response::BrowserMockRes; type Response = crate::response::BrowserMockRes;
fn spawn( fn spawn(
_: impl Future<Output = ()> + Send + 'static, _: impl Future<Output = ()> + Send + 'static,
) -> Result<(), E> { ) -> Result<(), Error> {
unreachable!() unreachable!()
} }
} }

View File

@@ -38,9 +38,12 @@ impl From<(HttpRequest, Payload)> for ActixRequest {
} }
} }
impl<E> Req<E> for ActixRequest impl<Error, InputStreamError, OutputStreamError>
Req<Error, InputStreamError, OutputStreamError> for ActixRequest
where where
E: FromServerFnError + Send, Error: FromServerFnError + Send,
InputStreamError: FromServerFnError + Send,
OutputStreamError: FromServerFnError + Send,
{ {
type WebsocketResponse = ActixResponse; type WebsocketResponse = ActixResponse;
@@ -60,7 +63,9 @@ where
self.header("Referer") self.header("Referer")
} }
fn try_into_bytes(self) -> impl Future<Output = Result<Bytes, E>> + Send { fn try_into_bytes(
self,
) -> impl Future<Output = Result<Bytes, Error>> + Send {
// Actix is going to keep this on a single thread anyway so it's fine to wrap it // Actix is going to keep this on a single thread anyway so it's fine to wrap it
// with SendWrapper, which makes it `Send` but will panic if it moves to another thread // with SendWrapper, which makes it `Send` but will panic if it moves to another thread
SendWrapper::new(async move { SendWrapper::new(async move {
@@ -72,18 +77,20 @@ where
}) })
} }
fn try_into_string(self) -> impl Future<Output = Result<String, E>> + Send { fn try_into_string(
self,
) -> impl Future<Output = Result<String, Error>> + Send {
// Actix is going to keep this on a single thread anyway so it's fine to wrap it // Actix is going to keep this on a single thread anyway so it's fine to wrap it
// with SendWrapper, which makes it `Send` but will panic if it moves to another thread // with SendWrapper, which makes it `Send` but will panic if it moves to another thread
SendWrapper::new(async move { SendWrapper::new(async move {
let payload = self.0.take().1; let payload = self.0.take().1;
let bytes = payload.to_bytes().await.map_err(|e| { let bytes = payload.to_bytes().await.map_err(|e| {
E::from_server_fn_error(ServerFnErrorErr::Deserialization( Error::from_server_fn_error(ServerFnErrorErr::Deserialization(
e.to_string(), e.to_string(),
)) ))
})?; })?;
String::from_utf8(bytes.into()).map_err(|e| { String::from_utf8(bytes.into()).map_err(|e| {
E::from_server_fn_error(ServerFnErrorErr::Deserialization( Error::from_server_fn_error(ServerFnErrorErr::Deserialization(
e.to_string(), e.to_string(),
)) ))
}) })
@@ -92,7 +99,7 @@ where
fn try_into_stream( fn try_into_stream(
self, self,
) -> Result<impl Stream<Item = Result<Bytes, E>> + Send, E> { ) -> Result<impl Stream<Item = Result<Bytes, Error>> + Send, Error> {
let payload = self.0.take().1; let payload = self.0.take().1;
let stream = payload.map(|res| { let stream = payload.map(|res| {
res.map_err(|e| { res.map_err(|e| {
@@ -107,16 +114,16 @@ where
self, self,
) -> Result< ) -> Result<
( (
impl Stream<Item = Result<Bytes, E>> + Send + 'static, impl Stream<Item = Result<Bytes, InputStreamError>> + Send + 'static,
impl futures::Sink<Result<Bytes, E>> + Send + 'static, impl futures::Sink<Result<Bytes, OutputStreamError>> + Send + 'static,
Self::WebsocketResponse, Self::WebsocketResponse,
), ),
E, Error,
> { > {
let (request, payload) = self.0.take(); let (request, payload) = self.0.take();
let (response, mut session, mut msg_stream) = let (response, mut session, mut msg_stream) =
actix_ws::handle(&request, payload).map_err(|e| { actix_ws::handle(&request, payload).map_err(|e| {
E::from_server_fn_error(ServerFnErrorErr::Request( Error::from_server_fn_error(ServerFnErrorErr::Request(
e.to_string(), e.to_string(),
)) ))
})?; })?;
@@ -124,7 +131,9 @@ where
let (mut response_stream_tx, response_stream_rx) = let (mut response_stream_tx, response_stream_rx) =
futures::channel::mpsc::channel(2048); futures::channel::mpsc::channel(2048);
let (response_sink_tx, mut response_sink_rx) = let (response_sink_tx, mut response_sink_rx) =
futures::channel::mpsc::channel(2048); futures::channel::mpsc::channel::<Result<Bytes, OutputStreamError>>(
2048,
);
actix_web::rt::spawn(async move { actix_web::rt::spawn(async move {
loop { loop {
@@ -136,11 +145,11 @@ where
match incoming { match incoming {
Ok(message) => { Ok(message) => {
if let Err(err) = session.binary(message).await { if let Err(err) = session.binary(message).await {
_ = response_stream_tx.start_send(Err(E::from_server_fn_error(ServerFnErrorErr::Request(err.to_string())))); _ = response_stream_tx.start_send(Err(InputStreamError::from_server_fn_error(ServerFnErrorErr::Request(err.to_string()))));
} }
} }
Err(err) => { Err(err) => {
_ = response_stream_tx.start_send(Err(err)); _ = response_stream_tx.start_send(Err(InputStreamError::from_server_fn_error(ServerFnErrorErr::ServerError(err.ser()))));
} }
} }
}, },
@@ -166,7 +175,7 @@ where
Ok(_other) => { Ok(_other) => {
} }
Err(e) => { Err(e) => {
_ = response_stream_tx.start_send(Err(E::from_server_fn_error(ServerFnErrorErr::Response(e.to_string())))); _ = response_stream_tx.start_send(Err(InputStreamError::from_server_fn_error(ServerFnErrorErr::Response(e.to_string()))));
} }
} }
} }

View File

@@ -14,9 +14,12 @@ use http::{
use http_body_util::BodyExt; use http_body_util::BodyExt;
use std::borrow::Cow; use std::borrow::Cow;
impl<E> Req<E> for Request<Body> impl<Error, InputStreamError, OutputStreamError>
Req<Error, InputStreamError, OutputStreamError> for Request<Body>
where where
E: FromServerFnError + Send, Error: FromServerFnError + Send,
InputStreamError: FromServerFnError + Send,
OutputStreamError: FromServerFnError + Send,
{ {
type WebsocketResponse = Response; type WebsocketResponse = Response;
@@ -42,7 +45,7 @@ where
.map(|h| String::from_utf8_lossy(h.as_bytes())) .map(|h| String::from_utf8_lossy(h.as_bytes()))
} }
async fn try_into_bytes(self) -> Result<Bytes, E> { async fn try_into_bytes(self) -> Result<Bytes, Error> {
let (_parts, body) = self.into_parts(); let (_parts, body) = self.into_parts();
body.collect().await.map(|c| c.to_bytes()).map_err(|e| { body.collect().await.map(|c| c.to_bytes()).map_err(|e| {
@@ -50,8 +53,8 @@ where
}) })
} }
async fn try_into_string(self) -> Result<String, E> { async fn try_into_string(self) -> Result<String, Error> {
let bytes = self.try_into_bytes().await?; let bytes = Req::<Error>::try_into_bytes(self).await?;
String::from_utf8(bytes.to_vec()).map_err(|e| { String::from_utf8(bytes.to_vec()).map_err(|e| {
ServerFnErrorErr::Deserialization(e.to_string()).into_app_error() ServerFnErrorErr::Deserialization(e.to_string()).into_app_error()
}) })
@@ -59,7 +62,8 @@ where
fn try_into_stream( fn try_into_stream(
self, self,
) -> Result<impl Stream<Item = Result<Bytes, E>> + Send + 'static, E> { ) -> Result<impl Stream<Item = Result<Bytes, Error>> + Send + 'static, Error>
{
Ok(self.into_body().into_data_stream().map(|chunk| { Ok(self.into_body().into_data_stream().map(|chunk| {
chunk.map_err(|e| { chunk.map_err(|e| {
ServerFnErrorErr::Deserialization(e.to_string()) ServerFnErrorErr::Deserialization(e.to_string())
@@ -72,22 +76,24 @@ where
self, self,
) -> Result< ) -> Result<
( (
impl Stream<Item = Result<Bytes, E>> + Send + 'static, impl Stream<Item = Result<Bytes, InputStreamError>> + Send + 'static,
impl Sink<Result<Bytes, E>> + Send + 'static, impl Sink<Result<Bytes, OutputStreamError>> + Send + 'static,
Self::WebsocketResponse, Self::WebsocketResponse,
), ),
E, Error,
> { > {
#[cfg(not(feature = "axum"))] #[cfg(not(feature = "axum"))]
{ {
Err::< Err::<
( (
futures::stream::Once<std::future::Ready<Result<Bytes, E>>>, futures::stream::Once<
futures::sink::Drain<Result<Bytes, E>>, std::future::Ready<Result<Bytes, InputStreamError>>,
>,
futures::sink::Drain<Result<Bytes, OutputStreamError>>,
Self::WebsocketResponse, Self::WebsocketResponse,
), ),
_, Error,
>(E::from_server_fn_error( >(Error::from_server_fn_error(
crate::ServerFnErrorErr::Response( crate::ServerFnErrorErr::Response(
"Websocket connections not supported for Axum when the \ "Websocket connections not supported for Axum when the \
`axum` feature is not enabled on the `server_fn` crate." `axum` feature is not enabled on the `server_fn` crate."
@@ -104,19 +110,21 @@ where
axum::extract::ws::WebSocketUpgrade::from_request(self, &()) axum::extract::ws::WebSocketUpgrade::from_request(self, &())
.await .await
.map_err(|err| { .map_err(|err| {
E::from_server_fn_error(ServerFnErrorErr::Request( Error::from_server_fn_error(ServerFnErrorErr::Request(
err.to_string(), err.to_string(),
)) ))
})?; })?;
let (mut outgoing_tx, outgoing_rx) = let (mut outgoing_tx, outgoing_rx) =
futures::channel::mpsc::channel(2048); futures::channel::mpsc::channel(2048);
let (incoming_tx, mut incoming_rx) = let (incoming_tx, mut incoming_rx) =
futures::channel::mpsc::channel::<Result<Bytes, E>>(2048); futures::channel::mpsc::channel::<
Result<Bytes, OutputStreamError>,
>(2048);
let response = upgrade let response = upgrade
.on_failed_upgrade({ .on_failed_upgrade({
let mut outgoing_tx = outgoing_tx.clone(); let mut outgoing_tx = outgoing_tx.clone();
move |err: axum::Error| { move |err: axum::Error| {
_ = outgoing_tx.start_send(Err(E::from_server_fn_error(ServerFnErrorErr::Response(err.to_string())))); _ = outgoing_tx.start_send(Err(InputStreamError::from_server_fn_error(ServerFnErrorErr::Response(err.to_string()))));
} }
}) })
.on_upgrade(|mut session| async move { .on_upgrade(|mut session| async move {
@@ -129,11 +137,11 @@ where
match incoming { match incoming {
Ok(message) => { Ok(message) => {
if let Err(err) = session.send(Message::Binary(message)).await { if let Err(err) = session.send(Message::Binary(message)).await {
_ = outgoing_tx.start_send(Err(E::from_server_fn_error(ServerFnErrorErr::Request(err.to_string())))); _ = outgoing_tx.start_send(Err(InputStreamError::from_server_fn_error(ServerFnErrorErr::Request(err.to_string()))));
} }
} }
Err(err) => { Err(err) => {
_ = outgoing_tx.start_send(Err(err)); _ = outgoing_tx.start_send(Err(InputStreamError::from_server_fn_error(ServerFnErrorErr::ServerError(err.ser()))));
} }
} }
}, },
@@ -153,7 +161,7 @@ where
} }
Ok(_other) => {} Ok(_other) => {}
Err(e) => { Err(e) => {
_ = outgoing_tx.start_send(Err(E::from_server_fn_error(ServerFnErrorErr::Response(e.to_string())))); _ = outgoing_tx.start_send(Err(InputStreamError::from_server_fn_error(ServerFnErrorErr::Response(e.to_string()))));
} }
} }
} }

View File

@@ -24,17 +24,20 @@ use futures::{
use http::{Request, Response}; use http::{Request, Response};
use std::borrow::Cow; use std::borrow::Cow;
impl<E> Req<E> for Request<Bytes> impl<Error, InputStreamError, OutputStreamError>
Req<Error, InputStreamError, OutputStreamError> for Request<Bytes>
where where
E: FromServerFnError + Send, Error: FromServerFnError + Send,
InputStreamError: FromServerFnError + Send,
OutputStreamError: FromServerFnError + Send,
{ {
type WebsocketResponse = Response<Bytes>; type WebsocketResponse = Response<Bytes>;
async fn try_into_bytes(self) -> Result<Bytes, E> { async fn try_into_bytes(self) -> Result<Bytes, Error> {
Ok(self.into_body()) Ok(self.into_body())
} }
async fn try_into_string(self) -> Result<String, E> { async fn try_into_string(self) -> Result<String, Error> {
String::from_utf8(self.into_body().into()).map_err(|err| { String::from_utf8(self.into_body().into()).map_err(|err| {
ServerFnErrorErr::Deserialization(err.to_string()).into_app_error() ServerFnErrorErr::Deserialization(err.to_string()).into_app_error()
}) })
@@ -42,7 +45,8 @@ where
fn try_into_stream( fn try_into_stream(
self, self,
) -> Result<impl Stream<Item = Result<Bytes, E>> + Send + 'static, E> { ) -> Result<impl Stream<Item = Result<Bytes, Error>> + Send + 'static, Error>
{
Ok(stream::iter(self.into_body()) Ok(stream::iter(self.into_body())
.ready_chunks(16) .ready_chunks(16)
.map(|chunk| Ok(Bytes::from(chunk)))) .map(|chunk| Ok(Bytes::from(chunk))))
@@ -74,21 +78,25 @@ where
self, self,
) -> Result< ) -> Result<
( (
impl Stream<Item = Result<Bytes, E>> + Send + 'static, impl Stream<Item = Result<Bytes, InputStreamError>> + Send + 'static,
impl Sink<Result<Bytes, E>> + Send + 'static, impl Sink<Result<Bytes, OutputStreamError>> + Send + 'static,
Self::WebsocketResponse, Self::WebsocketResponse,
), ),
E, Error,
> { > {
Err::< Err::<
( (
futures::stream::Once<std::future::Ready<Result<Bytes, E>>>, futures::stream::Once<
futures::sink::Drain<Result<Bytes, E>>, std::future::Ready<Result<Bytes, InputStreamError>>,
>,
futures::sink::Drain<Result<Bytes, OutputStreamError>>,
Self::WebsocketResponse, Self::WebsocketResponse,
), ),
_, _,
>(E::from_server_fn_error(crate::ServerFnErrorErr::Response( >(Error::from_server_fn_error(
crate::ServerFnErrorErr::Response(
"Websockets are not supported on this platform.".to_string(), "Websockets are not supported on this platform.".to_string(),
))) ),
))
} }
} }

View File

@@ -318,7 +318,7 @@ where
} }
/// Represents the request as received by the server. /// Represents the request as received by the server.
pub trait Req<E> pub trait Req<Error, InputStreamError = Error, OutputStreamError = Error>
where where
Self: Sized, Self: Sized,
{ {
@@ -338,15 +338,19 @@ where
fn referer(&self) -> Option<Cow<'_, str>>; fn referer(&self) -> Option<Cow<'_, str>>;
/// Attempts to extract the body of the request into [`Bytes`]. /// Attempts to extract the body of the request into [`Bytes`].
fn try_into_bytes(self) -> impl Future<Output = Result<Bytes, E>> + Send; fn try_into_bytes(
self,
) -> impl Future<Output = Result<Bytes, Error>> + Send;
/// Attempts to convert the body of the request into a string. /// Attempts to convert the body of the request into a string.
fn try_into_string(self) -> impl Future<Output = Result<String, E>> + Send; fn try_into_string(
self,
) -> impl Future<Output = Result<String, Error>> + Send;
/// Attempts to convert the body of the request into a stream of bytes. /// Attempts to convert the body of the request into a stream of bytes.
fn try_into_stream( fn try_into_stream(
self, self,
) -> Result<impl Stream<Item = Result<Bytes, E>> + Send + 'static, E>; ) -> Result<impl Stream<Item = Result<Bytes, Error>> + Send + 'static, Error>;
/// Attempts to convert the body of the request into a websocket handle. /// Attempts to convert the body of the request into a websocket handle.
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
@@ -355,11 +359,11 @@ where
) -> impl Future< ) -> impl Future<
Output = Result< Output = Result<
( (
impl Stream<Item = Result<Bytes, E>> + Send + 'static, impl Stream<Item = Result<Bytes, InputStreamError>> + Send + 'static,
impl Sink<Result<Bytes, E>> + Send + 'static, impl Sink<Result<Bytes, OutputStreamError>> + Send + 'static,
Self::WebsocketResponse, Self::WebsocketResponse,
), ),
E, Error,
>, >,
> + Send; > + Send;
} }
@@ -368,7 +372,13 @@ where
/// when compiling for the browser. /// when compiling for the browser.
pub struct BrowserMockReq; pub struct BrowserMockReq;
impl<E: Send + 'static> Req<E> for BrowserMockReq { impl<Error, InputStreamError, OutputStreamError>
Req<Error, InputStreamError, OutputStreamError> for BrowserMockReq
where
Error: Send + 'static,
InputStreamError: Send + 'static,
OutputStreamError: Send + 'static,
{
type WebsocketResponse = crate::response::BrowserMockRes; type WebsocketResponse = crate::response::BrowserMockRes;
fn as_query(&self) -> Option<&str> { fn as_query(&self) -> Option<&str> {
@@ -386,17 +396,17 @@ impl<E: Send + 'static> Req<E> for BrowserMockReq {
fn referer(&self) -> Option<Cow<'_, str>> { fn referer(&self) -> Option<Cow<'_, str>> {
unreachable!() unreachable!()
} }
async fn try_into_bytes(self) -> Result<Bytes, E> { async fn try_into_bytes(self) -> Result<Bytes, Error> {
unreachable!() unreachable!()
} }
async fn try_into_string(self) -> Result<String, E> { async fn try_into_string(self) -> Result<String, Error> {
unreachable!() unreachable!()
} }
fn try_into_stream( fn try_into_stream(
self, self,
) -> Result<impl Stream<Item = Result<Bytes, E>> + Send, E> { ) -> Result<impl Stream<Item = Result<Bytes, Error>> + Send, Error> {
Ok(futures::stream::once(async { unreachable!() })) Ok(futures::stream::once(async { unreachable!() }))
} }
@@ -404,17 +414,19 @@ impl<E: Send + 'static> Req<E> for BrowserMockReq {
self, self,
) -> Result< ) -> Result<
( (
impl Stream<Item = Result<Bytes, E>> + Send + 'static, impl Stream<Item = Result<Bytes, InputStreamError>> + Send + 'static,
impl Sink<Result<Bytes, E>> + Send + 'static, impl Sink<Result<Bytes, OutputStreamError>> + Send + 'static,
Self::WebsocketResponse, Self::WebsocketResponse,
), ),
E, Error,
> { > {
#[allow(unreachable_code)] #[allow(unreachable_code)]
Err::< Err::<
( (
futures::stream::Once<std::future::Ready<Result<Bytes, E>>>, futures::stream::Once<
futures::sink::Drain<Result<Bytes, E>>, std::future::Ready<Result<Bytes, InputStreamError>>,
>,
futures::sink::Drain<Result<Bytes, OutputStreamError>>,
Self::WebsocketResponse, Self::WebsocketResponse,
), ),
_, _,

View File

@@ -10,15 +10,21 @@ use std::future::Future;
/// This trait is implemented for any server backend for server functions including /// This trait is implemented for any server backend for server functions including
/// `axum` and `actix-web`. It should almost never be necessary to implement it /// `axum` and `actix-web`. It should almost never be necessary to implement it
/// yourself, unless youre trying to use an alternative HTTP server. /// yourself, unless youre trying to use an alternative HTTP server.
pub trait Server<E> { pub trait Server<Error, InputStreamError = Error, OutputStreamError = Error> {
/// The type of the HTTP request when received by the server function on the server side. /// The type of the HTTP request when received by the server function on the server side.
type Request: Req<E, WebsocketResponse = Self::Response> + Send + 'static; type Request: Req<
Error,
InputStreamError,
OutputStreamError,
WebsocketResponse = Self::Response,
> + Send
+ 'static;
/// The type of the HTTP response returned by the server function on the server side. /// The type of the HTTP response returned by the server function on the server side.
type Response: Res + TryRes<E> + Send + 'static; type Response: Res + TryRes<Error> + Send + 'static;
/// Spawn an async task on the server. /// Spawn an async task on the server.
fn spawn( fn spawn(
future: impl Future<Output = ()> + Send + 'static, future: impl Future<Output = ()> + Send + 'static,
) -> Result<(), E>; ) -> Result<(), Error>;
} }

View File

@@ -9,8 +9,13 @@ error[E0277]: the trait bound `InvalidError: FromServerFnError` is not satisfied
note: required by a bound in `server_fn::ServerFn::Client` note: required by a bound in `server_fn::ServerFn::Client`
--> src/lib.rs --> src/lib.rs
| |
| type Client: Client<Self::Error>; | type Client: Client<
| ^^^^^^^^^^^^^^^^^^^ required by this bound in `ServerFn::Client` | __________________^
| | Self::Error,
| | Self::InputStreamError,
| | Self::OutputStreamError,
| | >;
| |_____^ required by this bound in `ServerFn::Client`
= note: this error originates in the attribute macro `server` (in Nightly builds, run with -Z macro-backtrace for more info) = note: this error originates in the attribute macro `server` (in Nightly builds, run with -Z macro-backtrace for more info)
error[E0277]: the trait bound `InvalidError: FromServerFnError` is not satisfied error[E0277]: the trait bound `InvalidError: FromServerFnError` is not satisfied
@@ -30,8 +35,8 @@ note: required by a bound in `server_fn::ServerFn::Protocol`
| | Self, | | Self,
| | Self::Output, | | Self::Output,
| | Self::Client, | | Self::Client,
| | Self::Server, ... |
| | Self::Error, | | Self::OutputStreamError,
| | >; | | >;
| |_____^ required by this bound in `ServerFn::Protocol` | |_____^ required by this bound in `ServerFn::Protocol`
= note: this error originates in the attribute macro `server` (in Nightly builds, run with -Z macro-backtrace for more info) = note: this error originates in the attribute macro `server` (in Nightly builds, run with -Z macro-backtrace for more info)
@@ -49,3 +54,31 @@ note: required by a bound in `server_fn::ServerFn::Error`
| type Error: FromServerFnError + Send + Sync; | type Error: FromServerFnError + Send + Sync;
| ^^^^^^^^^^^^^^^^^ required by this bound in `ServerFn::Error` | ^^^^^^^^^^^^^^^^^ required by this bound in `ServerFn::Error`
= note: this error originates in the attribute macro `server` (in Nightly builds, run with -Z macro-backtrace for more info) = note: this error originates in the attribute macro `server` (in Nightly builds, run with -Z macro-backtrace for more info)
error[E0277]: the trait bound `InvalidError: FromServerFnError` is not satisfied
--> tests/invalid/aliased_return_full.rs:11:1
|
11 | #[server]
| ^^^^^^^^^ the trait `FromServerFnError` is not implemented for `InvalidError`
|
= help: the trait `FromServerFnError` is implemented for `ServerFnError<CustErr>`
note: required by a bound in `server_fn::ServerFn::InputStreamError`
--> src/lib.rs
|
| type InputStreamError: FromServerFnError + Send + Sync;
| ^^^^^^^^^^^^^^^^^ required by this bound in `ServerFn::InputStreamError`
= note: this error originates in the attribute macro `server` (in Nightly builds, run with -Z macro-backtrace for more info)
error[E0277]: the trait bound `InvalidError: FromServerFnError` is not satisfied
--> tests/invalid/aliased_return_full.rs:11:1
|
11 | #[server]
| ^^^^^^^^^ the trait `FromServerFnError` is not implemented for `InvalidError`
|
= help: the trait `FromServerFnError` is implemented for `ServerFnError<CustErr>`
note: required by a bound in `server_fn::ServerFn::OutputStreamError`
--> src/lib.rs
|
| type OutputStreamError: FromServerFnError + Send + Sync;
| ^^^^^^^^^^^^^^^^^ required by this bound in `ServerFn::OutputStreamError`
= note: this error originates in the attribute macro `server` (in Nightly builds, run with -Z macro-backtrace for more info)

View File

@@ -9,8 +9,13 @@ error[E0277]: the trait bound `InvalidError: FromServerFnError` is not satisfied
note: required by a bound in `server_fn::ServerFn::Client` note: required by a bound in `server_fn::ServerFn::Client`
--> src/lib.rs --> src/lib.rs
| |
| type Client: Client<Self::Error>; | type Client: Client<
| ^^^^^^^^^^^^^^^^^^^ required by this bound in `ServerFn::Client` | __________________^
| | Self::Error,
| | Self::InputStreamError,
| | Self::OutputStreamError,
| | >;
| |_____^ required by this bound in `ServerFn::Client`
= note: this error originates in the attribute macro `server` (in Nightly builds, run with -Z macro-backtrace for more info) = note: this error originates in the attribute macro `server` (in Nightly builds, run with -Z macro-backtrace for more info)
error[E0277]: the trait bound `InvalidError: FromServerFnError` is not satisfied error[E0277]: the trait bound `InvalidError: FromServerFnError` is not satisfied
@@ -30,8 +35,8 @@ note: required by a bound in `server_fn::ServerFn::Protocol`
| | Self, | | Self,
| | Self::Output, | | Self::Output,
| | Self::Client, | | Self::Client,
| | Self::Server, ... |
| | Self::Error, | | Self::OutputStreamError,
| | >; | | >;
| |_____^ required by this bound in `ServerFn::Protocol` | |_____^ required by this bound in `ServerFn::Protocol`
= note: this error originates in the attribute macro `server` (in Nightly builds, run with -Z macro-backtrace for more info) = note: this error originates in the attribute macro `server` (in Nightly builds, run with -Z macro-backtrace for more info)
@@ -48,3 +53,29 @@ note: required by a bound in `server_fn::ServerFn::Error`
| |
| type Error: FromServerFnError + Send + Sync; | type Error: FromServerFnError + Send + Sync;
| ^^^^^^^^^^^^^^^^^ required by this bound in `ServerFn::Error` | ^^^^^^^^^^^^^^^^^ required by this bound in `ServerFn::Error`
error[E0277]: the trait bound `InvalidError: FromServerFnError` is not satisfied
--> tests/invalid/aliased_return_none.rs:10:50
|
10 | pub async fn no_alias_result() -> Result<String, InvalidError> {
| ^^^^^^^^^^^^ the trait `FromServerFnError` is not implemented for `InvalidError`
|
= help: the trait `FromServerFnError` is implemented for `ServerFnError<CustErr>`
note: required by a bound in `server_fn::ServerFn::InputStreamError`
--> src/lib.rs
|
| type InputStreamError: FromServerFnError + Send + Sync;
| ^^^^^^^^^^^^^^^^^ required by this bound in `ServerFn::InputStreamError`
error[E0277]: the trait bound `InvalidError: FromServerFnError` is not satisfied
--> tests/invalid/aliased_return_none.rs:10:50
|
10 | pub async fn no_alias_result() -> Result<String, InvalidError> {
| ^^^^^^^^^^^^ the trait `FromServerFnError` is not implemented for `InvalidError`
|
= help: the trait `FromServerFnError` is implemented for `ServerFnError<CustErr>`
note: required by a bound in `server_fn::ServerFn::OutputStreamError`
--> src/lib.rs
|
| type OutputStreamError: FromServerFnError + Send + Sync;
| ^^^^^^^^^^^^^^^^^ required by this bound in `ServerFn::OutputStreamError`

View File

@@ -9,8 +9,13 @@ error[E0277]: the trait bound `InvalidError: FromServerFnError` is not satisfied
note: required by a bound in `server_fn::ServerFn::Client` note: required by a bound in `server_fn::ServerFn::Client`
--> src/lib.rs --> src/lib.rs
| |
| type Client: Client<Self::Error>; | type Client: Client<
| ^^^^^^^^^^^^^^^^^^^ required by this bound in `ServerFn::Client` | __________________^
| | Self::Error,
| | Self::InputStreamError,
| | Self::OutputStreamError,
| | >;
| |_____^ required by this bound in `ServerFn::Client`
= note: this error originates in the attribute macro `server` (in Nightly builds, run with -Z macro-backtrace for more info) = note: this error originates in the attribute macro `server` (in Nightly builds, run with -Z macro-backtrace for more info)
error[E0277]: the trait bound `InvalidError: FromServerFnError` is not satisfied error[E0277]: the trait bound `InvalidError: FromServerFnError` is not satisfied
@@ -30,8 +35,8 @@ note: required by a bound in `server_fn::ServerFn::Protocol`
| | Self, | | Self,
| | Self::Output, | | Self::Output,
| | Self::Client, | | Self::Client,
| | Self::Server, ... |
| | Self::Error, | | Self::OutputStreamError,
| | >; | | >;
| |_____^ required by this bound in `ServerFn::Protocol` | |_____^ required by this bound in `ServerFn::Protocol`
= note: this error originates in the attribute macro `server` (in Nightly builds, run with -Z macro-backtrace for more info) = note: this error originates in the attribute macro `server` (in Nightly builds, run with -Z macro-backtrace for more info)
@@ -49,3 +54,31 @@ note: required by a bound in `server_fn::ServerFn::Error`
| type Error: FromServerFnError + Send + Sync; | type Error: FromServerFnError + Send + Sync;
| ^^^^^^^^^^^^^^^^^ required by this bound in `ServerFn::Error` | ^^^^^^^^^^^^^^^^^ required by this bound in `ServerFn::Error`
= note: this error originates in the attribute macro `server` (in Nightly builds, run with -Z macro-backtrace for more info) = note: this error originates in the attribute macro `server` (in Nightly builds, run with -Z macro-backtrace for more info)
error[E0277]: the trait bound `InvalidError: FromServerFnError` is not satisfied
--> tests/invalid/aliased_return_part.rs:11:1
|
11 | #[server]
| ^^^^^^^^^ the trait `FromServerFnError` is not implemented for `InvalidError`
|
= help: the trait `FromServerFnError` is implemented for `ServerFnError<CustErr>`
note: required by a bound in `server_fn::ServerFn::InputStreamError`
--> src/lib.rs
|
| type InputStreamError: FromServerFnError + Send + Sync;
| ^^^^^^^^^^^^^^^^^ required by this bound in `ServerFn::InputStreamError`
= note: this error originates in the attribute macro `server` (in Nightly builds, run with -Z macro-backtrace for more info)
error[E0277]: the trait bound `InvalidError: FromServerFnError` is not satisfied
--> tests/invalid/aliased_return_part.rs:11:1
|
11 | #[server]
| ^^^^^^^^^ the trait `FromServerFnError` is not implemented for `InvalidError`
|
= help: the trait `FromServerFnError` is implemented for `ServerFnError<CustErr>`
note: required by a bound in `server_fn::ServerFn::OutputStreamError`
--> src/lib.rs
|
| type OutputStreamError: FromServerFnError + Send + Sync;
| ^^^^^^^^^^^^^^^^^ required by this bound in `ServerFn::OutputStreamError`
= note: this error originates in the attribute macro `server` (in Nightly builds, run with -Z macro-backtrace for more info)

View File

@@ -570,6 +570,24 @@ impl ServerFnCall {
}, },
ToTokens::to_token_stream, ToTokens::to_token_stream,
); );
let error_ws_in_ty = if self.websocket_protocol() {
self.body
.error_ws_in_ty
.as_ref()
.map(ToTokens::to_token_stream)
.unwrap_or(error_ty.clone())
} else {
error_ty.clone()
};
let error_ws_out_ty = if self.websocket_protocol() {
self.body
.error_ws_out_ty
.as_ref()
.map(ToTokens::to_token_stream)
.unwrap_or(error_ty.clone())
} else {
error_ty.clone()
};
let field_names = self.field_names(); let field_names = self.field_names();
// run_body in the trait implementation // run_body in the trait implementation
@@ -645,6 +663,8 @@ impl ServerFnCall {
type Protocol = #protocol; type Protocol = #protocol;
type Output = #output_ty; type Output = #output_ty;
type Error = #error_ty; type Error = #error_ty;
type InputStreamError = #error_ws_in_ty;
type OutputStreamError = #error_ws_out_ty;
fn middlewares() -> Vec<std::sync::Arc<dyn #server_fn_path::middleware::Layer<<Self::Server as #server_fn_path::server::Server<Self::Error>>::Request, <Self::Server as #server_fn_path::server::Server<Self::Error>>::Response>>> { fn middlewares() -> Vec<std::sync::Arc<dyn #server_fn_path::middleware::Layer<<Self::Server as #server_fn_path::server::Server<Self::Error>>::Request, <Self::Server as #server_fn_path::server::Server<Self::Error>>::Response>>> {
#middlewares #middlewares
@@ -925,6 +945,60 @@ fn err_type(return_ty: &Type) -> Option<&Type> {
None None
} }
fn err_ws_in_type(
inputs: &Punctuated<ServerFnArg, syn::token::Comma>,
) -> Option<Type> {
inputs.into_iter().find_map(|pat| {
if let syn::Type::Path(ref pat) = *pat.arg.ty {
if pat.path.segments[0].ident != "BoxedStream" {
return None;
}
if let PathArguments::AngleBracketed(args) =
&pat.path.segments[0].arguments
{
// BoxedStream<T>
if args.args.len() == 1 {
return None;
}
// BoxedStream<T, E>
else if let GenericArgument::Type(ty) = &args.args[1] {
return Some(ty.clone());
}
};
};
None
})
}
fn err_ws_out_type(output_ty: &Option<Type>) -> Result<Option<Type>> {
if let Some(syn::Type::Path(ref pat)) = output_ty {
if pat.path.segments[0].ident == "BoxedStream" {
if let PathArguments::AngleBracketed(args) =
&pat.path.segments[0].arguments
{
// BoxedStream<T>
if args.args.len() == 1 {
return Ok(None);
}
// BoxedStream<T, E>
else if let GenericArgument::Type(ty) = &args.args[1] {
return Ok(Some(ty.clone()));
}
return Err(syn::Error::new(
output_ty.span(),
"websocket server functions should return \
BoxedStream<Result<T, E>> where E: FromServerFnError",
));
};
}
};
Ok(None)
}
/// The arguments to the `server` macro. /// The arguments to the `server` macro.
#[derive(Debug)] #[derive(Debug)]
#[non_exhaustive] #[non_exhaustive]
@@ -1375,6 +1449,10 @@ pub struct ServerFnBody {
pub output_ty: Option<syn::Type>, pub output_ty: Option<syn::Type>,
/// The error output type of the server function. /// The error output type of the server function.
pub error_ty: Option<syn::Type>, pub error_ty: Option<syn::Type>,
/// The error type of WebSocket client-sent error
pub error_ws_in_ty: Option<syn::Type>,
/// The error type of WebSocket server-sent error
pub error_ws_out_ty: Option<syn::Type>,
/// The body of the server function. /// The body of the server function.
pub block: TokenStream2, pub block: TokenStream2,
/// The documentation of the server function. /// The documentation of the server function.
@@ -1404,6 +1482,8 @@ impl Parse for ServerFnBody {
let return_ty = input.parse()?; let return_ty = input.parse()?;
let output_ty = output_type(&return_ty).cloned(); let output_ty = output_type(&return_ty).cloned();
let error_ty = err_type(&return_ty).cloned(); let error_ty = err_type(&return_ty).cloned();
let error_ws_in_ty = err_ws_in_type(&inputs);
let error_ws_out_ty = err_ws_out_type(&output_ty)?;
let block = input.parse()?; let block = input.parse()?;
@@ -1461,6 +1541,8 @@ impl Parse for ServerFnBody {
return_ty, return_ty,
output_ty, output_ty,
error_ty, error_ty,
error_ws_in_ty,
error_ws_out_ty,
block, block,
attrs, attrs,
docs, docs,