From f2bc2085c5f04c0e3af2679ac5baa136ab71bedb Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Thu, 28 Nov 2024 20:44:27 +0200 Subject: [PATCH] fix: ids are strings --- Cargo.lock | 136 +++++++++++++++++++++++++++++++++++--- russe/Cargo.toml | 9 ++- russe/examples/manager.rs | 37 +++++++++++ russe/src/decoder.rs | 8 +-- russe/src/message.rs | 6 +- russe/src/reqwest_0_12.rs | 78 +++++++++++++++++++++- 6 files changed, 252 insertions(+), 22 deletions(-) create mode 100644 russe/examples/manager.rs diff --git a/Cargo.lock b/Cargo.lock index 1da9eac2..4809ae30 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -396,13 +396,19 @@ dependencies = [ [[package]] name = "addr2line" -version = "0.24.2" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1" +checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb" dependencies = [ "gimli", ] +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + [[package]] name = "adler2" version = "2.0.0" @@ -638,17 +644,17 @@ dependencies = [ [[package]] name = "backtrace" -version = "0.3.74" +version = "0.3.71" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a" +checksum = "26b05800d2e817c8b3b4b54abd461726265fa9789ae34330622f2db9ee696f9d" dependencies = [ "addr2line", + "cc", "cfg-if", "libc", - "miniz_oxide", + "miniz_oxide 0.7.4", "object", "rustc-demangle", - "windows-targets", ] [[package]] @@ -818,6 +824,33 @@ dependencies = [ "cc", ] +[[package]] +name = "color-eyre" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55146f5e46f237f7423d74111267d4597b59b0dad0ffaf7303bce9945d843ad5" +dependencies = [ + "backtrace", + "color-spantrace", + "eyre", + "indenter", + "once_cell", + "owo-colors", + "tracing-error", +] + +[[package]] +name = "color-spantrace" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd6be1b2a7e382e2b98b43b2adcca6bb0e465af0bdd38123873ae61eb17a72c2" +dependencies = [ + "once_cell", + "owo-colors", + "tracing-core", + "tracing-error", +] + [[package]] name = "colorchoice" version = "1.0.3" @@ -1104,6 +1137,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "eyre" +version = "0.6.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cd915d99f24784cdc19fd37ef22b97e3ff0ae756c7e492e9fbfe897d61e2aec" +dependencies = [ + "indenter", + "once_cell", +] + [[package]] name = "fastrand" version = "2.2.0" @@ -1123,7 +1166,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c936bfdafb507ebbf50b8074c54fa31c5be9a1e7e5f467dd659697041407d07c" dependencies = [ "crc32fast", - "miniz_oxide", + "miniz_oxide 0.8.0", ] [[package]] @@ -1286,9 +1329,9 @@ dependencies = [ [[package]] name = "gimli" -version = "0.31.1" +version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" +checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" [[package]] name = "glob" @@ -1674,6 +1717,12 @@ version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aae21c3177a27788957044151cc2800043d127acaa460a47ebb9b84dfa2c6aa0" +[[package]] +name = "indenter" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce23b50ad8242c51a442f3ff322d56b02f08852c77e4c0b4d3fd684abc89c683" + [[package]] name = "indexmap" version = "2.6.0" @@ -1894,6 +1943,15 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" +[[package]] +name = "miniz_oxide" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8a240ddb74feaf34a79a7add65a741f3167852fba007066dcac1ca548d89c08" +dependencies = [ + "adler", +] + [[package]] name = "miniz_oxide" version = "0.8.0" @@ -1960,9 +2018,9 @@ dependencies = [ [[package]] name = "object" -version = "0.36.5" +version = "0.32.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aedf0a2d09c573ed1d8d85b30c119153926a2b36dce0ab28322c09a117a4683e" +checksum = "a6a622008b6e321afc04970976f62ee297fdbaa6f95318ca343e3eebb9648441" dependencies = [ "memchr", ] @@ -2017,6 +2075,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "owo-colors" +version = "3.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f" + [[package]] name = "parking" version = "2.2.1" @@ -2312,6 +2376,8 @@ dependencies = [ "aho-corasick", "bytes", "bytestring", + "color-eyre", + "eyre", "futures-test", "futures-util", "indoc", @@ -2319,6 +2385,7 @@ dependencies = [ "mime", "reqwest", "tokio", + "tokio-stream", "tokio-test", "tokio-util", "tracing", @@ -2574,6 +2641,15 @@ dependencies = [ "keccak", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.3.0" @@ -2757,6 +2833,16 @@ dependencies = [ "syn", ] +[[package]] +name = "thread_local" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" +dependencies = [ + "cfg-if", + "once_cell", +] + [[package]] name = "time" version = "0.3.36" @@ -2969,6 +3055,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-error" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d686ec1c0f384b1277f097b2f279a2ecc11afe8c133c1aabf036a27cb4cd206e" +dependencies = [ + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" +dependencies = [ + "sharded-slab", + "thread_local", + "tracing-core", ] [[package]] @@ -3057,6 +3165,12 @@ version = "0.15.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4e8257fbc510f0a46eb602c10215901938b5c2a7d5e70fc11483b1d3c9b5b18c" +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "vcpkg" version = "0.2.15" diff --git a/russe/Cargo.toml b/russe/Cargo.toml index 583b639f..971aedf1 100644 --- a/russe/Cargo.toml +++ b/russe/Cargo.toml @@ -26,16 +26,23 @@ futures-util = "0.3.18" memchr = "2" mime = { version = "0.3.17", optional = true } reqwest-0_12 = { package = "reqwest", version = "0.12", optional = true, features = ["stream"] } -tokio = "1" +tokio = { version = "1", features = ["sync"] } tokio-util = { version = "0.7", features = ["codec"] } tracing = "0.1.30" [dev-dependencies] futures-test = "0.3" +eyre = "0.6" +color-eyre = "0.6" indoc = "2" tokio = { version = "1.28.2", features = ["macros"] } tokio-test = "0.4" +tokio-stream = "0.1" tokio-util = { version = "0.7", features = ["codec", "io", "rt"] } +[[examples]] +name = "manager" +required-features = ["reqwest-0_12"] + [lints] workspace = true diff --git a/russe/examples/manager.rs b/russe/examples/manager.rs new file mode 100644 index 00000000..fc6c42be --- /dev/null +++ b/russe/examples/manager.rs @@ -0,0 +1,37 @@ +//! Demonstrates usage of the SSE connection manager. + +extern crate reqwest_0_12 as reqwest; + +use futures_util::StreamExt as _; +use reqwest::{Method, Request}; +use russe::reqwest_0_12::Manager; +use tokio_stream::wrappers::UnboundedReceiverStream; + +#[tokio::main(flavor = "current_thread")] +async fn main() -> eyre::Result<()> { + color_eyre::install()?; + + let client = reqwest::Client::default(); + + let mut req = Request::new(Method::GET, "https://sse.dev/test".parse().unwrap()); + let headers = req.headers_mut(); + headers.insert("accept", russe::MEDIA_TYPE_STR.parse().unwrap()); + + let mut manager = Manager::new(&client, req); + + let (_task_handle, events) = manager.send().await.unwrap(); + + let mut event_stream = UnboundedReceiverStream::new(events); + + while let Some(Ok(ev)) = event_stream.next().await { + println!("{ev:?}"); + + if let russe::Event::Message(msg) = ev { + if let Some(id) = msg.id { + manager.commit_id(id); + } + } + } + + Ok(()) +} diff --git a/russe/src/decoder.rs b/russe/src/decoder.rs index 550dc256..68eeec29 100644 --- a/russe/src/decoder.rs +++ b/russe/src/decoder.rs @@ -100,7 +100,7 @@ impl tokio_util::codec::Decoder for Decoder { // id 2 | 3 => { - message.id = Some(input.parse().expect("ID should be an integer")); + message.id = Some(input.to_owned()); message_event = true; } @@ -170,7 +170,7 @@ mod tests { retry: 999 data: msg5 specifies new retry - id: 43 + id: 43a event: msg data: msg6 is named @@ -215,7 +215,7 @@ mod tests { assert_eq!( Event::Message(Message { data: "msg4 with an ID".into(), - id: Some(42), + id: Some("42".to_owned()), ..Default::default() }), ev, @@ -225,7 +225,7 @@ mod tests { assert_eq!( Event::Message(Message { data: "msg5 specifies new retry".into(), - id: Some(43), + id: Some("43a".to_owned()), retry: Some(Duration::from_millis(999)), event: None, }), diff --git a/russe/src/message.rs b/russe/src/message.rs index 3760fb9d..4e15716b 100644 --- a/russe/src/message.rs +++ b/russe/src/message.rs @@ -17,9 +17,9 @@ pub struct Message { /// Event identifier. /// /// Used in Last-Event-ID header. - // TODO: not always a number - // see https://github.com/whatwg/html/issues/7363 - pub id: Option, + /// + /// See . + pub id: Option, } #[cfg(test)] diff --git a/russe/src/reqwest_0_12.rs b/russe/src/reqwest_0_12.rs index cd08f617..89e3e3b1 100644 --- a/russe/src/reqwest_0_12.rs +++ b/russe/src/reqwest_0_12.rs @@ -2,14 +2,21 @@ use std::io; -use futures_util::{stream::BoxStream, TryStreamExt as _}; +use futures_util::{stream::BoxStream, StreamExt as _, TryStreamExt as _}; +use reqwest_0_12::{Client, Request, Response}; +use tokio::{ + sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, + task::JoinHandle, +}; use tokio_util::{codec::FramedRead, io::StreamReader}; use crate::{Decoder, Error, Event}; mod sealed { + use super::*; + pub trait Sealed {} - impl Sealed for reqwest_0_12::Response {} + impl Sealed for Response {} } /// SSE extension methods for `reqwest` v0.12. @@ -18,7 +25,7 @@ pub trait ReqwestExt: sealed::Sealed { fn sse_stream(self) -> BoxStream<'static, Result>; } -impl ReqwestExt for reqwest_0_12::Response { +impl ReqwestExt for Response { fn sse_stream(self) -> BoxStream<'static, Result> { let body_stream = self.bytes_stream().map_err(io::Error::other); let body_reader = StreamReader::new(body_stream); @@ -28,3 +35,68 @@ impl ReqwestExt for reqwest_0_12::Response { Box::pin(frame_reader) } } + +/// An SSE request manager which tracks latest IDs and automatically reconnects. +#[derive(Debug)] +pub struct Manager { + client: Client, + req: Request, + last_event_id: Option, + tx: UnboundedSender>, + rx: Option>>, +} + +impl Manager { + /// Constructs new SSE request manager. + /// + /// No attempts are made to validate or modify the given request. + /// + /// # Panics + /// + /// Panics if request is given a stream body. + pub fn new(client: &Client, req: Request) -> Self { + let (tx, rx) = unbounded_channel(); + + let req = req.try_clone().expect("Request should be clone-able"); + + Self { + client: client.clone(), + req, + last_event_id: None, + tx, + rx: Some(rx), + } + } + + /// Sends request, starts connection management, and returns stream of events. + /// + /// # Panics + /// + /// Panics if called more than once. + pub async fn send( + &mut self, + ) -> Result<(JoinHandle<()>, UnboundedReceiver>), Error> { + let client = self.client.clone(); + let req = self.req.try_clone().unwrap(); + let tx = self.tx.clone(); + + let task_handle = tokio::spawn(async move { + let mut stream = client.execute(req).await.unwrap().sse_stream(); + + while let Some(ev) = stream.next().await { + let _ = tx.send(ev); + } + }); + + Ok((task_handle, self.rx.take().unwrap())) + } + + /// Commits an event ID for this manager. + /// + /// The given ID will be used as the `Last-Event-Id` header in case of reconnects. + pub fn commit_id(&mut self, id: String) { + self.last_event_id = Some(id); + } +} + +// - optionally read id from stream and set automatically