Skip to content

Commit

Permalink
Fetch blobs from EL via engine_getBlobsV1:
Browse files Browse the repository at this point in the history
- Add 'engine_exchangeCapabilities' request to be able to tell if EL serves blobs.
- Fetch capabilities on startup and on every epoch start.
- Rewrite `eth1_api::Endpoints` not to use mutexes.
  • Loading branch information
Tumas committed Jan 14, 2025
1 parent b69014a commit 047520e
Show file tree
Hide file tree
Showing 22 changed files with 599 additions and 255 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions eth1_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ workspace = true

[dependencies]
anyhow = { workspace = true }
arc-swap = { workspace = true }
bls = { workspace = true }
dedicated_executor = { workspace = true }
derive_more = { workspace = true }
either = { workspace = true }
enum-iterator = { workspace = true }
Expand All @@ -18,8 +20,10 @@ features = { workspace = true }
fork_choice_control = { workspace = true }
fs-err = { workspace = true }
futures = { workspace = true }
helper_functions = { workspace = true }
hex = { workspace = true }
hex-literal = { workspace = true }
itertools = { workspace = true }
jwt-simple = { workspace = true }
log = { workspace = true }
memoffset = { workspace = true }
Expand Down
224 changes: 112 additions & 112 deletions eth1_api/src/endpoints.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,41 @@
use core::sync::atomic::{AtomicBool, Ordering};
use std::{collections::HashSet, sync::Arc};

use arc_swap::ArcSwap;
use derive_more::Debug;
use itertools::Itertools as _;
use types::redacting_url::RedactingUrl;

#[derive(Clone, Copy, Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub enum EndpointStatus {
Online,
Offline,
}
const ORDERING: Ordering = Ordering::SeqCst;

impl EndpointStatus {
const fn is_offline(self) -> bool {
matches!(self, Self::Offline)
}
}

#[derive(Clone, Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
#[derive(Debug)]
#[expect(clippy::partial_pub_fields)]
pub struct Endpoint {
index: usize,
status: EndpointStatus,
url: RedactingUrl,
is_online: AtomicBool,
pub is_fallback: bool,
pub url: RedactingUrl,
capabilities: ArcSwap<HashSet<String>>,
}

