diff --git a/akd/src/append_only_zks.rs b/akd/src/append_only_zks.rs
index 9efdf62a..3e3ed165 100644
--- a/akd/src/append_only_zks.rs
+++ b/akd/src/append_only_zks.rs
@@ -24,6 +24,7 @@ use tokio::time::Instant;
 use winter_crypto::{Digest, Hasher};
 
 use keyed_priority_queue::{Entry, KeyedPriorityQueue};
+use std::collections::HashSet;
 
 /// The default azks key
 pub const DEFAULT_AZKS_KEY: u8 = 1u8;
@@ -132,9 +133,6 @@ impl Azks {
         storage: &S,
         insertion_set: &[Node<H>],
     ) -> Result<u64, AkdError> {
-        let mut load_count: u64 = 0;
-        let mut current_nodes = vec![NodeKey(NodeLabel::root())];
-
         let prefixes_set = crate::utils::build_prefixes_set(
             insertion_set
                 .iter()
@@ -143,6 +141,18 @@ impl Azks {
                 .as_ref(),
         );
 
+        self.bfs_preload_nodes::<S, H>(storage, prefixes_set).await
+    }
+
+    /// Preloads the given nodes using breadth-first search.
+    pub async fn bfs_preload_nodes<S: Storage + Sync + Send, H: Hasher>(
+        &self,
+        storage: &S,
+        nodes_to_load: HashSet<NodeLabel>,
+    ) -> Result<u64, AkdError> {
+        let mut load_count: u64 = 0;
+        let mut current_nodes = vec![NodeKey(NodeLabel::root())];
+
         while !current_nodes.is_empty() {
             let nodes = HistoryTreeNode::batch_get_from_storage(
                 storage,
@@ -166,7 +176,7 @@ impl Azks {
             // Note, the two for loops are needed because otherwise, you'd be accessing remote storage
             // individually for each node's state.
             for node in &nodes {
-                if !prefixes_set.contains(&node.label) {
+                if !nodes_to_load.contains(&node.label) {
                     // Only continue to traverse nodes which are relevant prefixes to insertion_set
                     continue;
                 }
@@ -331,7 +341,6 @@ impl Azks {
                     self.get_latest_epoch(),
                 )
                 .await?;
-                println!("Label of child {} is {:?}", i, unwrapped_child.label);
                 longest_prefix_children[i] = Node {
                     label: unwrapped_child.label,
                     hash: unwrapped_child
@@ -341,7 +350,6 @@ impl Azks {
                     .await?,
                 }
             }
         }
-        println!("Lcp label = {:?}", longest_prefix);
         Ok(NonMembershipProof {
             label,
             longest_prefix,
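
For orientation, the preload above walks the tree level by level: each level is fetched from storage in one batch read, and traversal only descends below nodes whose label is in the requested set. Below is a minimal standalone sketch of that strategy; the label and tree types are illustrative stand-ins, not the crate's NodeLabel/HistoryTreeNode/Storage APIs.

    use std::collections::{HashMap, HashSet};

    // Stand-ins: a label is the bit path from the root, and the "tree" maps a
    // label to its optional left/right children.
    type Label = String;
    type Tree = HashMap<Label, (Option<Label>, Option<Label>)>;

    // Walk the tree level by level; each level is one simulated batch read, and
    // we only descend below nodes whose label is in `to_load`.
    fn bfs_preload(tree: &Tree, to_load: &HashSet<Label>) -> u64 {
        let mut load_count = 0u64;
        let mut current: Vec<Label> = vec![String::new()]; // the root's (empty) label
        while !current.is_empty() {
            let fetched: Vec<&Label> = current.iter().filter(|l| tree.contains_key(*l)).collect();
            load_count += fetched.len() as u64;

            let mut next = Vec::new();
            for label in fetched {
                if !to_load.contains(label) {
                    continue; // not a prefix we need; prune this subtree
                }
                if let Some((left, right)) = tree.get(label) {
                    next.extend(left.clone());
                    next.extend(right.clone());
                }
            }
            current = next;
        }
        load_count
    }

As in the diff, the number of loaded nodes is counted and returned to the caller.
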
diff --git a/akd/src/directory.rs b/akd/src/directory.rs
index 111acd47..682bfa40 100644
--- a/akd/src/directory.rs
+++ b/akd/src/directory.rs
@@ -22,6 +22,7 @@ use log::{debug, error, info};
 #[cfg(feature = "rand")]
 use rand::{CryptoRng, RngCore};
 
+use crate::node_state::NodeLabel;
 use std::collections::HashMap;
 use std::marker::{Send, Sync};
 use std::sync::Arc;
@@ -31,6 +32,16 @@ use winter_crypto::Hasher;
 #[derive(Debug, Clone, Hash, Eq, PartialEq)]
 pub struct EpochHash<H: Hasher>(pub u64, pub H::Digest);
 
+#[derive(Clone)]
+/// Info needed for a lookup of a user for an epoch
+pub struct LookupInfo {
+    value_state: ValueState,
+    marker_version: u64,
+    existent_label: NodeLabel,
+    marker_label: NodeLabel,
+    non_existent_label: NodeLabel,
+}
+
 #[cfg(feature = "rand")]
 impl AkdValue {
     /// Gets a random value for a AKD
@@ -219,8 +230,6 @@ impl Directory {
             .get_root_hash_at_epoch::<_, H>(&self.storage, next_epoch)
             .await?;
 
-        self.storage.log_metrics(log::Level::Info).await;
-
         Ok(EpochHash(current_epoch, root_hash))
         // At the moment the tree root is not being written anywhere. Eventually we
         // want to change this to call a write operation to post to a blockchain or some such thing
@@ -231,70 +240,153 @@ impl Directory {
         // The guard will be dropped at the end of the proof generation
         let _guard = self.cache_lock.read().await;
 
+        let current_azks = self.retrieve_current_azks().await?;
+        let current_epoch = current_azks.get_latest_epoch();
+        let lookup_info = self
+            .get_lookup_info::<H>(uname.clone(), current_epoch)
+            .await?;
+        let lookup_proof = self
+            .lookup_with_info::<H>(uname, &current_azks, current_epoch, lookup_info)
+            .await;
+        lookup_proof
+    }
+
+    async fn lookup_with_info<H: Hasher>(
+        &self,
+        uname: AkdLabel,
+        current_azks: &Azks,
+        current_epoch: u64,
+        lookup_info: LookupInfo,
+    ) -> Result<LookupProof<H>, AkdError> {
+        let current_version = lookup_info.value_state.version;
+
+        let lookup_proof = LookupProof {
+            epoch: current_epoch,
+            plaintext_value: lookup_info.value_state.plaintext_val,
+            version: lookup_info.value_state.version,
+            exisitence_vrf_proof: self
+                .vrf
+                .get_label_proof::<H>(&uname, false, current_version)
+                .await?
+                .to_bytes()
+                .to_vec(),
+            existence_proof: current_azks
+                .get_membership_proof(&self.storage, lookup_info.existent_label, current_epoch)
+                .await?,
+            marker_vrf_proof: self
+                .vrf
+                .get_label_proof::<H>(&uname, false, lookup_info.marker_version)
+                .await?
+                .to_bytes()
+                .to_vec(),
+            marker_proof: current_azks
+                .get_membership_proof(&self.storage, lookup_info.marker_label, current_epoch)
+                .await?,
+            freshness_vrf_proof: self
+                .vrf
+                .get_label_proof::<H>(&uname, true, current_version)
+                .await?
+                .to_bytes()
+                .to_vec(),
+            freshness_proof: current_azks
+                .get_non_membership_proof(
+                    &self.storage,
+                    lookup_info.non_existent_label,
+                    current_epoch,
+                )
+                .await?,
+        };
+
+        Ok(lookup_proof)
+    }
+
+    // TODO(eoz): Call proof generations async
+    /// Allows efficient batch lookups by preloading the necessary nodes for the lookups.
+    pub async fn batch_lookup<H: Hasher>(
+        &self,
+        unames: &[AkdLabel],
+    ) -> Result<Vec<LookupProof<H>>, AkdError> {
         let current_azks = self.retrieve_current_azks().await?;
         let current_epoch = current_azks.get_latest_epoch();
 
+        // Take a union of the labels we will need proofs of for each lookup.
+        let mut lookup_labels = Vec::new();
+        let mut lookup_infos = Vec::new();
+        for uname in unames {
+            // Save the lookup info for later use.
+            let lookup_info = self
+                .get_lookup_info::<H>(uname.clone(), current_epoch)
+                .await?;
+            lookup_infos.push(lookup_info.clone());
+
+            // A lookup proof consists of proofs for the following labels.
+            lookup_labels.push(lookup_info.existent_label);
+            lookup_labels.push(lookup_info.marker_label);
+            lookup_labels.push(lookup_info.non_existent_label);
+        }
+
+        // Create a union of the sets of prefixes we will need for the lookups.
+        let lookup_prefixes_set = crate::utils::build_lookup_prefixes_set(&lookup_labels);
+
+        // Load nodes.
+        current_azks
+            .bfs_preload_nodes::<_, H>(&self.storage, lookup_prefixes_set)
+            .await?;
+
+        // Ensure we have all the lookup infos we need.
+        assert_eq!(unames.len(), lookup_infos.len());
+
+        let mut lookup_proofs = Vec::new();
+        for i in 0..unames.len() {
+            lookup_proofs.push(
+                self.lookup_with_info::<H>(
+                    unames[i].clone(),
+                    &current_azks,
+                    current_epoch,
+                    lookup_infos[i].clone(),
+                )
+                .await?,
+            );
+        }
+
+        Ok(lookup_proofs)
+    }
+
+    async fn get_lookup_info<H: Hasher>(
+        &self,
+        uname: AkdLabel,
+        epoch: u64,
+    ) -> Result<LookupInfo, AkdError> {
         match self
             .storage
-            .get_user_state(&uname, ValueStateRetrievalFlag::LeqEpoch(current_epoch))
+            .get_user_state(&uname, ValueStateRetrievalFlag::LeqEpoch(epoch))
             .await
         {
             Err(_) => {
                 // Need to throw an error
                 Err(AkdError::Directory(DirectoryError::NonExistentUser(
-                    uname.0,
-                    current_epoch,
+                    uname.clone().0,
+                    epoch,
                 )))
             }
             Ok(latest_st) => {
                 // Need to account for the case where the latest state is
                 // added but the database is in the middle of an update
-                let current_version = latest_st.version;
-                let marker_version = 1 << get_marker_version(current_version);
-                let existent_label = self
-                    .vrf
-                    .get_node_label::<H>(&uname, false, current_version)
-                    .await?;
+                let version = latest_st.version;
+                let marker_version = 1 << get_marker_version(version);
+                let existent_label = self.vrf.get_node_label::<H>(&uname, false, version).await?;
                 let marker_label = self
                     .vrf
                     .get_node_label::<H>(&uname, false, marker_version)
                     .await?;
-                let non_existent_label = self
-                    .vrf
-                    .get_node_label::<H>(&uname, true, current_version)
-                    .await?;
-                let current_azks = self.retrieve_current_azks().await?;
-                Ok(LookupProof {
-                    epoch: current_epoch,
-                    plaintext_value: latest_st.plaintext_val,
-                    version: current_version,
-                    exisitence_vrf_proof: self
-                        .vrf
-                        .get_label_proof::<H>(&uname, false, current_version)
-                        .await?
-                        .to_bytes()
-                        .to_vec(),
-                    existence_proof: current_azks
-                        .get_membership_proof(&self.storage, existent_label, current_epoch)
-                        .await?,
-                    marker_vrf_proof: self
-                        .vrf
-                        .get_label_proof::<H>(&uname, false, marker_version)
-                        .await?
-                        .to_bytes()
-                        .to_vec(),
-                    marker_proof: current_azks
-                        .get_membership_proof(&self.storage, marker_label, current_epoch)
-                        .await?,
-                    freshness_vrf_proof: self
-                        .vrf
-                        .get_label_proof::<H>(&uname, true, current_version)
-                        .await?
-                        .to_bytes()
-                        .to_vec(),
-                    freshness_proof: current_azks
-                        .get_non_membership_proof(&self.storage, non_existent_label, current_epoch)
-                        .await?,
+                let non_existent_label =
+                    self.vrf.get_node_label::<H>(&uname, true, version).await?;
+                Ok(LookupInfo {
+                    value_state: latest_st,
+                    marker_version,
+                    existent_label,
+                    marker_label,
+                    non_existent_label,
                 })
             }
         }
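
A note on the three labels collected per user in batch_lookup: the existent label corresponds to the user's current version, the marker label to the marker version computed as 1 << get_marker_version(version), and the non-existent label backs the freshness (non-membership) proof. Assuming get_marker_version(v) is floor(log2(v)) — which is how it is used here, though the helper itself is outside this diff — the marker version is the largest power of two at or below the current version. A self-contained illustration; the function below is a stand-in, not the crate's helper:

    // Illustrative reading of `1 << get_marker_version(version)`, assuming
    // get_marker_version(v) = floor(log2(v)): the marker version is the largest
    // power of two that is <= the current version.
    fn marker_version(version: u64) -> u64 {
        assert!(version > 0);
        let log2 = 63 - version.leading_zeros() as u64;
        1u64 << log2
    }

    fn main() {
        assert_eq!(marker_version(1), 1);
        assert_eq!(marker_version(5), 4);
        assert_eq!(marker_version(8), 8);
        assert_eq!(marker_version(1000), 512);
    }
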
diff --git a/akd/src/node_state.rs b/akd/src/node_state.rs
index 7021f5ef..5a4f6de2 100644
--- a/akd/src/node_state.rs
+++ b/akd/src/node_state.rs
@@ -148,6 +148,37 @@ impl NodeLabel {
         Self::new(out_val, len)
     }
 
+    // The sibling of a node in a binary tree has the same label as the node itself,
+    // except that its last bit is flipped (e.g., 000 and 001 are siblings).
+    // This function returns the sibling prefix of a specified length.
+    // The rest of the node label after the flipped bit is padded with zeroes.
+    // For instance, 010100 (length = 6) with sibling prefix length = 3 is 01[1]000 (length = 3),
+    // with [bit] denoting the flipped bit.
+    pub(crate) fn get_sibling_prefix(&self, mut len: u32) -> Self {
+        if len > self.get_len() {
+            len = self.get_len();
+        }
+
+        if len == 0 {
+            return Self::new([0u8; 32], 0);
+        }
+
+        let usize_len: usize = (len - 1).try_into().unwrap();
+        let byte_index = usize_len / 8;
+        let bit_index = usize_len % 8;
+
+        let bit_flip_marker: u8 = 0b1 << (7 - bit_index);
+
+        let mut val = self.get_val();
+        val[byte_index] ^= bit_flip_marker;
+
+        let mut out_val = [0u8; 32];
+        out_val[..byte_index].clone_from_slice(&self.val[..byte_index]);
+        out_val[byte_index] = (val[byte_index] >> (7 - bit_index)) << (7 - bit_index);
+
+        Self::new(out_val, len)
+    }
+
     /// Takes as input a pointer to the caller and another NodeLabel,
     /// returns a NodeLabel that is the longest common prefix of the two.
     #[must_use]
@@ -398,6 +429,15 @@ pub(crate) fn byte_arr_from_u64(input_int: u64) -> [u8; 32] {
     output_arr
 }
 
+// Use the test profile here, otherwise cargo complains that the function is never used.
+#[cfg(test)]
+fn byte_arr_from_u64_suffix(input_int: u64) -> [u8; 32] {
+    let mut output_arr = [0u8; 32];
+    let input_arr = input_int.to_be_bytes();
+    output_arr[24..32].clone_from_slice(&input_arr[..8]);
+    output_arr
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -752,6 +792,51 @@ mod tests {
         )
     }
 
+    #[test]
+    pub fn test_get_sibling_prefix() {
+        let label0 = NodeLabel::new(byte_arr_from_u64(0b0 << 63), 1);
+        let label0_sibling = NodeLabel::new(byte_arr_from_u64(0b1 << 63), 1);
+
+        assert!(label0.get_sibling_prefix(1) == label0_sibling);
+
+        let label1 = NodeLabel::new(byte_arr_from_u64(0b1 << 63), 1);
+        let label1_sibling = NodeLabel::new(byte_arr_from_u64(0b0 << 63), 1);
+
+        assert!(label1.get_sibling_prefix(1) == label1_sibling);
+
+        let label_rand_len_30 = NodeLabel::new(
+            byte_arr_from_u64(
+                0b1010000000000110001111001000001000001000110100101010111111001110u64,
+            ),
+            30,
+        );
+        let label_rand_len_30_prefix_15_sibling = NodeLabel::new(
+            byte_arr_from_u64(
+                0b1010000000000100000000000000000000000000000000000000000000000000u64,
+            ),
+            15,
+        );
+
+        assert!(label_rand_len_30.get_sibling_prefix(15) == label_rand_len_30_prefix_15_sibling);
+
+        let label_rand_len_256 = NodeLabel::new(
+            byte_arr_from_u64_suffix(
+                0b1010000000000110001111001000001000001000110100101010111111001110u64,
+            ),
+            256,
+        );
+        let label_rand_len_256_prefix_256_sibling = NodeLabel::new(
+            byte_arr_from_u64_suffix(
+                0b1010000000000110001111001000001000001000110100101010111111001111u64,
+            ),
+            256,
+        );
+
+        assert!(
+            label_rand_len_256.get_sibling_prefix(256) == label_rand_len_256_prefix_256_sibling
+        );
+    }
+
     // Test for serialization / deserialization
 
     #[test]
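
To make the bit arithmetic above concrete, here is a standalone re-derivation of get_sibling_prefix that works on a plain byte array instead of NodeLabel; the function and names are illustrative, not the crate's API. It reproduces the 010100 -> 01[1]000 example from the comment. It masks first and then flips, which is equivalent to the flip-then-mask order above because the flipped bit lies inside the kept prefix.

    // Standalone sketch of the sibling-prefix bit arithmetic (illustrative only).
    fn sibling_prefix(val: [u8; 32], label_len: u32, mut len: u32) -> ([u8; 32], u32) {
        if len > label_len {
            len = label_len;
        }
        if len == 0 {
            return ([0u8; 32], 0);
        }
        let last = (len - 1) as usize; // index of the bit to flip
        let byte_index = last / 8;
        let bit_index = last % 8; // 0 = most significant bit of that byte

        let mut out = [0u8; 32];
        out[..byte_index].copy_from_slice(&val[..byte_index]);
        // Keep the first `len` bits of the prefix byte, zero the rest...
        let kept = (val[byte_index] >> (7 - bit_index)) << (7 - bit_index);
        // ...and flip the last kept bit to move over to the sibling.
        out[byte_index] = kept ^ (1 << (7 - bit_index));
        (out, len)
    }

    fn main() {
        // The example from the comment: 010100 (len 6), sibling prefix of length 3 is 011000...
        let mut label = [0u8; 32];
        label[0] = 0b0101_0000;
        let (sib, len) = sibling_prefix(label, 6, 3);
        assert_eq!(len, 3);
        assert_eq!(sib[0], 0b0110_0000);
        assert!(sib[1..].iter().all(|b| *b == 0));
    }
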
diff --git a/akd/src/utils.rs b/akd/src/utils.rs
index e3e80905..65f1be94 100644
--- a/akd/src/utils.rs
+++ b/akd/src/utils.rs
@@ -27,6 +27,24 @@ pub(crate) fn build_prefixes_set(labels: &[NodeLabel]) -> HashSet<NodeLabel> {
     prefixes_set
 }
 
+pub(crate) fn build_lookup_prefixes_set(labels: &[NodeLabel]) -> HashSet<NodeLabel> {
+    let mut lookup_prefixes_set = HashSet::new();
+    for label in labels {
+        // We need the actual node for the lookup too.
+        lookup_prefixes_set.insert(*label);
+        for len in 0..(label.get_len() + 1) {
+            // Sibling prefixes alone unfortunately do not cover all the nodes we will need
+            // for a lookup proof. Although we could figure out exactly which nodes are needed,
+            // that would essentially require doing a pre-lookup.
+            // Instead, we load the label's own prefixes as well.
+            // This combination (sibling prefixes + the label's own prefixes) covers all the nodes we need.
+            lookup_prefixes_set.insert(label.get_prefix(len));
+            lookup_prefixes_set.insert(label.get_sibling_prefix(len));
+        }
+    }
+    lookup_prefixes_set
+}
+
 pub(crate) fn empty_node_hash<H: Hasher>() -> H::Digest {
     H::merge(&[H::hash(&EMPTY_VALUE), hash_label::<H>(EMPTY_LABEL)])
 }
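
As a rough picture of what this set ends up containing, here is a toy version over bit strings (ignoring the fixed-width zero padding that real sibling prefixes carry): for a 3-bit label it collects every prefix and every sibling prefix, which together cover the lookup path plus the sibling hashed in at each level.

    use std::collections::BTreeSet;

    // Toy illustration of the lookup-prefix set, with bit strings in place of NodeLabel.
    fn lookup_prefixes(label: &str) -> BTreeSet<String> {
        let mut set = BTreeSet::new();
        set.insert(label.to_string());
        for len in 0..=label.len() {
            let prefix = &label[..len];
            set.insert(prefix.to_string());
            if len > 0 {
                // Sibling prefix: the same prefix with its last bit flipped.
                let mut sibling = prefix[..len - 1].to_string();
                sibling.push(if prefix.ends_with('0') { '1' } else { '0' });
                set.insert(sibling);
            }
        }
        set
    }

    fn main() {
        let set = lookup_prefixes("101");
        // "", "0", "1", "10", "11", "100", "101"
        assert_eq!(set.len(), 7);
    }
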
diff --git a/akd_mysql/src/mysql.rs b/akd_mysql/src/mysql.rs
index 13225f25..02a2f539 100644
--- a/akd_mysql/src/mysql.rs
+++ b/akd_mysql/src/mysql.rs
@@ -16,7 +16,7 @@ use akd::storage::types::{
 };
 use akd::storage::{Storable, Storage};
 use async_trait::async_trait;
-use log::{debug, error, info, trace, warn};
+use log::{debug, error, info, warn};
 use mysql_async::prelude::*;
 use mysql_async::*;
 
@@ -79,7 +79,9 @@ pub struct AsyncMySqlDatabase {
     trans: LocalTransaction,
 
     num_reads: Arc<tokio::sync::RwLock<u64>>,
+    read_call_stats: Arc<tokio::sync::RwLock<HashMap<String, u64>>>,
     num_writes: Arc<tokio::sync::RwLock<u64>>,
+    write_call_stats: Arc<tokio::sync::RwLock<HashMap<String, u64>>>,
     time_read: Arc<tokio::sync::RwLock<Duration>>,
     time_write: Arc<tokio::sync::RwLock<Duration>>,
 
@@ -118,7 +120,9 @@ impl Clone for AsyncMySqlDatabase {
             trans: LocalTransaction::new(),
 
             num_reads: self.num_reads.clone(),
+            read_call_stats: self.read_call_stats.clone(),
             num_writes: self.num_writes.clone(),
+            write_call_stats: self.write_call_stats.clone(),
             time_read: self.time_read.clone(),
             time_write: self.time_write.clone(),
 
@@ -169,7 +173,9 @@ impl<'a> AsyncMySqlDatabase {
             trans: LocalTransaction::new(),
 
             num_reads: Arc::new(tokio::sync::RwLock::new(0)),
+            read_call_stats: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
             num_writes: Arc::new(tokio::sync::RwLock::new(0)),
+            write_call_stats: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
             time_read: Arc::new(tokio::sync::RwLock::new(Duration::from_millis(0))),
             time_write: Arc::new(tokio::sync::RwLock::new(Duration::from_millis(0))),
 
@@ -412,6 +418,8 @@ impl<'a> AsyncMySqlDatabase {
         trans: Option<Transaction<'a>>,
     ) -> Result<()> {
         *(self.num_writes.write().await) += 1;
+        self.record_call_stats('w', "internal_set".to_string(), "".to_string())
+            .await;
 
         debug!("BEGIN MySQL set");
         let tic = Instant::now();
@@ -454,6 +462,8 @@ impl<'a> AsyncMySqlDatabase {
         }
 
         *(self.num_writes.write().await) += records.len() as u64;
+        self.record_call_stats('w', "internal_batch_set".to_string(), "".to_string())
+            .await;
 
         debug!("BEGIN Computing mysql parameters");
         #[allow(clippy::needless_collect)]
@@ -543,6 +553,19 @@ impl<'a> AsyncMySqlDatabase {
         Ok(())
     }
 
+    async fn record_call_stats(&self, call_type: char, caller_name: String, data_type: String) {
+        let mut stats;
+        if call_type == 'r' {
+            stats = self.read_call_stats.write().await;
+        } else if call_type == 'w' {
+            stats = self.write_call_stats.write().await;
+        } else {
+            panic!("Unknown call type to record call stats for.")
+        }
+        let call_count = (*stats).entry(caller_name + "~" + &data_type).or_insert(0);
+        *call_count += 1;
+    }
+
     fn try_dockers() -> std::io::Result {
         let potential_docker_paths = vec![
             "/usr/local/bin/docker",
@@ -674,12 +697,20 @@ impl Storage for AsyncMySqlDatabase {
         }
 
         let mut r = self.num_reads.write().await;
+        let mut rcs = self.read_call_stats.write().await;
         let mut w = self.num_writes.write().await;
+        let mut wcs = self.write_call_stats.write().await;
         let mut tr = self.time_read.write().await;
         let mut tw = self.time_write.write().await;
 
+        // Sort the call stats for consistent output.
+        let mut rcs_vec = (*rcs).iter().collect::<Vec<_>>();
+        let mut wcs_vec = (*wcs).iter().collect::<Vec<_>>();
+        rcs_vec.sort_by_key(|rc| rc.0);
+        wcs_vec.sort_by_key(|wc| wc.0);
+
         let msg = format!(
-            "MySQL writes: {}, MySQL reads: {}, Time read: {} s, Time write: {} s\n\t{}\n\t{}\n\t{}",
+            "MySQL writes: {}, MySQL reads: {}, Time read: {} s, Time write: {} s\n\t{}\n\t{}\n\t{}\nRead call stats: {:?}\nWrite call stats: {:?}\n",
             *w,
             *r,
             (*tr).as_secs_f64(),
@@ -687,15 +718,21 @@ impl Storage for AsyncMySqlDatabase {
             tree_size,
             node_state_size,
             value_state_size,
+            rcs_vec,
+            wcs_vec,
         );
 
         *r = 0;
+        *rcs = HashMap::new();
         *w = 0;
+        *wcs = HashMap::new();
         *tr = Duration::from_millis(0);
         *tw = Duration::from_millis(0);
 
         match level {
-            log::Level::Trace => trace!("{}", msg),
+            // Currently logs cannot be captured unless they are
+            // written with println!. Normally Level::Trace would use the trace! macro.
+            log::Level::Trace => println!("{}", msg),
             log::Level::Debug => debug!("{}", msg),
             log::Level::Info => info!("{}", msg),
             log::Level::Warn => warn!("{}", msg),
@@ -890,6 +927,12 @@ impl Storage for AsyncMySqlDatabase {
         id: St::Key,
     ) -> core::result::Result<DbRecord, StorageError> {
         *(self.num_reads.write().await) += 1;
+        self.record_call_stats(
+            'r',
+            "get_direct:".to_string(),
+            format!("{:?}", St::data_type()),
+        )
+        .await;
 
         debug!("BEGIN MySQL get {:?}", id);
         let result = async {
@@ -1085,6 +1128,12 @@ impl Storage for AsyncMySqlDatabase {
         };
 
         *(self.num_reads.write().await) += 1;
+        self.record_call_stats(
+            'r',
+            "batch_get".to_string(),
+            format!("{:?}", St::data_type()),
+        )
+        .await;
 
         match result.await {
             Ok(result_vec) => {
@@ -1105,6 +1154,9 @@ impl Storage for AsyncMySqlDatabase {
         // This is the same as previous logic under "get_all"
         *(self.num_reads.write().await) += 1;
+        self.record_call_stats('r', "get_user_data".to_string(), "".to_string())
+            .await;
+
         // DO NOT log the user info, it's PII in the future
         debug!("BEGIN MySQL get user data");
         let result = async {
@@ -1198,6 +1250,8 @@ impl Storage for AsyncMySqlDatabase {
         flag: ValueStateRetrievalFlag,
     ) -> core::result::Result<ValueState, StorageError> {
         *(self.num_reads.write().await) += 1;
+        self.record_call_stats('r', "get_user_state".to_string(), "".to_string())
+            .await;
 
         debug!("BEGIN MySQL get user state (flag {:?})", flag);
         let result = async {
@@ -1309,6 +1363,8 @@ impl Storage for AsyncMySqlDatabase {
         flag: ValueStateRetrievalFlag,
     ) -> core::result::Result<HashMap<AkdLabel, u64>, StorageError> {
         *(self.num_reads.write().await) += 1;
+        self.record_call_stats('r', "get_user_state_versions".to_string(), "".to_string())
+            .await;
 
         let mut results = HashMap::new();
 
@@ -1502,6 +1558,8 @@ impl Storage for AsyncMySqlDatabase {
         epoch_in_question: u64,
     ) -> core::result::Result<u64, StorageError> {
         *(self.num_reads.write().await) += 1;
+        self.record_call_stats('r', "get_epoch_lte_epoch".to_string(), "".to_string())
+            .await;
 
         let result = async {
             let tic = Instant::now();
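
The bookkeeping added to AsyncMySqlDatabase is simple enough to sketch in isolation: two maps of counters keyed by "caller~data_type", one for reads and one for writes, bumped on every storage call and dumped (then cleared) by log_metrics. A minimal standalone version, without the Arc/tokio RwLock wrappers and with made-up caller and data-type strings:

    use std::collections::HashMap;

    // Counters keyed by "caller~data_type", kept separately for reads and writes.
    #[derive(Default)]
    struct CallStats {
        reads: HashMap<String, u64>,
        writes: HashMap<String, u64>,
    }

    impl CallStats {
        fn record(&mut self, call_type: char, caller: &str, data_type: &str) {
            let key = format!("{}~{}", caller, data_type);
            let map = match call_type {
                'r' => &mut self.reads,
                'w' => &mut self.writes,
                _ => panic!("Unknown call type to record call stats for."),
            };
            *map.entry(key).or_insert(0) += 1;
        }
    }

    fn main() {
        let mut stats = CallStats::default();
        stats.record('r', "batch_get", "HistoryTreeNode"); // illustrative data-type string
        stats.record('r', "batch_get", "HistoryTreeNode");
        stats.record('w', "internal_set", "");
        assert_eq!(stats.reads["batch_get~HistoryTreeNode"], 2);
        assert_eq!(stats.writes["internal_set~"], 1);
    }
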
diff --git a/integration_tests/src/mysql_tests.rs b/integration_tests/src/mysql_tests.rs
index 3c72285e..12b3e11f 100644
--- a/integration_tests/src/mysql_tests.rs
+++ b/integration_tests/src/mysql_tests.rs
@@ -62,3 +62,57 @@ async fn test_directory_operations() {
 
     info!("\n\n******** Completed MySQL Directory Operations Integration Test ********\n\n");
 }
+
+#[tokio::test]
+#[serial_test::serial]
+async fn test_lookups() {
+    crate::test_util::log_init(log::Level::Info);
+
+    info!("\n\n******** Starting MySQL Lookup Tests ********\n\n");
+
+    if AsyncMySqlDatabase::test_guard() {
+        // create the "test" database
+        if let Err(error) = AsyncMySqlDatabase::create_test_db(
+            "localhost",
+            Option::from("root"),
+            Option::from("example"),
+            Option::from(8001),
+        )
+        .await
+        {
+            panic!("Error creating test database: {}", error);
+        }
+
+        // connect to the newly created test db
+        let mysql_db = AsyncMySqlDatabase::new(
+            "localhost",
+            "test_db",
+            Option::from("root"),
+            Option::from("example"),
+            Option::from(8001),
+            MySqlCacheOptions::Default,
+            200,
+        )
+        .await;
+
+        // delete all data from the db
+        if let Err(error) = mysql_db.delete_data().await {
+            error!("Error cleaning mysql prior to test suite: {}", error);
+        }
+
+        let vrf = HardCodedAkdVRF {};
+        crate::test_util::test_lookups::<_, HardCodedAkdVRF>(&mysql_db, &vrf, 50, 5, 100).await;
+
+        // clean the test infra
+        if let Err(mysql_async::Error::Server(error)) = mysql_db.drop_tables().await {
+            error!(
+                "ERROR: Failed to clean MySQL test database with error {}",
+                error
+            );
+        }
+    } else {
+        warn!("WARN: Skipping MySQL test due to test guard noting that the docker container appears to not be running.");
+    }
+
+    info!("\n\n******** Completed MySQL Lookup Tests ********\n\n");
+}
diff --git a/integration_tests/src/test_util.rs b/integration_tests/src/test_util.rs
index 43123489..b8e264da 100644
--- a/integration_tests/src/test_util.rs
+++ b/integration_tests/src/test_util.rs
@@ -242,3 +242,122 @@ pub(crate) async fn directory_test_suite<
         }
     }
 }
+
+pub(crate) async fn test_lookups<
+    S: akd::storage::Storage + Sync + Send,
+    V: akd::ecvrf::VRFKeyStorage,
+>(
+    mysql_db: &S,
+    vrf: &V,
+    num_users: u64,
+    num_epochs: u64,
+    num_lookups: usize,
+) {
+    // generate the test data
+    let mut rng = thread_rng();
+
+    let mut users: Vec<String> = vec![];
+    for _ in 0..num_users {
+        users.push(
+            thread_rng()
+                .sample_iter(&Alphanumeric)
+                .take(30)
+                .map(char::from)
+                .collect(),
+        );
+    }
+
+    // create & test the directory
+    let maybe_dir = Directory::<_, _>::new::<Blake3>(mysql_db, vrf, false).await;
+    match maybe_dir {
+        Err(akd_error) => panic!("Error initializing directory: {:?}", akd_error),
+        Ok(dir) => {
+            info!("AKD Directory started. Beginning tests");
+
+            // Publish `num_epochs` epochs of user material
+            for i in 1..=num_epochs {
+                let mut data = Vec::new();
+                for value in users.iter() {
+                    data.push((AkdLabel(value.clone()), AkdValue(format!("{}", i))));
+                }
+
+                if let Err(error) = dir.publish::<Blake3>(data, true).await {
+                    panic!("Error publishing batch {:?}", error);
+                } else {
+                    info!("Published epoch {}", i);
+                }
+            }
+
+            // Perform `num_lookups` random lookup proofs on the published users
+            let azks = dir.retrieve_current_azks().await.unwrap();
+            let root_hash = dir.get_root_hash::<Blake3>(&azks).await.unwrap();
+
+            // Pick a set of users to look up
+            let mut labels = Vec::new();
+            for user in users.iter().choose_multiple(&mut rng, num_lookups) {
+                let label = AkdLabel(user.clone());
+                labels.push(label);
+            }
+
+            println!("Metrics after publish(es).");
+            reset_mysql_db::<S>(&mysql_db).await;
+
+            let start = Instant::now();
+            // Look up the selected users one by one
+            for label in labels.clone() {
+                match dir.lookup::<Blake3>(label.clone()).await {
+                    Err(error) => panic!("Error looking up user information {:?}", error),
+                    Ok(proof) => {
+                        let vrf_pk = dir.get_public_key().await.unwrap();
+                        if let Err(error) =
+                            akd::client::lookup_verify::<Blake3>(&vrf_pk, root_hash, label, proof)
+                        {
+                            panic!("Lookup proof failed to verify {:?}", error);
+                        }
+                    }
+                }
+            }
+            println!(
+                "Individual {} lookups took {}ms.",
+                num_lookups,
+                start.elapsed().as_millis()
+            );
+
+            println!("Metrics after individual lookups:");
+            reset_mysql_db::<S>(&mysql_db).await;
+
+            let start = Instant::now();
+            // Bulk lookup of the selected users
+            match dir.batch_lookup::<Blake3>(&labels).await {
+                Err(error) => panic!("Error batch looking up user information {:?}", error),
+                Ok(proofs) => {
+                    assert_eq!(labels.len(), proofs.len());
+
+                    let vrf_pk = dir.get_public_key().await.unwrap();
+                    for i in 0..proofs.len() {
+                        let label = labels[i].clone();
+                        let proof = proofs[i].clone();
+                        if let Err(error) =
+                            akd::client::lookup_verify::<Blake3>(&vrf_pk, root_hash, label, proof)
+                        {
+                            panic!("Batch lookup failed to verify for index {} {:?}", i, error);
+                        }
+                    }
+                }
+            }
+            println!(
+                "Bulk {} lookups took {}ms.",
+                num_lookups,
+                start.elapsed().as_millis()
+            );
+
+            println!("Metrics after lookup proofs: ");
+            reset_mysql_db::<S>(&mysql_db).await;
+        }
+    }
+}
+
+// Reset the MySQL database's metrics by logging them (which clears the counters) and flush the cache.
+// This lets us accurately assess the additional efficiency of
+// bulk lookup proofs.
+async fn reset_mysql_db<S: akd::storage::Storage + Sync + Send>(mysql_db: &S) {
+    mysql_db.log_metrics(Level::Trace).await;
+    mysql_db.flush_cache().await;
+}