Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge bulk lookup proofs #163

Merged
merged 8 commits into from
Mar 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions akd/src/append_only_zks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use tokio::time::Instant;
use winter_crypto::{Digest, Hasher};

use keyed_priority_queue::{Entry, KeyedPriorityQueue};
use std::collections::HashSet;

/// The default azks key
pub const DEFAULT_AZKS_KEY: u8 = 1u8;
Expand Down Expand Up @@ -132,9 +133,6 @@ impl Azks {
storage: &S,
insertion_set: &[Node<H>],
) -> Result<u64, AkdError> {
let mut load_count: u64 = 0;
let mut current_nodes = vec![NodeKey(NodeLabel::root())];

let prefixes_set = crate::utils::build_prefixes_set(
insertion_set
.iter()
Expand All @@ -143,6 +141,18 @@ impl Azks {
.as_ref(),
);

self.bfs_preload_nodes::<S, H>(storage, prefixes_set).await
}

/// Preloads given nodes using breadth-first search.
pub async fn bfs_preload_nodes<S: Storage + Sync + Send, H: Hasher>(
&self,
storage: &S,
nodes_to_load: HashSet<NodeLabel>,
) -> Result<u64, AkdError> {
let mut load_count: u64 = 0;
let mut current_nodes = vec![NodeKey(NodeLabel::root())];

while !current_nodes.is_empty() {
let nodes = HistoryTreeNode::batch_get_from_storage(
storage,
Expand All @@ -166,7 +176,7 @@ impl Azks {
// Note, the two for loops are needed because otherwise, you'd be accessing remote storage
// individually for each node's state.
for node in &nodes {
if !prefixes_set.contains(&node.label) {
if !nodes_to_load.contains(&node.label) {
// Only continue to traverse nodes which are relevant prefixes to insertion_set
continue;
}
Expand Down Expand Up @@ -331,7 +341,6 @@ impl Azks {
self.get_latest_epoch(),
)
.await?;
println!("Label of child {} is {:?}", i, unwrapped_child.label);
longest_prefix_children[i] = Node {
label: unwrapped_child.label,
hash: unwrapped_child
Expand All @@ -341,7 +350,6 @@ impl Azks {
}
}
}
println!("Lcp label = {:?}", longest_prefix);
Ok(NonMembershipProof {
label,
longest_prefix,
Expand Down
186 changes: 139 additions & 47 deletions akd/src/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use log::{debug, error, info};
#[cfg(feature = "rand")]
use rand::{CryptoRng, RngCore};

use crate::node_state::NodeLabel;
use std::collections::HashMap;
use std::marker::{Send, Sync};
use std::sync::Arc;
Expand All @@ -31,6 +32,16 @@ use winter_crypto::Hasher;
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct EpochHash<H: Hasher>(pub u64, pub H::Digest);

#[derive(Clone)]
/// Information gathered up front for a single user's lookup at a given epoch,
/// so that proof generation (`lookup_with_info`) does not have to re-query
/// user state from storage.
pub struct LookupInfo {
// Latest value state for the user at or before the target epoch
// (fetched via `get_user_state` with `ValueStateRetrievalFlag::LeqEpoch`).
value_state: ValueState,
// Marker version: `1 << get_marker_version(version)` — the power-of-two
// marker used for the marker (non-)membership portion of the proof.
marker_version: u64,
// VRF-derived node label for the user's current version (existence proof).
existent_label: NodeLabel,
// VRF-derived node label for the marker version (marker proof).
marker_label: NodeLabel,
// VRF-derived "stale" label for the current version (freshness
// non-membership proof); note it is derived with the `stale` flag set.
non_existent_label: NodeLabel,
}

#[cfg(feature = "rand")]
impl AkdValue {
/// Gets a random value for a AKD
Expand Down Expand Up @@ -219,8 +230,6 @@ impl<S: Storage + Sync + Send, V: VRFKeyStorage> Directory<S, V> {
.get_root_hash_at_epoch::<_, H>(&self.storage, next_epoch)
.await?;

self.storage.log_metrics(log::Level::Info).await;

Ok(EpochHash(current_epoch, root_hash))
// At the moment the tree root is not being written anywhere. Eventually we
// want to change this to call a write operation to post to a blockchain or some such thing
Expand All @@ -231,70 +240,153 @@ impl<S: Storage + Sync + Send, V: VRFKeyStorage> Directory<S, V> {
// The guard will be dropped at the end of the proof generation
let _guard = self.cache_lock.read().await;

let current_azks = self.retrieve_current_azks().await?;
let current_epoch = current_azks.get_latest_epoch();
let lookup_info = self
.get_lookup_info::<H>(uname.clone(), current_epoch)
.await?;
let lookup_proof = self
.lookup_with_info::<H>(uname, &current_azks, current_epoch, lookup_info)
.await;
lookup_proof
}

/// Builds a [`LookupProof`] for `uname` at `current_epoch` using lookup
/// metadata that was already resolved by `get_lookup_info`, avoiding a
/// second `get_user_state` round-trip.
///
/// The proof bundles, in order: the existence VRF proof + membership proof
/// for the current version's label, the marker VRF proof + membership proof
/// for the marker label, and the freshness VRF proof + NON-membership proof
/// for the stale label. Each part is fetched sequentially via `.await?`, so
/// the first storage/VRF failure aborts the whole proof.
///
/// NOTE(review): the field name `exisitence_vrf_proof` (typo) matches the
/// upstream `LookupProof` struct definition — do not "fix" it here alone.
async fn lookup_with_info<H: Hasher>(
&self,
uname: AkdLabel,
current_azks: &Azks,
current_epoch: u64,
lookup_info: LookupInfo,
) -> Result<LookupProof<H>, AkdError> {
let current_version = lookup_info.value_state.version;

let lookup_proof = LookupProof {
epoch: current_epoch,
plaintext_value: lookup_info.value_state.plaintext_val,
version: lookup_info.value_state.version,
// VRF proof that `existent_label` is the label for (uname, current_version).
exisitence_vrf_proof: self
.vrf
.get_label_proof::<H>(&uname, false, current_version)
.await?
.to_bytes()
.to_vec(),
// Membership proof for the current-version label in the current AZKS.
existence_proof: current_azks
.get_membership_proof(&self.storage, lookup_info.existent_label, current_epoch)
.await?,
// VRF proof binding the marker version's label to this user.
marker_vrf_proof: self
.vrf
.get_label_proof::<H>(&uname, false, lookup_info.marker_version)
.await?
.to_bytes()
.to_vec(),
marker_proof: current_azks
.get_membership_proof(&self.storage, lookup_info.marker_label, current_epoch)
.await?,
// VRF proof for the "stale" label (third arg `true` = stale flag),
// mirroring how `non_existent_label` was derived in `get_lookup_info`.
freshness_vrf_proof: self
.vrf
.get_label_proof::<H>(&uname, true, current_version)
.await?
.to_bytes()
.to_vec(),
// Non-membership proof: the stale label must NOT be in the tree,
// which shows the returned version is still fresh.
freshness_proof: current_azks
.get_non_membership_proof(
&self.storage,
lookup_info.non_existent_label,
current_epoch,
)
.await?,
};

Ok(lookup_proof)
}

// TODO(eoz): Call proof generations async
eozturk1 marked this conversation as resolved.
Show resolved Hide resolved
/// Allows efficient batch lookups by preloading necessary nodes for the lookups.
pub async fn batch_lookup<H: Hasher>(
&self,
unames: &[AkdLabel],
) -> Result<Vec<LookupProof<H>>, AkdError> {
let current_azks = self.retrieve_current_azks().await?;
let current_epoch = current_azks.get_latest_epoch();

// Take a union of the labels we will need proofs of for each lookup.
let mut lookup_labels = Vec::new();
let mut lookup_infos = Vec::new();
for uname in unames {
// Save lookup info for later use.
let lookup_info = self
eozturk1 marked this conversation as resolved.
Show resolved Hide resolved
.get_lookup_info::<H>(uname.clone(), current_epoch)
.await?;
lookup_infos.push(lookup_info.clone());

// A lookup proof consists of the proofs for the following labels.
lookup_labels.push(lookup_info.existent_label);
lookup_labels.push(lookup_info.marker_label);
lookup_labels.push(lookup_info.non_existent_label);
}

// Create a union of set of prefixes we will need for lookups.
let lookup_prefixes_set = crate::utils::build_lookup_prefixes_set(&lookup_labels);

// Load nodes.
current_azks
.bfs_preload_nodes::<_, H>(&self.storage, lookup_prefixes_set)
.await?;

// Ensure we have got all lookup infos needed.
assert_eq!(unames.len(), lookup_infos.len());

let mut lookup_proofs = Vec::new();
for i in 0..unames.len() {
lookup_proofs.push(
self.lookup_with_info::<H>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need the info? I think if we've done the proper BFS loading, we should only need to batch-get the value states in order to load them into the cache, then we can do the regular proof generation operations since we'll only be accessing from local memory

Copy link
Contributor Author

@eozturk1 eozturk1 Mar 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to eliminate duplicate get_user_state queries. One in identifying which labels are needed and one in (previously) lookup for the same purpose. This way pre-loaded info is passed to the lookup function, so no re-loads.

Looking at the code for MySQL's get_user_state, are we missing looking up the queried state in cache?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see, yes we've excluded get_user_state from the caching logic because of the filter parameters. How would you know a cached value matches "<= epoch" or some weird filter without going to the backing store? It sounds like we might need a new query at the storage layer to retrieve the max versions for a collection of users which is <= a given epoch. (If I'm remembering right). We could move forward with this however and simply open an issue for it, but we'll want to do it properly for batch proof generation. Here's we will hit order N queries just to gather the user states, while not awful, still not ideal when we can do a better query.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see! Tracked in #169. I can take a stab at #166, #167 and #169 in a new PR. Does that sound good?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds great, let's merge this in!

unames[i].clone(),
&current_azks,
current_epoch,
lookup_infos[i].clone(),
)
.await?,
);
}

Ok(lookup_proofs)
}

async fn get_lookup_info<H: Hasher>(
&self,
uname: AkdLabel,
epoch: u64,
) -> Result<LookupInfo, AkdError> {
match self
.storage
.get_user_state(&uname, ValueStateRetrievalFlag::LeqEpoch(current_epoch))
.get_user_state(&uname, ValueStateRetrievalFlag::LeqEpoch(epoch))
.await
{
Err(_) => {
// Need to throw an error
Err(AkdError::Directory(DirectoryError::NonExistentUser(
uname.0,
current_epoch,
uname.clone().0,
epoch,
)))
}
Ok(latest_st) => {
// Need to account for the case where the latest state is
// added but the database is in the middle of an update
let current_version = latest_st.version;
let marker_version = 1 << get_marker_version(current_version);
let existent_label = self
.vrf
.get_node_label::<H>(&uname, false, current_version)
.await?;
let version = latest_st.version;
let marker_version = 1 << get_marker_version(version);
let existent_label = self.vrf.get_node_label::<H>(&uname, false, version).await?;
let marker_label = self
.vrf
.get_node_label::<H>(&uname, false, marker_version)
.await?;
let non_existent_label = self
.vrf
.get_node_label::<H>(&uname, true, current_version)
.await?;
let current_azks = self.retrieve_current_azks().await?;
Ok(LookupProof {
epoch: current_epoch,
plaintext_value: latest_st.plaintext_val,
version: current_version,
exisitence_vrf_proof: self
.vrf
.get_label_proof::<H>(&uname, false, current_version)
.await?
.to_bytes()
.to_vec(),
existence_proof: current_azks
.get_membership_proof(&self.storage, existent_label, current_epoch)
.await?,
marker_vrf_proof: self
.vrf
.get_label_proof::<H>(&uname, false, marker_version)
.await?
.to_bytes()
.to_vec(),
marker_proof: current_azks
.get_membership_proof(&self.storage, marker_label, current_epoch)
.await?,
freshness_vrf_proof: self
.vrf
.get_label_proof::<H>(&uname, true, current_version)
.await?
.to_bytes()
.to_vec(),
freshness_proof: current_azks
.get_non_membership_proof(&self.storage, non_existent_label, current_epoch)
.await?,
let non_existent_label =
self.vrf.get_node_label::<H>(&uname, true, version).await?;
Ok(LookupInfo {
value_state: latest_st,
marker_version,
existent_label,
marker_label,
non_existent_label,
})
}
}
Expand Down
Loading