diff --git a/Cargo.toml b/Cargo.toml index c21330552..0d770bb17 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ docs = ["unstable"] unstable = [] [dependencies] +async-sse = "2.1.0" http-types = "1.0.1" http-service = "0.5.0" http-service-h1 = { version = "0.1.0", optional = true } diff --git a/examples/sse.rs b/examples/sse.rs new file mode 100644 index 000000000..9e2dcdb23 --- /dev/null +++ b/examples/sse.rs @@ -0,0 +1,13 @@ +use tide::sse; + +#[async_std::main] +async fn main() -> Result<(), std::io::Error> { + let mut app = tide::new(); + app.at("/sse").get(sse::endpoint(|_req, sender| async move { + sender.send("fruit", "banana", None).await; + sender.send("fruit", "apple", None).await; + Ok(()) + })); + app.listen("localhost:8080").await?; + Ok(()) +} diff --git a/src/lib.rs b/src/lib.rs index aa2f10197..2010fdcc2 100755 --- a/src/lib.rs +++ b/src/lib.rs @@ -199,6 +199,7 @@ pub mod security; pub use endpoint::Endpoint; pub use request::Request; +pub mod sse; #[doc(inline)] pub use http_types::{Error, Result, Status}; diff --git a/src/response/mod.rs b/src/response/mod.rs index 9ddfa9ee1..c17872672 100644 --- a/src/response/mod.rs +++ b/src/response/mod.rs @@ -23,7 +23,7 @@ pub(crate) enum CookieEvent { /// An HTTP response #[derive(Debug)] pub struct Response { - res: http_service::Response, + pub(crate) res: http_service::Response, // tracking here pub(crate) cookie_events: Vec, } diff --git a/src/sse/endpoint.rs b/src/sse/endpoint.rs new file mode 100644 index 000000000..62fa5e82d --- /dev/null +++ b/src/sse/endpoint.rs @@ -0,0 +1,70 @@ +use crate::http::{mime, Body, StatusCode}; +use crate::log; +use crate::sse::Sender; +use crate::utils::BoxFuture; +use crate::{Endpoint, Request, Response, Result}; + +use async_std::future::Future; +use async_std::io::BufReader; +use async_std::task; + +use std::marker::PhantomData; +use std::sync::Arc; + +/// Create an endpoint that can handle SSE connections. +pub fn endpoint(handler: F) -> SseEndpoint +where + State: Send + Sync + 'static, + F: Fn(Request, Sender) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + Sync + 'static, +{ + SseEndpoint { + handler: Arc::new(handler), + __state: PhantomData, + __fut: PhantomData, + } +} + +/// An endpoint that can handle SSE connections. +#[derive(Debug)] +pub struct SseEndpoint +where + State: Send + Sync + 'static, + F: Fn(Request, Sender) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + Sync + 'static, +{ + handler: Arc, + __state: PhantomData, + __fut: PhantomData, +} + +impl Endpoint for SseEndpoint +where + State: Send + Sync + 'static, + F: Fn(Request, Sender) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + Sync + 'static, +{ + fn call<'a>(&'a self, req: Request) -> BoxFuture<'a, Result> { + let handler = self.handler.clone(); + Box::pin(async move { + let (sender, encoder) = async_sse::encode(); + task::spawn(async move { + let sender = Sender::new(sender); + if let Err(err) = handler(req, sender).await { + log::error!("SSE handler error: {:?}", err); + } + }); + + // Perform the handshake as described here: + // https://html.spec.whatwg.org/multipage/server-sent-events.html#sse-processing-model + let mut res = Response::new(StatusCode::Ok); + res.res.insert_header("Cache-Control", "no-cache").unwrap(); + res.res.set_content_type(mime::SSE); + + let body = Body::from_reader(BufReader::new(encoder), None); + res.set_body(body); + + Ok(res) + }) + } +} diff --git a/src/sse/mod.rs b/src/sse/mod.rs new file mode 100644 index 000000000..365b4ca45 --- /dev/null +++ b/src/sse/mod.rs @@ -0,0 +1,35 @@ +//! Server-Sent Events (SSE) types. +//! +//! # Errors +//! +//! Errors originating in the SSE handler will be logged. Errors originating +//! during the encoding of the SSE stream will be handled by the backend engine +//! the way any other IO error is handled. +//! +//! In the future we may introduce a better mechanism to handle errors that +//! originate outside of regular endpoints. +//! +//! # Examples +//! +//! ```no_run +//! # fn main() -> Result<(), std::io::Error> { async_std::task::block_on(async { +//! # +//! use tide::sse; +//! +//! let mut app = tide::new(); +//! app.at("/sse").get(sse::endpoint(|_req, sender| async move { +//! sender.send("fruit", "banana", None).await; +//! sender.send("fruit", "apple", None).await; +//! Ok(()) +//! })); +//! app.listen("localhost:8080").await?; +//! # Ok(()) }) } +//! ``` + +mod endpoint; +mod sender; +mod upgrade; + +pub use endpoint::{endpoint, SseEndpoint}; +pub use sender::Sender; +pub use upgrade::upgrade; diff --git a/src/sse/sender.rs b/src/sse/sender.rs new file mode 100644 index 000000000..05659044c --- /dev/null +++ b/src/sse/sender.rs @@ -0,0 +1,19 @@ +/// An SSE message sender. +#[derive(Debug)] +pub struct Sender { + sender: async_sse::Sender, +} + +impl Sender { + /// Create a new instance of `Sender`. + pub(crate) fn new(sender: async_sse::Sender) -> Self { + Self { sender } + } + + /// Send data from the SSE channel. + /// + /// Each message constists of a "name" and "data". + pub async fn send(&self, name: &str, data: impl AsRef, id: Option<&str>) { + self.sender.send(name, data.as_ref().as_bytes(), id).await; + } +} diff --git a/src/sse/upgrade.rs b/src/sse/upgrade.rs new file mode 100644 index 000000000..5e6aecff7 --- /dev/null +++ b/src/sse/upgrade.rs @@ -0,0 +1,36 @@ +use crate::http::{mime, Body, StatusCode}; +use crate::log; +use crate::{Request, Response, Result}; + +use super::Sender; + +use async_std::future::Future; +use async_std::io::BufReader; +use async_std::task; + +/// Upgrade an existing HTTP connection to an SSE connection. +pub fn upgrade(req: Request, handler: F) -> Response +where + State: Send + Sync + 'static, + F: Fn(Request, Sender) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + Sync + 'static, +{ + let (sender, encoder) = async_sse::encode(); + task::spawn(async move { + let sender = Sender::new(sender); + if let Err(err) = handler(req, sender).await { + log::error!("SSE handler error: {:?}", err); + } + }); + + // Perform the handshake as described here: + // https://html.spec.whatwg.org/multipage/server-sent-events.html#sse-processing-model + let mut res = Response::new(StatusCode::Ok); + res.res.insert_header("Cache-Control", "no-cache").unwrap(); + res.res.set_content_type(mime::SSE); + + let body = Body::from_reader(BufReader::new(encoder), None); + res.set_body(body); + + res +}