Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(registry): added get_lookahead api handler, + minor refactor #34

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions registry/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ pub(crate) trait SyncTransaction {
pub(crate) trait RegistryDb: Clone + Send + Sync + 'static {
type SyncTransaction: SyncTransaction + Send;

/// Begin a new sync transaction. A sync transaction groups database mutations together in a
/// single atomic operation.
/// Begin a new sync transaction.
/// A sync transaction groups database mutations together in a single atomic operation.
async fn begin_sync(&self) -> DbResult<Self::SyncTransaction>;

/// Register validators in the database.
Expand Down
73 changes: 8 additions & 65 deletions registry/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
//! Entrypoint.
//! Entrypoint for the registry server binary.

use client::BeaconClient;
use tokio_stream::StreamExt;
use tracing::{error, info};
use url::Url;
use eyre::bail;
use tracing::{info, warn};

mod api;
use api::{
actions::{Action, ActionStream},
spec::RegistryError,
ApiConfig, RegistryApi,
};

mod client;

mod db;
use db::{InMemoryDb, RegistryDb, SQLDb};
use db::{InMemoryDb, SQLDb};

mod primitives;

Expand All @@ -40,7 +38,7 @@ async fn main() -> eyre::Result<()> {
let (srv, actions) = RegistryApi::new(ApiConfig::default());

if let Err(e) = srv.spawn().await {
error!("Failed to start API server: {}", e);
bail!("Failed to start API server: {}", e);
}

// Initialize the registry with the specified database backend.
Expand All @@ -50,70 +48,15 @@ async fn main() -> eyre::Result<()> {
if let Some(ref db_url) = config.db_url {
info!("Using PostgreSQL database backend");
let db = SQLDb::new(db_url).await?;
let registry = Registry::new(config, db, beacon);

handle_actions(actions, registry).await;
Registry::new(config, db, beacon).handle_actions(actions).await;
} else {
info!("Using In-memory database backend");
let db = InMemoryDb::default();
let registry = Registry::new(config, db, beacon);

handle_actions(actions, registry).await;
Registry::new(config, db, beacon).handle_actions(actions).await;
}

warn!("Action stream closed, shutting down...");
Ok(())
}

/// Handle incoming actions from the API server and update the registry.
async fn handle_actions<Db>(mut actions: ActionStream, mut registry: Registry<Db>)
where
Db: RegistryDb,
{
while let Some(action) = actions.next().await {
match action {
Action::Register { registration, response } => {
let res = registry.register_validators(registration).await;
let _ = response.send(res);
}
Action::Deregister { deregistration, response } => {
let res = registry.deregister_validators(deregistration).await;
let _ = response.send(res);
}
Action::GetRegistrations { response } => {
let res = registry.list_registrations().await;
let _ = response.send(res);
}
Action::GetValidators { response } => {
let res = registry.list_validators().await;
let _ = response.send(res);
}
Action::GetValidatorsByPubkeys { pubkeys, response } => {
let res = registry.get_validators_by_pubkey(&pubkeys).await;
let _ = response.send(res);
}
Action::GetValidatorsByIndices { indices, response } => {
let res = registry.get_validators_by_index(indices).await;
let _ = response.send(res);
}
Action::GetValidatorByPubkey { pubkey, response } => {
let res = registry.get_validators_by_pubkey(&[pubkey]).await;
let first_validator_res = res.map(|mut v| v.pop()).transpose();
let _ = response.send(first_validator_res.unwrap_or(Err(RegistryError::NotFound)));
}
Action::GetOperator { signer, response } => {
let res = registry.get_operators_by_signer(&[signer]).await;
let first_operator_res = res.map(|mut o| o.pop()).transpose();
let _ = response.send(first_operator_res.unwrap_or(Err(RegistryError::NotFound)));
}
Action::GetOperators { response } => {
let res = registry.list_operators().await;
let _ = response.send(res);
}
Action::GetLookahead { epoch, response } => {
// TODO: fetch lookahead from beacon node
// let res = registry.get_lookahead(epoch).await;
// let _ = response.send(res);
}
}
}
}
1 change: 1 addition & 0 deletions registry/src/primitives/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ impl BlsPublicKey {
Ok(Self(bls::PublicKey::deserialize(bytes)?))
}

/// Converts the BLS public key to an [`ethereum_consensus::crypto::PublicKey`].
pub(crate) fn to_consensus(&self) -> PublicKey {
PublicKey::try_from(self.0.compress().serialize().as_ref()).unwrap()
}
Expand Down
100 changes: 99 additions & 1 deletion registry/src/registry.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::HashMap;

use alloy::primitives::Address;
use tokio_stream::StreamExt;
use tracing::info;

use crate::{
Expand All @@ -9,11 +10,15 @@ use crate::{
client::BeaconClient,
db::RegistryDb,
primitives::{
registry::{DeregistrationBatch, Operator, Registration, RegistrationBatch, RegistryEntry},
registry::{
DeregistrationBatch, Lookahead, Operator, Registration, RegistrationBatch,
RegistryEntry,
},
BlsPublicKey,
},
sources::kapi::KeysApi,
sync::{SyncHandle, Syncer},
Action, ActionStream,
};

/// The main registry object.
Expand All @@ -31,6 +36,7 @@ impl<Db> Registry<Db>
where
Db: RegistryDb,
{
/// Create a new registry instance.
pub(crate) fn new(config: Config, db: Db, beacon: BeaconClient) -> Self {
let kapi = KeysApi::new(&config.keys_api_url);
// TODO: add health check for the keys API before proceeding
Expand All @@ -45,6 +51,59 @@ where
Self { db, beacon, sync: handle }
}

/// Handle incoming actions from the API server and update the registry.
///
/// This method will execute until the action stream is closed.
pub(crate) async fn handle_actions(mut self, mut actions: ActionStream) {
while let Some(action) = actions.next().await {
match action {
Action::Register { registration, response } => {
let res = self.register_validators(registration).await;
response.send(res).ok();
}
Action::Deregister { deregistration, response } => {
let res = self.deregister_validators(deregistration).await;
response.send(res).ok();
}
Action::GetRegistrations { response } => {
let res = self.list_registrations().await;
response.send(res).ok();
}
Action::GetValidators { response } => {
let res = self.list_validators().await;
response.send(res).ok();
}
Action::GetValidatorsByPubkeys { pubkeys, response } => {
let res = self.get_validators_by_pubkey(&pubkeys).await;
response.send(res).ok();
}
Action::GetValidatorsByIndices { indices, response } => {
let res = self.get_validators_by_index(indices).await;
response.send(res).ok();
}
Action::GetValidatorByPubkey { pubkey, response } => {
let res = self.get_validators_by_pubkey(&[pubkey]).await;
let first_validator_res = res.map(|mut v| v.pop()).transpose();
response.send(first_validator_res.unwrap_or(Err(RegistryError::NotFound))).ok();
}
Action::GetOperator { signer, response } => {
let res = self.get_operators_by_signer(&[signer]).await;
let first_operator_res = res.map(|mut o| o.pop()).transpose();
response.send(first_operator_res.unwrap_or(Err(RegistryError::NotFound))).ok();
}
Action::GetOperators { response } => {
let res = self.list_operators().await;
response.send(res).ok();
}
Action::GetLookahead { epoch, response } => {
let res = self.get_lookahead(epoch).await;
response.send(res).ok();
}
}
}
}

/// Register validators in the registry.
pub(crate) async fn register_validators(
&mut self,
registration: RegistrationBatch,
Expand Down Expand Up @@ -84,6 +143,7 @@ where
Ok(())
}

/// Deregister validators from the registry.
pub(crate) async fn deregister_validators(
&mut self,
deregistration: DeregistrationBatch,
Expand All @@ -98,11 +158,13 @@ where
Ok(())
}

/// List all registrations in the registry.
pub(crate) async fn list_registrations(&mut self) -> Result<Vec<Registration>, RegistryError> {
self.sync.wait_for_sync().await;
Ok(self.db.list_registrations().await?)
}

/// Get registrations by validator public key.
pub(crate) async fn get_registrations_by_pubkey(
&mut self,
pubkeys: &[BlsPublicKey],
Expand All @@ -111,11 +173,13 @@ where
Ok(self.db.get_registrations_by_pubkey(pubkeys).await?)
}

/// List all validators in the registry.
pub(crate) async fn list_validators(&mut self) -> Result<Vec<RegistryEntry>, RegistryError> {
self.sync.wait_for_sync().await;
Ok(self.db.list_validators().await?)
}

/// Get validators by validator public key.
pub(crate) async fn get_validators_by_pubkey(
&mut self,
pubkeys: &[BlsPublicKey],
Expand All @@ -124,6 +188,7 @@ where
Ok(self.db.get_validators_by_pubkey(pubkeys).await?)
}

/// Get validators by validator index.
pub(crate) async fn get_validators_by_index(
&mut self,
indices: Vec<u64>,
Expand All @@ -132,16 +197,49 @@ where
Ok(self.db.get_validators_by_index(indices).await?)
}

/// List all operators in the registry.
pub(crate) async fn list_operators(&mut self) -> Result<Vec<Operator>, RegistryError> {
self.sync.wait_for_sync().await;
Ok(self.db.list_operators().await?)
}

/// Get operators by signer.
pub(crate) async fn get_operators_by_signer(
&mut self,
signers: &[Address],
) -> Result<Vec<Operator>, RegistryError> {
self.sync.wait_for_sync().await;
Ok(self.db.get_operators_by_signer(signers).await?)
}

/// Get the active validators that will propose in the given epoch
/// that are also registered in the registry.
pub(crate) async fn get_lookahead(&mut self, epoch: u64) -> Result<Lookahead, RegistryError> {
// 1. fetch the proposer duties from the beacon node
let proposer_duties = self.beacon.get_lookahead(epoch, false).await?;
let proposer_pubkeys = proposer_duties
.iter()
.map(|d| BlsPublicKey::from_bytes(&d.public_key).expect("valid BLS pubkey"))
.collect::<Vec<_>>();

// 2. fetch the registry entries from the database
self.sync.wait_for_sync().await;
let registry_entries = self.db.get_validators_by_pubkey(&proposer_pubkeys).await?;

// 3. map registry entries to their proposal slot. example result:
//
// 10936976: { validator_pubkey: 0x1234, operator: 0x5678, gas_limit: 1000000, rpc_endpoint: https://rpc.example.com }
// 10936977: { validator_pubkey: 0x9214, operator: 0x5678, gas_limit: 1000000, rpc_endpoint: https://rpc.example.com }
// 10936978: { validator_pubkey: 0x1983, operator: 0x5678, gas_limit: 1000000, rpc_endpoint: https://rpc.example.com }
let mut lookahead = HashMap::new();
for duty in proposer_duties {
let bls_pubkey = BlsPublicKey::from_bytes(&duty.public_key).expect("valid BLS pubkey");
if let Some(entry) = registry_entries.iter().find(|e| e.validator_pubkey == bls_pubkey)
{
lookahead.insert(duty.slot, entry.clone());
}
}

Ok(lookahead)
}
}
Loading