Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: impl get_lookahead, minor refactoring #35

Merged
merged 2 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions registry/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ pub(crate) trait SyncTransaction {
pub(crate) trait RegistryDb: Clone + Send + Sync + 'static {
type SyncTransaction: SyncTransaction + Send;

/// Begin a new sync transaction. A sync transaction groups database mutations together in a
/// single atomic operation.
/// Begin a new sync transaction.
/// A sync transaction groups database mutations together in a single atomic operation.
async fn begin_sync(&self) -> DbResult<Self::SyncTransaction>;

/// Register validators in the database.
Expand Down
73 changes: 8 additions & 65 deletions registry/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
//! Entrypoint.
//! Entrypoint for the registry server binary.

use client::BeaconClient;
use tokio_stream::StreamExt;
use tracing::{error, info};
use url::Url;
use eyre::bail;
use tracing::{info, warn};

mod api;
use api::{
actions::{Action, ActionStream},
spec::RegistryError,
ApiConfig, RegistryApi,
};

mod client;

mod db;
use db::{InMemoryDb, RegistryDb, SQLDb};
use db::{InMemoryDb, SQLDb};

mod primitives;

Expand All @@ -40,7 +38,7 @@ async fn main() -> eyre::Result<()> {
let (srv, actions) = RegistryApi::new(ApiConfig::default());

if let Err(e) = srv.spawn().await {
error!("Failed to start API server: {}", e);
bail!("Failed to start API server: {}", e);
}

// Initialize the registry with the specified database backend.
Expand All @@ -50,70 +48,15 @@ async fn main() -> eyre::Result<()> {
if let Some(ref db_url) = config.db_url {
info!("Using PostgreSQL database backend");
let db = SQLDb::new(db_url).await?;
let registry = Registry::new(config, db, beacon);

handle_actions(actions, registry).await;
Registry::new(config, db, beacon).handle_actions(actions).await;
} else {
info!("Using In-memory database backend");
let db = InMemoryDb::default();
let registry = Registry::new(config, db, beacon);

handle_actions(actions, registry).await;
Registry::new(config, db, beacon).handle_actions(actions).await;
}

warn!("Action stream closed, shutting down...");
Ok(())
}

/// Handle incoming actions from the API server and update the registry.
async fn handle_actions<Db>(mut actions: ActionStream, mut registry: Registry<Db>)
where
Db: RegistryDb,
{
while let Some(action) = actions.next().await {
match action {
Action::Register { registration, response } => {
let res = registry.register_validators(registration).await;
let _ = response.send(res);
}
Action::Deregister { deregistration, response } => {
let res = registry.deregister_validators(deregistration).await;
let _ = response.send(res);
}
Action::GetRegistrations { response } => {
let res = registry.list_registrations().await;
let _ = response.send(res);
}
Action::GetValidators { response } => {
let res = registry.list_validators().await;
let _ = response.send(res);
}
Action::GetValidatorsByPubkeys { pubkeys, response } => {
let res = registry.get_validators_by_pubkey(&pubkeys).await;
let _ = response.send(res);
}
Action::GetValidatorsByIndices { indices, response } => {
let res = registry.get_validators_by_index(indices).await;
let _ = response.send(res);
}
Action::GetValidatorByPubkey { pubkey, response } => {
let res = registry.get_validators_by_pubkey(&[pubkey]).await;
let first_validator_res = res.map(|mut v| v.pop()).transpose();
let _ = response.send(first_validator_res.unwrap_or(Err(RegistryError::NotFound)));
}
Action::GetOperator { signer, response } => {
let res = registry.get_operators_by_signer(&[signer]).await;
let first_operator_res = res.map(|mut o| o.pop()).transpose();
let _ = response.send(first_operator_res.unwrap_or(Err(RegistryError::NotFound)));
}
Action::GetOperators { response } => {
let res = registry.list_operators().await;
let _ = response.send(res);
}
Action::GetLookahead { epoch, response } => {
// TODO: fetch lookahead from beacon node
// let res = registry.get_lookahead(epoch).await;
// let _ = response.send(res);
}
}
}
}
1 change: 1 addition & 0 deletions registry/src/primitives/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ impl BlsPublicKey {
Ok(Self(bls::PublicKey::deserialize(bytes)?))
}

/// Converts the BLS public key to an [`ethereum_consensus::crypto::PublicKey`].
pub(crate) fn to_consensus(&self) -> PublicKey {
PublicKey::try_from(self.0.compress().serialize().as_ref()).unwrap()
}
Expand Down
100 changes: 99 additions & 1 deletion registry/src/registry.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::HashMap;

use alloy::primitives::Address;
use tokio_stream::StreamExt;
use tracing::info;

use crate::{
Expand All @@ -9,11 +10,15 @@ use crate::{
client::BeaconClient,
db::RegistryDb,
primitives::{
registry::{DeregistrationBatch, Operator, Registration, RegistrationBatch, RegistryEntry},
registry::{
DeregistrationBatch, Lookahead, Operator, Registration, RegistrationBatch,
RegistryEntry,
},
BlsPublicKey,
},
sources::kapi::KeysApi,
sync::{SyncHandle, Syncer},
Action, ActionStream,
};

/// The main registry object.
Expand All @@ -31,6 +36,7 @@ impl<Db> Registry<Db>
where
Db: RegistryDb,
{
/// Create a new registry instance.
pub(crate) fn new(config: Config, db: Db, beacon: BeaconClient) -> Self {
let kapi = KeysApi::new(&config.keys_api_url);
// TODO: add health check for the keys API before proceeding
Expand All @@ -45,6 +51,59 @@ where
Self { db, beacon, sync: handle }
}

/// Handle incoming actions from the API server and update the registry.
///
/// This method will execute until the action stream is closed.
pub(crate) async fn handle_actions(mut self, mut actions: ActionStream) {
while let Some(action) = actions.next().await {
match action {
Action::Register { registration, response } => {
let res = self.register_validators(registration).await;
response.send(res).ok();
}
Action::Deregister { deregistration, response } => {
let res = self.deregister_validators(deregistration).await;
response.send(res).ok();
}
Action::GetRegistrations { response } => {
let res = self.list_registrations().await;
response.send(res).ok();
}
Action::GetValidators { response } => {
let res = self.list_validators().await;
response.send(res).ok();
}
Action::GetValidatorsByPubkeys { pubkeys, response } => {
let res = self.get_validators_by_pubkey(&pubkeys).await;
response.send(res).ok();
}
Action::GetValidatorsByIndices { indices, response } => {
let res = self.get_validators_by_index(indices).await;
response.send(res).ok();
}
Action::GetValidatorByPubkey { pubkey, response } => {
let res = self.get_validators_by_pubkey(&[pubkey]).await;
let first_validator_res = res.map(|mut v| v.pop()).transpose();
response.send(first_validator_res.unwrap_or(Err(RegistryError::NotFound))).ok();
}
Action::GetOperator { signer, response } => {
let res = self.get_operators_by_signer(&[signer]).await;
let first_operator_res = res.map(|mut o| o.pop()).transpose();
response.send(first_operator_res.unwrap_or(Err(RegistryError::NotFound))).ok();
}
Action::GetOperators { response } => {
let res = self.list_operators().await;
response.send(res).ok();
}
Action::GetLookahead { epoch, response } => {
let res = self.get_lookahead(epoch).await;
response.send(res).ok();
}
}
}
}

/// Register validators in the registry.
pub(crate) async fn register_validators(
&mut self,
registration: RegistrationBatch,
Expand Down Expand Up @@ -84,6 +143,7 @@ where
Ok(())
}

/// Deregister validators from the registry.
pub(crate) async fn deregister_validators(
&mut self,
deregistration: DeregistrationBatch,
Expand All @@ -98,11 +158,13 @@ where
Ok(())
}

/// List all registrations in the registry.
pub(crate) async fn list_registrations(&mut self) -> Result<Vec<Registration>, RegistryError> {
self.sync.wait_for_sync().await;
Ok(self.db.list_registrations().await?)
}

/// Get registrations by validator public key.
pub(crate) async fn get_registrations_by_pubkey(
&mut self,
pubkeys: &[BlsPublicKey],
Expand All @@ -111,11 +173,13 @@ where
Ok(self.db.get_registrations_by_pubkey(pubkeys).await?)
}

/// List all validators in the registry.
pub(crate) async fn list_validators(&mut self) -> Result<Vec<RegistryEntry>, RegistryError> {
self.sync.wait_for_sync().await;
Ok(self.db.list_validators().await?)
}

/// Get validators by validator public key.
pub(crate) async fn get_validators_by_pubkey(
&mut self,
pubkeys: &[BlsPublicKey],
Expand All @@ -124,6 +188,7 @@ where
Ok(self.db.get_validators_by_pubkey(pubkeys).await?)
}

/// Get validators by validator index.
pub(crate) async fn get_validators_by_index(
&mut self,
indices: Vec<u64>,
Expand All @@ -132,16 +197,49 @@ where
Ok(self.db.get_validators_by_index(indices).await?)
}

/// List all operators in the registry.
pub(crate) async fn list_operators(&mut self) -> Result<Vec<Operator>, RegistryError> {
self.sync.wait_for_sync().await;
Ok(self.db.list_operators().await?)
}

/// Get operators by signer.
pub(crate) async fn get_operators_by_signer(
&mut self,
signers: &[Address],
) -> Result<Vec<Operator>, RegistryError> {
self.sync.wait_for_sync().await;
Ok(self.db.get_operators_by_signer(signers).await?)
}

/// Get the active validators that will propose in the given epoch
/// that are also registered in the registry.
pub(crate) async fn get_lookahead(&mut self, epoch: u64) -> Result<Lookahead, RegistryError> {
// 1. fetch the proposer duties from the beacon node
let proposer_duties = self.beacon.get_lookahead(epoch, false).await?;
let proposer_pubkeys = proposer_duties
.iter()
.map(|d| BlsPublicKey::from_bytes(&d.public_key).expect("valid BLS pubkey"))
.collect::<Vec<_>>();

// 2. fetch the registry entries from the database
self.sync.wait_for_sync().await;
let registry_entries = self.db.get_validators_by_pubkey(&proposer_pubkeys).await?;

// 3. map registry entries to their proposal slot. example result:
//
// 10936976: { validator_pubkey: 0x1234, operator: 0x5678, gas_limit: 1000000, rpc_endpoint: https://rpc.example.com }
// 10936977: { validator_pubkey: 0x9214, operator: 0x5678, gas_limit: 1000000, rpc_endpoint: https://rpc.example.com }
// 10936978: { validator_pubkey: 0x1983, operator: 0x5678, gas_limit: 1000000, rpc_endpoint: https://rpc.example.com }
let mut lookahead = HashMap::new();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit but can we create a type alias in primitives, to be used in openAPI spec?

type Lookahead = HashMap<u64, Entry>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's already like that, I guess I could use Lookahead::new() instead to make it more clear though

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done 116d8f1

for duty in proposer_duties {
let bls_pubkey = BlsPublicKey::from_bytes(&duty.public_key).expect("valid BLS pubkey");
if let Some(entry) = registry_entries.iter().find(|e| e.validator_pubkey == bls_pubkey)
{
lookahead.insert(duty.slot, entry.clone());
}
}

Ok(lookahead)
}
}
Loading