From 2662dc7f8fba71a5682f8906a6f6a71374757c23 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Wed, 18 Dec 2024 05:35:58 +0530 Subject: [PATCH] Fix Sse client api (#6685) * Use reqwest eventsource for get_events api * await for Event::Open before returning stream * fmt * Merge branch 'unstable' into sse-client-fix * Ignore lint --- Cargo.lock | 28 ++++++++++++ beacon_node/beacon_chain/tests/store_tests.rs | 1 + common/eth2/Cargo.toml | 1 + common/eth2/src/lib.rs | 43 ++++++++++++++----- common/eth2/src/types.rs | 21 +-------- 5 files changed, 65 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2978a3a19f6..c62e9fbc878 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2576,6 +2576,7 @@ dependencies = [ "proto_array", "psutil", "reqwest", + "reqwest-eventsource", "sensitive_url", "serde", "serde_json", @@ -2977,6 +2978,17 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "eventsource-stream" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74fef4569247a5f429d9156b9d0a2599914385dd189c539334c625d8099d90ab" +dependencies = [ + "futures-core", + "nom", + "pin-project-lite", +] + [[package]] name = "execution_engine_integration" version = "0.1.0" @@ -7179,6 +7191,22 @@ dependencies = [ "winreg", ] +[[package]] +name = "reqwest-eventsource" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f529a5ff327743addc322af460761dff5b50e0c826b9e6ac44c3195c50bb2026" +dependencies = [ + "eventsource-stream", + "futures-core", + "futures-timer", + "mime", + "nom", + "pin-project-lite", + "reqwest", + "thiserror 1.0.69", +] + [[package]] name = "resolv-conf" version = "0.7.0" diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 73805a8525d..e1258ccdea7 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -2796,6 +2796,7 @@ async fn finalizes_after_resuming_from_db() { ); } +#[allow(clippy::large_stack_frames)] #[tokio::test] async fn revert_minority_fork_on_resume() { let validator_count = 16; diff --git a/common/eth2/Cargo.toml b/common/eth2/Cargo.toml index f735b4c6888..912051da36d 100644 --- a/common/eth2/Cargo.toml +++ b/common/eth2/Cargo.toml @@ -27,6 +27,7 @@ slashing_protection = { workspace = true } mediatype = "0.19.13" pretty_reqwest_error = { workspace = true } derivative = { workspace = true } +reqwest-eventsource = "0.5.0" [dev-dependencies] tokio = { workspace = true } diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 522c6414eae..12b1538984e 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -27,6 +27,7 @@ use reqwest::{ Body, IntoUrl, RequestBuilder, Response, }; pub use reqwest::{StatusCode, Url}; +use reqwest_eventsource::{Event, EventSource}; pub use sensitive_url::{SensitiveError, SensitiveUrl}; use serde::{de::DeserializeOwned, Serialize}; use ssz::Encode; @@ -52,6 +53,8 @@ pub const SSZ_CONTENT_TYPE_HEADER: &str = "application/octet-stream"; pub enum Error { /// The `reqwest` client raised an error. HttpClient(PrettyReqwestError), + /// The `reqwest_eventsource` client raised an error. + SseClient(reqwest_eventsource::Error), /// The server returned an error message where the body was able to be parsed. ServerMessage(ErrorMessage), /// The server returned an error message with an array of errors. @@ -93,6 +96,13 @@ impl Error { pub fn status(&self) -> Option { match self { Error::HttpClient(error) => error.inner().status(), + Error::SseClient(error) => { + if let reqwest_eventsource::Error::InvalidStatusCode(status, _) = error { + Some(*status) + } else { + None + } + } Error::ServerMessage(msg) => StatusCode::try_from(msg.code).ok(), Error::ServerIndexedMessage(msg) => StatusCode::try_from(msg.code).ok(), Error::StatusCode(status) => Some(*status), @@ -2592,16 +2602,29 @@ impl BeaconNodeHttpClient { .join(","); path.query_pairs_mut().append_pair("topics", &topic_string); - Ok(self - .client - .get(path) - .send() - .await? - .bytes_stream() - .map(|next| match next { - Ok(bytes) => EventKind::from_sse_bytes(bytes.as_ref()), - Err(e) => Err(Error::HttpClient(e.into())), - })) + let mut es = EventSource::get(path); + // If we don't await `Event::Open` here, then the consumer + // will not get any Message events until they start awaiting the stream. + // This is a way to register the stream with the sse server before + // message events start getting emitted. + while let Some(event) = es.next().await { + match event { + Ok(Event::Open) => break, + Err(err) => return Err(Error::SseClient(err)), + // This should never happen as we are guaranteed to get the + // Open event before any message starts coming through. + Ok(Event::Message(_)) => continue, + } + } + Ok(Box::pin(es.filter_map(|event| async move { + match event { + Ok(Event::Open) => None, + Ok(Event::Message(message)) => { + Some(EventKind::from_sse_bytes(&message.event, &message.data)) + } + Err(err) => Some(Err(Error::SseClient(err))), + } + }))) } /// `POST validator/duties/sync/{epoch}` diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index c187399ebd7..a303953a863 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -13,7 +13,7 @@ use serde_json::Value; use ssz::{Decode, DecodeError}; use ssz_derive::{Decode, Encode}; use std::fmt::{self, Display}; -use std::str::{from_utf8, FromStr}; +use std::str::FromStr; use std::sync::Arc; use std::time::Duration; use types::beacon_block_body::KzgCommitments; @@ -1153,24 +1153,7 @@ impl EventKind { } } - pub fn from_sse_bytes(message: &[u8]) -> Result { - let s = from_utf8(message) - .map_err(|e| ServerError::InvalidServerSentEvent(format!("{:?}", e)))?; - - let mut split = s.split('\n'); - let event = split - .next() - .ok_or_else(|| { - ServerError::InvalidServerSentEvent("Could not parse event tag".to_string()) - })? - .trim_start_matches("event:"); - let data = split - .next() - .ok_or_else(|| { - ServerError::InvalidServerSentEvent("Could not parse data tag".to_string()) - })? - .trim_start_matches("data:"); - + pub fn from_sse_bytes(event: &str, data: &str) -> Result { match event { "attestation" => Ok(EventKind::Attestation(serde_json::from_str(data).map_err( |e| ServerError::InvalidServerSentEvent(format!("Attestation: {:?}", e)),