Skip to content

Commit 0549ecb

Browse files
committed
Add initial SSE support
1 parent dd596a8 commit 0549ecb

File tree

7 files changed

+173
-1
lines changed

7 files changed

+173
-1
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ docs = ["unstable"]
2929
unstable = []
3030

3131
[dependencies]
32+
async-sse = "2.1.0"
3233
http-types = "1.0.1"
3334
http-service = "0.5.0"
3435
http-service-h1 = { version = "0.1.0", optional = true }

examples/sse.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
use tide::sse;
2+
3+
#[async_std::main]
4+
async fn main() -> Result<(), std::io::Error> {
5+
let mut app = tide::new();
6+
app.at("/sse").get(sse::endpoint(|_req, sender| async move {
7+
sender.send("fruit", "banana").await;
8+
sender.send("fruit", "apple").await;
9+
Ok(())
10+
}));
11+
app.listen("localhost:8080").await?;
12+
Ok(())
13+
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ pub mod prelude;
196196
pub use endpoint::Endpoint;
197197
pub use redirect::redirect;
198198
pub use request::Request;
199+
pub mod sse;
199200

200201
#[doc(inline)]
201202
pub use http_types::{Error, Result, Status};

src/response/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ pub(crate) enum CookieEvent {
2323
/// An HTTP response
2424
#[derive(Debug)]
2525
pub struct Response {
26-
res: http_service::Response,
26+
pub(crate) res: http_service::Response,
2727
// tracking here
2828
pub(crate) cookie_events: Vec<CookieEvent>,
2929
}

src/sse/endpoint.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
use crate::http::{mime, Body, StatusCode};
2+
use crate::sse::Sender;
3+
use crate::utils::BoxFuture;
4+
use crate::{Endpoint, Request, Response, Result};
5+
6+
use async_std::future::Future;
7+
use async_std::io::BufReader;
8+
use async_std::task;
9+
10+
use std::marker::PhantomData;
11+
use std::sync::Arc;
12+
13+
/// Create an endpoint that can handle SSE connections.
14+
pub fn endpoint<F, Fut, State>(handler: F) -> SseEndpoint<F, Fut, State>
15+
where
16+
State: Send + Sync + 'static,
17+
F: Fn(Request<State>, Sender) -> Fut + Send + Sync + 'static,
18+
Fut: Future<Output = Result<()>> + Send + Sync + 'static,
19+
{
20+
SseEndpoint {
21+
handler: Arc::new(handler),
22+
__state: PhantomData,
23+
__fut: PhantomData,
24+
}
25+
}
26+
27+
/// An endpoint that can handle SSE connections.
28+
#[derive(Debug)]
29+
pub struct SseEndpoint<F, Fut, State>
30+
where
31+
State: Send + Sync + 'static,
32+
F: Fn(Request<State>, Sender) -> Fut + Send + Sync + 'static,
33+
Fut: Future<Output = Result<()>> + Send + Sync + 'static,
34+
{
35+
handler: Arc<F>,
36+
__state: PhantomData<State>,
37+
__fut: PhantomData<Fut>,
38+
}
39+
40+
impl<F, Fut, State> Endpoint<State> for SseEndpoint<F, Fut, State>
41+
where
42+
State: Send + Sync + 'static,
43+
F: Fn(Request<State>, Sender) -> Fut + Send + Sync + 'static,
44+
Fut: Future<Output = Result<()>> + Send + Sync + 'static,
45+
{
46+
fn call<'a>(&'a self, req: Request<State>) -> BoxFuture<'a, Result<Response>> {
47+
let handler = self.handler.clone();
48+
Box::pin(async move {
49+
let (sender, encoder) = async_sse::encode();
50+
task::spawn(async move {
51+
let sender = Sender::new(sender);
52+
if let Err(err) = handler(req, sender).await {
53+
log::error!("SSE handler error: {:?}", err);
54+
}
55+
});
56+
57+
// Perform the handshake as described here:
58+
// https://html.spec.whatwg.org/multipage/server-sent-events.html#sse-processing-model
59+
let mut res = Response::new(StatusCode::Ok);
60+
res.res.insert_header("Cache-Control", "no-cache").unwrap();
61+
res.res.set_content_type(mime::SSE);
62+
63+
let body = Body::from_reader(BufReader::new(encoder), None);
64+
res.set_body(body);
65+
66+
Ok(res)
67+
})
68+
}
69+
}

