Skip to content

Commit

Permalink
Merge bulk lookup proofs (#163)
Browse files Browse the repository at this point in the history
* Keep track of MySQL read/write call stats

* Add support for bulk lookup proofs

* Use warn for failing Docker container search

* Allow Trace to be printed out

* Fix off by one in call stats

* Remove unnecessary  call

* Comment out logging metrics in directory

* Clarify get_sibling_prefix function
  • Loading branch information
eozturk1 authored Mar 9, 2022
1 parent 148d6ce commit 01e3e00
Show file tree
Hide file tree
Showing 7 changed files with 490 additions and 56 deletions.
20 changes: 14 additions & 6 deletions akd/src/append_only_zks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use tokio::time::Instant;
use winter_crypto::{Digest, Hasher};

use keyed_priority_queue::{Entry, KeyedPriorityQueue};
use std::collections::HashSet;

/// The default azks key
pub const DEFAULT_AZKS_KEY: u8 = 1u8;
Expand Down Expand Up @@ -132,9 +133,6 @@ impl Azks {
storage: &S,
insertion_set: &[Node<H>],
) -> Result<u64, AkdError> {
let mut load_count: u64 = 0;
let mut current_nodes = vec![NodeKey(NodeLabel::root())];

let prefixes_set = crate::utils::build_prefixes_set(
insertion_set
.iter()
Expand All @@ -143,6 +141,18 @@ impl Azks {
.as_ref(),
);

self.bfs_preload_nodes::<S, H>(storage, prefixes_set).await
}

/// Preloads given nodes using breadth-first search.
pub async fn bfs_preload_nodes<S: Storage + Sync + Send, H: Hasher>(
&self,
storage: &S,
nodes_to_load: HashSet<NodeLabel>,
) -> Result<u64, AkdError> {
let mut load_count: u64 = 0;
let mut current_nodes = vec![NodeKey(NodeLabel::root())];

while !current_nodes.is_empty() {
let nodes = HistoryTreeNode::batch_get_from_storage(
storage,
Expand All @@ -166,7 +176,7 @@ impl Azks {
// Note, the two for loops are needed because otherwise, you'd be accessing remote storage
// individually for each node's state.
for node in &nodes {
if !prefixes_set.contains(&node.label) {
if !nodes_to_load.contains(&node.label) {
// Only continue to traverse nodes which are relevant prefixes to insertion_set
continue;
}
Expand Down Expand Up @@ -331,7 +341,6 @@ impl Azks {
self.get_latest_epoch(),
)
.await?;
println!("Label of child {} is {:?}", i, unwrapped_child.label);
longest_prefix_children[i] = Node {
label: unwrapped_child.label,
hash: unwrapped_child
Expand All @@ -341,7 +350,6 @@ impl Azks {
}
}
}
println!("Lcp label = {:?}", longest_prefix);
Ok(NonMembershipProof {
label,
longest_prefix,
Expand Down
186 changes: 139 additions & 47 deletions akd/src/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use log::{debug, error, info};
#[cfg(feature = "rand")]
use rand::{CryptoRng, RngCore};

use crate::node_state::NodeLabel;
use std::collections::HashMap;
use std::marker::{Send, Sync};
use std::sync::Arc;
Expand All @@ -31,6 +32,16 @@ use winter_crypto::Hasher;
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct EpochHash<H: Hasher>(pub u64, pub H::Digest);

#[derive(Clone)]
/// Info needed for a lookup of a user for an epoch
pub struct LookupInfo {
value_state: ValueState,
marker_version: u64,
existent_label: NodeLabel,
marker_label: NodeLabel,
non_existent_label: NodeLabel,
}

#[cfg(feature = "rand")]
impl AkdValue {
/// Gets a random value for a AKD
Expand Down Expand Up @@ -219,8 +230,6 @@ impl<S: Storage + Sync + Send, V: VRFKeyStorage> Directory<S, V> {
.get_root_hash_at_epoch::<_, H>(&self.storage, next_epoch)
.await?;

self.storage.log_metrics(log::Level::Info).await;

Ok(EpochHash(current_epoch, root_hash))
// At the moment the tree root is not being written anywhere. Eventually we
// want to change this to call a write operation to post to a blockchain or some such thing
Expand All @@ -231,70 +240,153 @@ impl<S: Storage + Sync + Send, V: VRFKeyStorage> Directory<S, V> {
// The guard will be dropped at the end of the proof generation
let _guard = self.cache_lock.read().await;

let current_azks = self.retrieve_current_azks().await?;
let current_epoch = current_azks.get_latest_epoch();
let lookup_info = self
.get_lookup_info::<H>(uname.clone(), current_epoch)
.await?;
let lookup_proof = self
.lookup_with_info::<H>(uname, &current_azks, current_epoch, lookup_info)
.await;
lookup_proof
}

async fn lookup_with_info<H: Hasher>(
&self,
uname: AkdLabel,
current_azks: &Azks,
current_epoch: u64,
lookup_info: LookupInfo,
) -> Result<LookupProof<H>, AkdError> {
let current_version = lookup_info.value_state.version;

let lookup_proof = LookupProof {
epoch: current_epoch,
plaintext_value: lookup_info.value_state.plaintext_val,
version: lookup_info.value_state.version,
exisitence_vrf_proof: self
.vrf
.get_label_proof::<H>(&uname, false, current_version)
.await?
.to_bytes()
.to_vec(),
existence_proof: current_azks
.get_membership_proof(&self.storage, lookup_info.existent_label, current_epoch)
.await?,
marker_vrf_proof: self
.vrf
.get_label_proof::<H>(&uname, false, lookup_info.marker_version)
.await?
.to_bytes()
.to_vec(),
marker_proof: current_azks
.get_membership_proof(&self.storage, lookup_info.marker_label, current_epoch)
.await?,
freshness_vrf_proof: self
.vrf
.get_label_proof::<H>(&uname, true, current_version)
.await?
.to_bytes()
.to_vec(),
freshness_proof: current_azks
.get_non_membership_proof(
&self.storage,
lookup_info.non_existent_label,
current_epoch,
)
.await?,
};

Ok(lookup_proof)
}

// TODO(eoz): Call proof generations async
/// Allows efficient batch lookups by preloading necessary nodes for the lookups.
pub async fn batch_lookup<H: Hasher>(
&self,
unames: &[AkdLabel],
) -> Result<Vec<LookupProof<H>>, AkdError> {
let current_azks = self.retrieve_current_azks().await?;
let current_epoch = current_azks.get_latest_epoch();

// Take a union of the labels we will need proofs of for each lookup.
let mut lookup_labels = Vec::new();
let mut lookup_infos = Vec::new();
for uname in unames {
// Save lookup info for later use.
let lookup_info = self
.get_lookup_info::<H>(uname.clone(), current_epoch)
.await?;
lookup_infos.push(lookup_info.clone());

// A lookup proofs consists of the proofs for the following labels.
lookup_labels.push(lookup_info.existent_label);
lookup_labels.push(lookup_info.marker_label);
lookup_labels.push(lookup_info.non_existent_label);
}

// Create a union of set of prefixes we will need for lookups.
let lookup_prefixes_set = crate::utils::build_lookup_prefixes_set(&lookup_labels);

// Load nodes.
current_azks
.bfs_preload_nodes::<_, H>(&self.storage, lookup_prefixes_set)
.await?;

// Ensure we have got all lookup infos needed.
assert_eq!(unames.len(), lookup_infos.len());

let mut lookup_proofs = Vec::new();
for i in 0..unames.len() {
lookup_proofs.push(
self.lookup_with_info::<H>(
unames[i].clone(),
&current_azks,
current_epoch,
lookup_infos[i].clone(),
)
.await?,
);
}

Ok(lookup_proofs)
}

async fn get_lookup_info<H: Hasher>(
&self,
uname: AkdLabel,
epoch: u64,
) -> Result<LookupInfo, AkdError> {
match self
.storage
.get_user_state(&uname, ValueStateRetrievalFlag::LeqEpoch(current_epoch))
.get_user_state(&uname, ValueStateRetrievalFlag::LeqEpoch(epoch))
.await
{
Err(_) => {
// Need to throw an error
Err(AkdError::Directory(DirectoryError::NonExistentUser(
uname.0,
current_epoch,
uname.clone().0,
epoch,
)))
}
Ok(latest_st) => {
// Need to account for the case where the latest state is
// added but the database is in the middle of an update
let current_version = latest_st.version;
let marker_version = 1 << get_marker_version(current_version);
let existent_label = self
.vrf
.get_node_label::<H>(&uname, false, current_version)
.await?;
let version = latest_st.version;
let marker_version = 1 << get_marker_version(version);
let existent_label = self.vrf.get_node_label::<H>(&uname, false, version).await?;
let marker_label = self
.vrf
.get_node_label::<H>(&uname, false, marker_version)
.await?;
let non_existent_label = self
.vrf
.get_node_label::<H>(&uname, true, current_version)
.await?;
let current_azks = self.retrieve_current_azks().await?;
Ok(LookupProof {
epoch: current_epoch,
plaintext_value: latest_st.plaintext_val,
version: current_version,
exisitence_vrf_proof: self
.vrf
.get_label_proof::<H>(&uname, false, current_version)
.await?
.to_bytes()
.to_vec(),
existence_proof: current_azks
.get_membership_proof(&self.storage, existent_label, current_epoch)
.await?,
marker_vrf_proof: self
.vrf
.get_label_proof::<H>(&uname, false, marker_version)
.await?
.to_bytes()
.to_vec(),
marker_proof: current_azks
.get_membership_proof(&self.storage, marker_label, current_epoch)
.await?,
freshness_vrf_proof: self
.vrf
.get_label_proof::<H>(&uname, true, current_version)
.await?
.to_bytes()
.to_vec(),
freshness_proof: current_azks
.get_non_membership_proof(&self.storage, non_existent_label, current_epoch)
.await?,
let non_existent_label =
self.vrf.get_node_label::<H>(&uname, true, version).await?;
Ok(LookupInfo {
value_state: latest_st,
marker_version,
existent_label,
marker_label,
non_existent_label,
})
}
}
Expand Down
Loading

0 comments on commit 01e3e00

Please sign in to comment.