impl Endpoint {
pub const fn url(&self) -> &RedactingUrl {
&self.url
}

pub const fn is_fallback(&self) -> bool {
self.index > 0
pub fn is_online(&self) -> bool {
self.is_online.load(ORDERING)
}

pub fn set_capabilities(&self, capabilities: HashSet<String>) {
self.capabilities.store(Arc::new(capabilities));
}

pub fn set_online_status(&self, is_online: bool) {
self.is_online.store(is_online, ORDERING)
}
}

pub struct Endpoints {
current: usize,
endpoints: Vec<Endpoint>,
}

Expand All @@ -43,64 +45,46 @@ impl Endpoints {
.into_iter()
.enumerate()
.map(|(index, url)| Endpoint {
index,
status: EndpointStatus::Online,
is_online: AtomicBool::new(true),
is_fallback: index > 0,
url,
capabilities: ArcSwap::from_pointee(HashSet::default()),
})
.collect();

Self {
current: 0,
endpoints,
}
Self { endpoints }
}

pub fn el_offline(&self) -> bool {
self.endpoints
.iter()
.all(|endpoint| endpoint.status.is_offline())
}

pub fn current(&self) -> Option<&Endpoint> {
self.endpoints.get(self.current)
self.endpoints.iter().all(|endpoint| !endpoint.is_online())
}

pub fn is_empty(&self) -> bool {
self.endpoints.is_empty()
}

pub fn peek_next(&self) -> Option<&Endpoint> {
self.endpoints.get(self.next_index())
}

pub fn advance(&mut self) {
self.current = self.next_index();
}

pub fn set_status(&mut self, status: EndpointStatus) {
if let Some(current) = self.current_mut() {
current.status = status;
}
}

pub fn reset(&mut self) {
self.current = 0;
}

const fn next_index(&self) -> usize {
self.current.saturating_add(1)
}

fn current_mut(&mut self) -> Option<&mut Endpoint> {
self.endpoints.get_mut(self.current)
pub fn endpoints_for_request(
&self,
capability: Option<&str>,
) -> impl Iterator<Item = &Endpoint> {
self.endpoints
.iter()
.filter(|endpoint| {
capability
.map(|capability| endpoint.capabilities.load().contains(capability))
.unwrap_or(true)
})
.sorted_by_key(|endpoint| !endpoint.is_online())
}
}

#[cfg(test)]
mod tests {
use std::collections::HashSet;

use anyhow::Result;

use crate::endpoints::{Endpoint, EndpointStatus, Endpoints};
use crate::{endpoints::Endpoints, eth1_api::ENGINE_GET_EL_BLOBS_V1};

#[test]
fn test_empty_endpoints() {
Expand All @@ -109,13 +93,14 @@ mod tests {
assert!(endpoints.is_empty());
assert!(endpoints.el_offline());

assert_eq!(endpoints.current(), None);
assert_eq!(endpoints.peek_next(), None);
let mut endpoints_for_request = endpoints.endpoints_for_request(None);

assert!(endpoints_for_request.next().is_none());
}

#[test]
fn test_endpoints() -> Result<()> {
let mut endpoints = Endpoints::new([
let endpoints = Endpoints::new([
"https://example1.net".parse()?,
"https://example2.net".parse()?,
]);
Expand All @@ -124,74 +109,89 @@ mod tests {
assert!(!endpoints.el_offline(), "initially endpoints are online");

assert_eq!(
endpoints.current().cloned(),
Some(Endpoint {
index: 0,
status: EndpointStatus::Online,
url: "https://example1.net".parse()?,
}),
);

assert_eq!(
endpoints.peek_next().cloned(),
Some(Endpoint {
index: 1,
status: EndpointStatus::Online,
url: "https://example2.net".parse()?,
}),
endpoints
.endpoints_for_request(None)
.map(|endpoint| (endpoint.url().clone(), endpoint.is_online()))
.collect::<Vec<_>>(),
[
("https://example1.net".parse()?, true),
("https://example2.net".parse()?, true),
]
);

endpoints.set_status(EndpointStatus::Offline);
// set first endpoint to be offline
let current_endpoint = endpoints
.endpoints_for_request(None)
.next()
.expect("current endpoint should be present");

assert_eq!(
endpoints.current().map(|endpoint| endpoint.status),
Some(EndpointStatus::Offline),
);
current_endpoint.set_online_status(false);

endpoints.advance();
assert!(!current_endpoint.is_online());
assert!(!endpoints.el_offline());

// check that online endpoint is used for requests
assert_eq!(
endpoints.current().cloned(),
Some(Endpoint {
index: 1,
status: EndpointStatus::Online,
url: "https://example2.net".parse()?,
}),
endpoints
.endpoints_for_request(None)
.map(|endpoint| (endpoint.url().clone(), endpoint.is_online()))
.collect::<Vec<_>>(),
[
("https://example2.net".parse()?, true),
("https://example1.net".parse()?, false),
]
);

assert_eq!(endpoints.peek_next(), None);
assert!(!endpoints.el_offline());
// set the fallback endpoint to be offline
let current_endpoint = endpoints
.endpoints_for_request(None)
.next()
.expect("current endpoint should be present");

endpoints.set_status(EndpointStatus::Offline);
endpoints.advance();
current_endpoint.set_online_status(false);

assert!(!endpoints.is_empty());
assert!(!current_endpoint.is_online());
assert!(endpoints.el_offline());

assert_eq!(endpoints.current(), None);
assert_eq!(endpoints.peek_next(), None);
assert_eq!(
endpoints
.endpoints_for_request(None)
.map(|endpoint| (endpoint.url().clone(), endpoint.is_online()))
.collect::<Vec<_>>(),
[
("https://example1.net".parse()?, false),
("https://example2.net".parse()?, false),
]
);

Ok(())
}

#[test]
fn test_endpoints_with_capabilities() -> Result<()> {
let endpoints = Endpoints::new([
"https://example1.net".parse()?,
"https://example2.net".parse()?,
]);

endpoints.reset();
assert!(endpoints
.endpoints_for_request(Some(ENGINE_GET_EL_BLOBS_V1))
.next()
.is_none());

// offline endpoints are still offline after reset
assert!(endpoints.el_offline());
let current_endpoint = endpoints
.endpoints_for_request(None)
.next()
.expect("current endpoint should be present");

assert_eq!(
endpoints.current().cloned(),
Some(Endpoint {
index: 0,
status: EndpointStatus::Offline,
url: "https://example1.net".parse()?,
}),
);
current_endpoint.set_capabilities(HashSet::from([ENGINE_GET_EL_BLOBS_V1.to_owned()]));

assert_eq!(
endpoints.peek_next().cloned(),
Some(Endpoint {
index: 1,
status: EndpointStatus::Offline,
url: "https://example2.net".parse()?,
}),
endpoints
.endpoints_for_request(Some(ENGINE_GET_EL_BLOBS_V1))
.map(|endpoint| endpoint.url().clone())
.collect::<Vec<_>>(),
["https://example1.net".parse()?]
);

Ok(())
Expand Down
Loading

0 comments on commit 047520e

Please sign in to comment.