src/sse/mod.rs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
//! Server-Sent Events (SSE) types.
2+
//!
3+
//! # Errors
4+
//!
5+
//! Errors originating in the SSE handler will be logged. Errors originating
6+
//! during the encoding of the SSE stream will be handled by the backend engine
7+
//! the way any other IO error is handled.
8+
//!
9+
//! In the future we may introduce a better mechanism to handle errors that
10+
//! originate outside of regular endpoints.
11+
//!
12+
//! # Examples
13+
//!
14+
//! ```no_run
15+
//! # fn main() -> Result<(), std::io::Error> { async_std::task::block_on(async {
16+
//! #
17+
//! use tide::sse;
18+
//!
19+
//! let mut app = tide::new();
20+
//! app.at("/sse").get(sse::endpoint(|_req, sender| async move {
21+
//! sender.send("fruit", "banana").await;
22+
//! sender.send("fruit", "apple").await;
23+
//! Ok(())
24+
//! }));
25+
//! app.listen("localhost:8080").await?;
26+
//! # Ok(()) }) }
27+
//! ```
28+
29+
mod endpoint;
30+
mod upgrade;
31+
32+
pub use endpoint::{endpoint, SseEndpoint};
33+
pub use upgrade::upgrade;
34+
35+
/// An SSE message sender.
36+
#[derive(Debug)]
37+
pub struct Sender {
38+
sender: async_sse::Sender,
39+
}
40+
41+
impl Sender {
42+
/// Create a new instance of `Sender`.
43+
fn new(sender: async_sse::Sender) -> Self {
44+
Self { sender }
45+
}
46+
47+
/// Send data from the SSE channel.
48+
///
49+
/// Each message constists of a "name" and "data".
50+
pub async fn send(&self, name: &str, data: impl AsRef<[u8]>) {
51+
self.sender.send(name, data.as_ref(), None).await;
52+
}
53+
}

src/sse/upgrade.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
use crate::http::{mime, Body, StatusCode};
2+
use crate::{Request, Response, Result};
3+
4+
use super::Sender;
5+
6+
use async_std::future::Future;
7+
use async_std::io::BufReader;
8+
use async_std::task;
9+
10+
/// Upgrade an existing HTTP connection to an SSE connection.
11+
pub fn upgrade<F, Fut, State>(req: Request<State>, handler: F) -> Response
12+
where
13+
State: Send + Sync + 'static,
14+
F: Fn(Request<State>, Sender) -> Fut + Send + Sync + 'static,
15+
Fut: Future<Output = Result<()>> + Send + Sync + 'static,
16+
{
17+
let (sender, encoder) = async_sse::encode();
18+
task::spawn(async move {
19+
let sender = Sender::new(sender);
20+
if let Err(err) = handler(req, sender).await {
21+
log::error!("SSE handler error: {:?}", err);
22+
}
23+
});
24+
25+
// Perform the handshake as described here:
26+
// https://html.spec.whatwg.org/multipage/server-sent-events.html#sse-processing-model
27+
let mut res = Response::new(StatusCode::Ok);
28+
res.res.insert_header("Cache-Control", "no-cache").unwrap();
29+
res.res.set_content_type(mime::SSE);
30+
31+
let body = Body::from_reader(BufReader::new(encoder), None);
32+
res.set_body(body);
33+
34+
res
35+
}

0 commit comments

Comments
 (0)