Skip to content

Commit

Permalink
test(sync): test lookahead sync with external source
Browse files Browse the repository at this point in the history
  • Loading branch information
mempirate committed Jan 9, 2025
1 parent 76bcf19 commit da00851
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 7 deletions.
6 changes: 3 additions & 3 deletions src/db/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use tracing::info;

use super::{BlsPublicKey, DbResult, Operator, Registration, RegistryDb};

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
pub(crate) struct InMemoryDb {
validator_registrations: Arc<RwLock<Vec<Registration>>>,
operator_registrations: Arc<RwLock<Vec<Operator>>>,
Expand All @@ -18,7 +18,7 @@ impl RegistryDb for InMemoryDb {
keys_count = registration.validator_pubkeys.len(),
sig_count = registration.signatures.len(),
digest = ?registration.digest(),
"NoOpDb: register_validators"
"InMemoryDb: register_validators"
);

let mut registrations = self.validator_registrations.write().unwrap();
Expand All @@ -28,7 +28,7 @@ impl RegistryDb for InMemoryDb {
}

async fn register_operator(&self, operator: Operator) -> DbResult<()> {
info!(signer = %operator.signer, "NoOpDb: register_operator");
info!(signer = %operator.signer, "InMemoryDb: register_operator");

let mut operators = self.operator_registrations.write().unwrap();
operators.push(operator);
Expand Down
3 changes: 2 additions & 1 deletion src/primitives/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use alloy::primitives::FixedBytes;
use derive_more::derive::{Deref, DerefMut, From};
use serde::{Deserialize, Serialize};

Expand All @@ -20,4 +21,4 @@ impl BlsPublicKey {

pub(crate) type BlsSignature = bls::Signature;

pub(crate) type Digest = [u8; 32];
pub(crate) type Digest = FixedBytes<32>;
3 changes: 2 additions & 1 deletion src/primitives/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ impl Registration {
hasher.update(self.gas_limit.to_be_bytes());
hasher.update(self.expiry.to_be_bytes());

hasher.finalize().into()
let arr: [u8; 32] = hasher.finalize().into();
arr.into()
}
}

Expand Down
13 changes: 13 additions & 0 deletions src/sync/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,19 @@ impl BeaconClient {
};
}
}

/// Gets the current epoch from the sync status `head_slot`.
pub(super) async fn get_epoch(&self) -> Result<u64, Error> {
loop {
match self.client.get_sync_status().await {
Ok(status) => break Ok(status.head_slot / 32),
Err(e) => {
warn!(error = ?e, "Failed to get epoch, retrying...");
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
}
}
}
}
}

/// An epoch transition event. Originates from the payload attribute stream, when the
Expand Down
62 changes: 60 additions & 2 deletions src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,15 @@ where
self.sync_lookahead(lookahead).await;

info!(elapsed = ?start.elapsed(), "Transition handled");
let _ = self.state.send(SyncState::Synced);
}

/// Syncs contract events from the last known block number to the given block number.
async fn sync_contract_events(&mut self, block_number: u64) {
// TODO:
// 1. Get contract logs from self.last_block_number to block_number
// 2. Sync to database
// 3. Update last_block_number = block_number
todo!()
}

/// Syncs the lookahead with external data sources.
Expand All @@ -159,7 +160,7 @@ where
let start = std::time::Instant::now();

let Some(source) = self.source.as_ref() else {
error!("No external source configured, skipping...");
info!("No external source configured, skipping...");
return;
};

Expand Down Expand Up @@ -199,3 +200,60 @@ where
}
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use crate::{db::InMemoryDb, primitives::registry::RegistryEntry, sources::mock::MockSource};

use super::*;

#[tokio::test]
async fn test_external_source_sync() -> eyre::Result<()> {
let _ = tracing_subscriber::fmt().with_max_level(tracing::Level::INFO).try_init();

let Ok(beacon_url) = std::env::var("BEACON_URL") else {
tracing::warn!("Skipping test because of missing BEACON_URL");
return Ok(())
};

let db = InMemoryDb::default();
let (mut syncer, mut handle) = Syncer::new(beacon_url, db.clone());

let mut source = MockSource::new();

// Get current epoch and lookahead
let epoch = syncer.beacon_client.get_epoch().await?;
let lookahead = syncer.beacon_client.get_lookahead(epoch, true).await?;

let pubkey = lookahead.first().unwrap().public_key.clone();
let pubkey = BlsPublicKey::from_bytes(&pubkey).unwrap();

let operator = Address::default();

for duty in lookahead {
let entry = RegistryEntry {
validator_pubkey: BlsPublicKey::from_bytes(&duty.public_key).unwrap(),
operator,
gas_limit: 0,
rpc_endpoint: "https://rick.com".parse().unwrap(),
};

source.add_entry(entry);
}

syncer.set_source(source);
syncer.spawn();

// Wait for state to change to `Syncing`
handle.state.changed().await.unwrap();
// Wait for syncing to complete
handle.wait_for_sync().await;

// Check validator registration
assert!(db.get_validator_registration(pubkey.clone()).await?.is_some());

Ok(())
}
}

0 comments on commit da00851

Please sign in to comment.