From 3c5b89fd1cfd3fd57b31e94ed93ce0b138df02be Mon Sep 17 00:00:00 2001 From: Sean Lawlor Date: Tue, 15 Mar 2022 08:58:08 -0400 Subject: [PATCH] Support tombstone'd value states in the storage-layer (#161) * Adjust the public crate-level types which are commonly utilized by clients (not storage implementers) * Move AkdValue and AkdLabel to binary vectors instead of strings * Identify tombstones in client & add flag to support them, verification of hash coming * Tombstoning at the storage layer * Verify the hash of the plaintext value compared to the hash in the existence proof * Adding test coverage around tombstoned entries in the key history. * Debugging statements and rustfmt + clippy * CI clippy * Cleanup of leaf hashing logic * PR review * Clippy still failing client-side on Mac, so fixing CI identified clippy lints * merge clippy's * rustfmt after clippy :sad_eyes: * PR review: move tombstone to empty array of data and add more test coverage * Addressing: #170. Adding a top N key updates to the history call. 
Also forcing key history proofs to be in max -> min, such that they go from highest version to lowest * clippy lint caught in CI Co-authored-by: Sean Lawlor --- .vscode/launch.json | 4 +- akd/src/append_only_zks.rs | 39 +- akd/src/auditor.rs | 2 +- akd/src/client.rs | 54 +- akd/src/directory.rs | 834 ++++++++++++++++++++--------- akd/src/ecvrf/ecvrf_impl.rs | 2 +- akd/src/ecvrf/no_vrf.rs | 2 +- akd/src/ecvrf/traits.rs | 2 +- akd/src/history_tree_node.rs | 36 +- akd/src/lib.rs | 77 ++- akd/src/serialization.rs | 87 ++- akd/src/storage/memory.rs | 181 +++++-- akd/src/storage/mod.rs | 17 +- akd/src/storage/tests.rs | 161 ++++-- akd/src/storage/transaction.rs | 8 +- akd/src/storage/types.rs | 52 +- akd/src/tests.rs | 20 +- akd/src/utils.rs | 6 + akd_client/src/tests.rs | 258 ++++++--- akd_client/src/types.rs | 7 + akd_client/src/utils.rs | 9 + akd_client/src/verify.rs | 58 +- akd_mysql/src/mysql.rs | 65 ++- akd_mysql/src/mysql_storables.rs | 9 +- integration_tests/src/test_util.rs | 23 +- poc/src/directory_host.rs | 29 +- poc/src/main.rs | 18 +- 27 files changed, 1439 insertions(+), 621 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index 27800efd..5736ae6f 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -13,9 +13,7 @@ "test", "--no-run", "--lib", - "--package=akd_client", - "--features=sha3_256,nostd", - "--no-default-features" + "--package=akd_client" ], }, "args": [], diff --git a/akd/src/append_only_zks.rs b/akd/src/append_only_zks.rs index ae867abc..6cf75ecb 100644 --- a/akd/src/append_only_zks.rs +++ b/akd/src/append_only_zks.rs @@ -21,7 +21,7 @@ use async_recursion::async_recursion; use log::{debug, info}; use std::marker::{Send, Sync}; use tokio::time::Instant; -use winter_crypto::{Digest, Hasher}; +use winter_crypto::Hasher; use keyed_priority_queue::{Entry, KeyedPriorityQueue}; use std::collections::HashSet; @@ -99,7 +99,7 @@ impl Azks { let new_leaf = get_leaf_node::( storage, node.label, - node.hash.as_bytes().as_ref(), + 
&node.hash, NodeLabel::root(), self.latest_epoch, ) @@ -107,7 +107,7 @@ impl Azks { let mut root_node = HistoryTreeNode::get_from_storage( storage, - NodeKey(NodeLabel::root()), + &NodeKey(NodeLabel::root()), self.get_latest_epoch(), ) .await?; @@ -156,7 +156,7 @@ impl Azks { while !current_nodes.is_empty() { let nodes = HistoryTreeNode::batch_get_from_storage( storage, - current_nodes, + ¤t_nodes, self.get_latest_epoch(), ) .await?; @@ -169,7 +169,7 @@ impl Azks { for node in &nodes { node_states.push(get_state_map_key(node, node.get_latest_epoch())); } - let states = storage.batch_get::(node_states).await?; + let states = storage.batch_get::(&node_states).await?; load_count += states.len() as u64; // Now that states are loaded in the cache, you can read and access them. @@ -220,13 +220,12 @@ impl Azks { ); self.increment_epoch(); - self.preload_nodes_for_insertion::(storage, &insertion_set) - .await?; + let mut hash_q = KeyedPriorityQueue::::new(); let mut priorities: i32 = 0; let mut root_node = HistoryTreeNode::get_from_storage( storage, - NodeKey(NodeLabel::root()), + &NodeKey(NodeLabel::root()), self.get_latest_epoch(), ) .await?; @@ -243,7 +242,7 @@ impl Azks { get_leaf_node::( storage, node.label, - node.hash.as_bytes().as_ref(), + &node.hash, NodeLabel::root(), self.latest_epoch, ) @@ -264,7 +263,7 @@ impl Azks { while let Some((next_node_label, _)) = hash_q.pop() { let mut next_node: HistoryTreeNode = HistoryTreeNode::get_from_storage( storage, - NodeKey(next_node_label), + &NodeKey(next_node_label), self.get_latest_epoch(), ) .await?; @@ -317,7 +316,7 @@ impl Azks { .await?; let lcp_node: HistoryTreeNode = HistoryTreeNode::get_from_storage( storage, - NodeKey(lcp_node_label), + &NodeKey(lcp_node_label), self.get_latest_epoch(), ) .await?; @@ -331,16 +330,17 @@ impl Azks { for (i, child) in state.child_states.iter().enumerate() { match child { None => { - println!("i = {}, empty", i); + debug!("i = {}, empty", i); continue; } Some(child) => { let 
unwrapped_child: HistoryTreeNode = HistoryTreeNode::get_from_storage( storage, - NodeKey(child.label), + &NodeKey(child.label), self.get_latest_epoch(), ) .await?; + debug!("Label of child {} is {:?}", i, unwrapped_child.label); longest_prefix_children[i] = Node { label: unwrapped_child.label, hash: unwrapped_child @@ -350,6 +350,7 @@ impl Azks { } } } + debug!("Lcp label = {:?}", longest_prefix); Ok(NonMembershipProof { label, longest_prefix, @@ -378,7 +379,7 @@ impl Azks { // between these epochs. let node = HistoryTreeNode::get_from_storage( storage, - NodeKey(NodeLabel::root()), + &NodeKey(NodeLabel::root()), self.get_latest_epoch(), ) .await?; @@ -441,7 +442,7 @@ impl Azks { Some(child_node_state) => { let child_node = HistoryTreeNode::get_from_storage( storage, - NodeKey(child_node_state.label), + &NodeKey(child_node_state.label), self.get_latest_epoch(), ) .await?; @@ -488,7 +489,7 @@ impl Azks { } let root_node: HistoryTreeNode = HistoryTreeNode::get_from_storage( storage, - NodeKey(NodeLabel::root()), + &NodeKey(NodeLabel::root()), self.get_latest_epoch(), ) .await?; @@ -518,7 +519,7 @@ impl Azks { let mut layer_proofs = Vec::new(); let mut curr_node: HistoryTreeNode = HistoryTreeNode::get_from_storage( storage, - NodeKey(NodeLabel::root()), + &NodeKey(NodeLabel::root()), self.get_latest_epoch(), ) .await?; @@ -561,7 +562,7 @@ impl Azks { }); let new_curr_node: HistoryTreeNode = HistoryTreeNode::get_from_storage( storage, - NodeKey( + &NodeKey( curr_node .get_child_label_at_epoch::<_, H>(storage, epoch, dir) .await?, @@ -576,7 +577,7 @@ impl Azks { if !equal { let new_curr_node: HistoryTreeNode = HistoryTreeNode::get_from_storage( storage, - NodeKey(prev_node), + &NodeKey(prev_node), self.get_latest_epoch(), ) .await?; diff --git a/akd/src/auditor.rs b/akd/src/auditor.rs index 4afc97bd..40ed1e51 100644 --- a/akd/src/auditor.rs +++ b/akd/src/auditor.rs @@ -12,10 +12,10 @@ use std::marker::{Send, Sync}; use winter_crypto::Hasher; use crate::{ - 
append_only_zks::Azks, errors::{AkdError, AzksError}, proof_structs::AppendOnlyProof, storage::memory::AsyncInMemoryDatabase, + Azks, }; /// Verifies an audit proof, given start and end hashes for a merkle patricia tree. diff --git a/akd/src/client.rs b/akd/src/client.rs index a6a79d18..5b41c689 100644 --- a/akd/src/client.rs +++ b/akd/src/client.rs @@ -105,7 +105,6 @@ pub fn lookup_verify( akd_key: AkdLabel, proof: LookupProof, ) -> Result<(), AkdError> { - let _plaintext_value = proof.plaintext_value; let version = proof.version; let marker_version = 1 << get_marker_version(version); @@ -113,6 +112,12 @@ pub fn lookup_verify( let marker_proof = proof.marker_proof; let freshness_proof = proof.freshness_proof; + if hash_plaintext_value::(&proof.plaintext_value) != existence_proof.hash_val { + return Err(AkdError::Directory(DirectoryError::VerifyLookupProof( + "Hash of plaintext value did not match expected hash in existence proof".to_string(), + ))); + } + let fresh_label = existence_proof.label; vrf_pk.verify_label::( &akd_key, @@ -148,25 +153,32 @@ pub fn lookup_verify( } /// Verifies a key history proof, given the corresponding sequence of hashes. +/// Returns a vector of whether the validity of a hash could be verified. +/// When false, the value <=> hash validity at the position could not be +/// verified because the value has been removed ("tombstoned") from the storage layer. 
pub fn key_history_verify( vrf_pk: &VRFPublicKey, root_hashes: Vec, previous_root_hashes: Vec>, uname: AkdLabel, proof: HistoryProof, -) -> Result<(), AkdError> { + allow_tombstones: bool, +) -> Result, AkdError> { + let mut tombstones = vec![]; for (count, update_proof) in proof.proofs.into_iter().enumerate() { let root_hash = root_hashes[count]; let previous_root_hash = previous_root_hashes[count]; - verify_single_update_proof::( + let is_tombstone = verify_single_update_proof::( root_hash, vrf_pk, previous_root_hash, update_proof, &uname, + allow_tombstones, )?; + tombstones.push(is_tombstone); } - Ok(()) + Ok(tombstones) } /// Verifies a single update proof @@ -176,9 +188,9 @@ fn verify_single_update_proof( previous_root_hash: Option, proof: UpdateProof, uname: &AkdLabel, -) -> Result<(), AkdError> { + allow_tombstones: bool, +) -> Result { let epoch = proof.epoch; - let _plaintext_value = &proof.plaintext_value; let version = proof.version; let existence_vrf_proof = proof.existence_vrf_proof; @@ -190,6 +202,27 @@ fn verify_single_update_proof( let non_existence_before_ep = &proof.non_existence_before_ep; + let (is_tombstone, value_hash_valid) = match (allow_tombstones, &proof.plaintext_value) { + (true, bytes) if bytes.0 == crate::TOMBSTONE => { + // A tombstone was encountered, we need to just take the + // hash of the value at "face value" since we don't have + // the real value available + (true, true) + } + (_, bytes) => { + // No tombstone so hash the value found, and compare to the existence proof's value + ( + false, + hash_plaintext_value::(bytes) == existence_at_ep.hash_val, + ) + } + }; + if !value_hash_valid { + return Err(AkdError::Directory(DirectoryError::VerifyKeyHistoryProof( + format!("Hash of plaintext value (v: {}) did not match expected hash in existence proof at epoch {}", version, epoch), + ))); + } + // ***** PART 1 *************************** // Verify the VRF and membership proof for the corresponding label for the version being 
updated to. vrf_pk.verify_label::( @@ -289,7 +322,9 @@ fn verify_single_update_proof( } } - Ok(()) + // return indicator of if the value <=> hash mapping was verified + // or if the hash was simply taken at face-value. True = hash mapping verified + Ok(is_tombstone) } /// Hashes all the children of a node, as well as their labels @@ -316,3 +351,8 @@ fn hash_layer(hashes: Vec, parent_label: NodeLabel) -> H:: new_hash = H::merge(&[new_hash, hash_label::(parent_label)]); new_hash } + +fn hash_plaintext_value(value: &crate::AkdValue) -> H::Digest { + let single_hash = crate::utils::value_to_bytes::(value); + H::merge(&[H::hash(&EMPTY_VALUE), single_hash]) +} diff --git a/akd/src/directory.rs b/akd/src/directory.rs index 89966201..b25dcb3a 100644 --- a/akd/src/directory.rs +++ b/akd/src/directory.rs @@ -46,7 +46,7 @@ pub struct LookupInfo { impl AkdValue { /// Gets a random value for a AKD pub fn random(rng: &mut R) -> Self { - Self(get_random_str(rng)) + Self::from_utf8_str(&get_random_str(rng)) } } @@ -54,7 +54,7 @@ impl AkdValue { impl AkdLabel { /// Creates a random key for a AKD pub fn random(rng: &mut R) -> Self { - Self(get_random_str(rng)) + Self::from_utf8_str(&get_random_str(rng)) } } @@ -103,7 +103,6 @@ impl Directory { pub async fn publish( &self, updates: Vec<(AkdLabel, AkdValue)>, - use_transaction: bool, ) -> Result, AkdError> { if self.read_only { return Err(AkdError::Directory(DirectoryError::ReadOnlyDirectory( @@ -123,7 +122,7 @@ impl Directory { let mut keys: Vec = updates.iter().map(|(uname, _val)| uname.clone()).collect(); // sort the keys, as inserting in primary-key order is more efficient for MySQL - keys.sort_by(|a, b| a.0.cmp(&b.0)); + keys.sort_by(|a, b| a.cmp(b)); // we're only using the maximum "version" of the user's state at the last epoch // they were seen in the directory. Therefore we've minimized the call to only @@ -151,7 +150,7 @@ impl Directory { .await?; // Currently there's no blinding factor for the commitment. 
// We'd want to change this later. - let value_to_add = H::hash(&Self::value_to_bytes(&val)); + let value_to_add = crate::utils::value_to_bytes::(&val); update_set.push(Node:: { label, hash: value_to_add, @@ -172,7 +171,7 @@ impl Directory { .get_node_label::(&uname, false, latest_version) .await?; let stale_value_to_add = H::hash(&[0u8]); - let fresh_value_to_add = H::hash(&Self::value_to_bytes(&val)); + let fresh_value_to_add = crate::utils::value_to_bytes::(&val); update_set.push(Node:: { label: stale_label, hash: stale_value_to_add, @@ -189,13 +188,11 @@ impl Directory { } let insertion_set: Vec> = update_set.to_vec(); - if use_transaction { - if let false = self.storage.begin_transaction().await { - error!("Transaction is already active"); - return Err(AkdError::Storage(StorageError::Transaction( - "Transaction is already active".to_string(), - ))); - } + if let false = self.storage.begin_transaction().await { + error!("Transaction is already active"); + return Err(AkdError::Storage(StorageError::Transaction( + "Transaction is already active".to_string(), + ))); } info!("Starting database insertion"); @@ -209,15 +206,15 @@ impl Directory { updates.push(DbRecord::ValueState(update)); } self.storage.batch_set(updates).await?; - if use_transaction { - debug!("Committing transaction"); - if let Err(err) = self.storage.commit_transaction().await { - // ignore any rollback error(s) - let _ = self.storage.rollback_transaction().await; - return Err(AkdError::Storage(err)); - } else { - debug!("Transaction committed"); - } + + // now commit the transaction + debug!("Committing transaction"); + if let Err(err) = self.storage.commit_transaction().await { + // ignore any rollback error(s) + let _ = self.storage.rollback_transaction().await; + return Err(AkdError::Storage(err)); + } else { + debug!("Transaction committed"); } let root_hash = current_azks @@ -356,10 +353,19 @@ impl Directory { .get_user_state(&uname, ValueStateRetrievalFlag::LeqEpoch(epoch)) .await { - 
Err(_) => Err(AkdError::Storage(StorageError::NotFound(format!( - "User {} at epoch {}", - uname.0, epoch - )))), + Err(_) => { + // Need to throw an error + match std::str::from_utf8(&uname) { + Ok(name) => Err(AkdError::Storage(StorageError::NotFound(format!( + "User {} at epoch {}", + name, epoch + )))), + _ => Err(AkdError::Storage(StorageError::NotFound(format!( + "User {:?} at epoch {}", + uname, epoch + )))), + } + } Ok(latest_st) => { // Need to account for the case where the latest state is // added but the database is in the middle of an update @@ -395,25 +401,81 @@ impl Directory { // The guard will be dropped at the end of the proof generation let _guard = self.cache_lock.read().await; - let username = uname.0.to_string(); + let username = uname.to_vec(); let current_azks = self.retrieve_current_azks().await?; let current_epoch = current_azks.get_latest_epoch(); if let Ok(this_user_data) = self.storage.get_user_data(uname).await { + let mut user_data = this_user_data.states; + // reverse sort from highest epoch to lowest + user_data.sort_by(|a, b| b.epoch.partial_cmp(&a.epoch).unwrap()); + let mut proofs = Vec::>::new(); - for user_state in &this_user_data.states { + for user_state in user_data { // Ignore states in storage that are ahead of current directory epoch if user_state.epoch <= current_epoch { - let proof = self.create_single_update_proof(uname, user_state).await?; + let proof = self.create_single_update_proof(uname, &user_state).await?; proofs.push(proof); } } Ok(HistoryProof { proofs }) } else { - Err(AkdError::Storage(StorageError::NotFound(format!( - "User {} at epoch {}", - username, current_epoch - )))) + match std::str::from_utf8(&username) { + Ok(name) => Err(AkdError::Storage(StorageError::NotFound(format!( + "User {} at epoch {}", + name, current_epoch + )))), + _ => Err(AkdError::Storage(StorageError::NotFound(format!( + "User {:?} at epoch {}", + username, current_epoch + )))), + } + } + } + + /// Takes in the current state of the 
server and a label along with + /// a "top" number of key updates to generate a proof for. + /// + /// If the label is present in the current state, + /// this function returns all the values & proof of validity + /// up to `top_n_updates` results. + pub async fn limited_key_history( + &self, + top_n_updates: usize, + uname: &AkdLabel, + ) -> Result, AkdError> { + // The guard will be dropped at the end of the proof generation + let _guard = self.cache_lock.read().await; + + let current_azks = self.retrieve_current_azks().await?; + let current_epoch = current_azks.get_latest_epoch(); + + let mut user_data = self.storage.get_user_data(uname).await?.states; + // reverse sort from highest epoch to lowest + user_data.sort_by(|a, b| b.epoch.partial_cmp(&a.epoch).unwrap()); + + let limited_history = user_data + .into_iter() + .take(top_n_updates) + .collect::>(); + + if limited_history.is_empty() { + let msg = if let Ok(username_str) = std::str::from_utf8(uname) { + format!("User {}", username_str) + } else { + format!("User {:?}", uname) + }; + Err(AkdError::Storage(StorageError::NotFound(msg))) + } else { + let mut proofs = Vec::>::new(); + for user_state in limited_history { + // Ignore states in storage that are ahead of current directory epoch + if user_state.epoch <= current_epoch { + let proof = self.create_single_update_proof(uname, &user_state).await?; + proofs.push(proof); + } + } + Ok(HistoryProof { proofs }) } } @@ -507,11 +569,11 @@ impl Directory { ) -> Result { let got = if ignore_cache { storage - .get_direct::(crate::append_only_zks::DEFAULT_AZKS_KEY) + .get_direct::(&crate::append_only_zks::DEFAULT_AZKS_KEY) .await? } else { storage - .get::(crate::append_only_zks::DEFAULT_AZKS_KEY) + .get::(&crate::append_only_zks::DEFAULT_AZKS_KEY) .await? }; match got { @@ -532,13 +594,6 @@ impl Directory { Ok(self.vrf.get_vrf_public_key().await?) } - // FIXME: Make a real commitment here, alongwith a blinding factor. See issue #123 - /// Gets the bytes for a value. 
- pub fn value_to_bytes(_value: &AkdValue) -> [u8; 64] { - [0u8; 64] - // unimplemented!() - } - async fn create_single_update_proof( &self, uname: &AkdLabel, @@ -761,10 +816,10 @@ mod tests { let vrf = HardCodedAkdVRF {}; let akd = Directory::<_, _>::new::(&db, &vrf, false).await?; - akd.publish::( - vec![(AkdLabel("hello".to_string()), AkdValue("world".to_string()))], - false, - ) + akd.publish::(vec![( + AkdLabel::from_utf8_str("hello"), + AkdValue::from_utf8_str("world"), + )]) .await?; Ok(()) } @@ -775,26 +830,26 @@ mod tests { let vrf = HardCodedAkdVRF {}; let akd = Directory::<_, _>::new::(&db, &vrf, false).await?; - akd.publish::( - vec![ - (AkdLabel("hello".to_string()), AkdValue("world".to_string())), - ( - AkdLabel("hello2".to_string()), - AkdValue("world2".to_string()), - ), - ], - false, - ) + akd.publish::(vec![ + ( + AkdLabel::from_utf8_str("hello"), + AkdValue::from_utf8_str("world"), + ), + ( + AkdLabel::from_utf8_str("hello2"), + AkdValue::from_utf8_str("world2"), + ), + ]) .await?; - let lookup_proof = akd.lookup(AkdLabel("hello".to_string())).await?; + let lookup_proof = akd.lookup(AkdLabel::from_utf8_str("hello")).await?; let current_azks = akd.retrieve_current_azks().await?; let root_hash = akd.get_root_hash::(¤t_azks).await?; let vrf_pk = akd.get_public_key().await?; lookup_verify::>( &vrf_pk, root_hash, - AkdLabel("hello".to_string()), + AkdLabel::from_utf8_str("hello"), lookup_proof, )?; Ok(()) @@ -806,85 +861,73 @@ mod tests { let vrf = HardCodedAkdVRF {}; let akd = Directory::<_, _>::new::(&db, &vrf, false).await?; - akd.publish::( - vec![ - (AkdLabel("hello".to_string()), AkdValue("world".to_string())), - ( - AkdLabel("hello2".to_string()), - AkdValue("world2".to_string()), - ), - ], - false, - ) + akd.publish::(vec![ + ( + AkdLabel::from_utf8_str("hello"), + AkdValue::from_utf8_str("world"), + ), + ( + AkdLabel::from_utf8_str("hello2"), + AkdValue::from_utf8_str("world2"), + ), + ]) .await?; - akd.publish::( - vec![ - 
(AkdLabel("hello".to_string()), AkdValue("world".to_string())), - ( - AkdLabel("hello2".to_string()), - AkdValue("world2".to_string()), - ), - ], - false, - ) + akd.publish::(vec![ + ( + AkdLabel::from_utf8_str("hello"), + AkdValue::from_utf8_str("world"), + ), + ( + AkdLabel::from_utf8_str("hello2"), + AkdValue::from_utf8_str("world2"), + ), + ]) .await?; - akd.publish::( - vec![ - ( - AkdLabel("hello".to_string()), - AkdValue("world3".to_string()), - ), - ( - AkdLabel("hello2".to_string()), - AkdValue("world4".to_string()), - ), - ], - false, - ) + akd.publish::(vec![ + ( + AkdLabel::from_utf8_str("hello"), + AkdValue::from_utf8_str("world3"), + ), + ( + AkdLabel::from_utf8_str("hello2"), + AkdValue::from_utf8_str("world4"), + ), + ]) .await?; - akd.publish::( - vec![ - ( - AkdLabel("hello3".to_string()), - AkdValue("world".to_string()), - ), - ( - AkdLabel("hello4".to_string()), - AkdValue("world2".to_string()), - ), - ], - false, - ) + akd.publish::(vec![ + ( + AkdLabel::from_utf8_str("hello3"), + AkdValue::from_utf8_str("world"), + ), + ( + AkdLabel::from_utf8_str("hello4"), + AkdValue::from_utf8_str("world2"), + ), + ]) .await?; - akd.publish::( - vec![( - AkdLabel("hello".to_string()), - AkdValue("world_updated".to_string()), - )], - false, - ) + akd.publish::(vec![( + AkdLabel::from_utf8_str("hello"), + AkdValue::from_utf8_str("world_updated"), + )]) .await?; - akd.publish::( - vec![ - ( - AkdLabel("hello3".to_string()), - AkdValue("world6".to_string()), - ), - ( - AkdLabel("hello4".to_string()), - AkdValue("world12".to_string()), - ), - ], - false, - ) + akd.publish::(vec![ + ( + AkdLabel::from_utf8_str("hello3"), + AkdValue::from_utf8_str("world6"), + ), + ( + AkdLabel::from_utf8_str("hello4"), + AkdValue::from_utf8_str("world12"), + ), + ]) .await?; - let history_proof = akd.key_history(&AkdLabel("hello".to_string())).await?; + let history_proof = akd.key_history(&AkdLabel::from_utf8_str("hello")).await?; let (root_hashes, previous_root_hashes) = 
get_key_history_hashes(&akd, &history_proof).await?; let vrf_pk = akd.get_public_key().await?; @@ -892,41 +935,45 @@ mod tests { &vrf_pk, root_hashes, previous_root_hashes, - AkdLabel("hello".to_string()), + AkdLabel::from_utf8_str("hello"), history_proof, + false, )?; - let history_proof = akd.key_history(&AkdLabel("hello2".to_string())).await?; + let history_proof = akd.key_history(&AkdLabel::from_utf8_str("hello2")).await?; let (root_hashes, previous_root_hashes) = get_key_history_hashes(&akd, &history_proof).await?; key_history_verify::( &vrf_pk, root_hashes, previous_root_hashes, - AkdLabel("hello2".to_string()), + AkdLabel::from_utf8_str("hello2"), history_proof, + false, )?; - let history_proof = akd.key_history(&AkdLabel("hello3".to_string())).await?; + let history_proof = akd.key_history(&AkdLabel::from_utf8_str("hello3")).await?; let (root_hashes, previous_root_hashes) = get_key_history_hashes(&akd, &history_proof).await?; key_history_verify::( &vrf_pk, root_hashes, previous_root_hashes, - AkdLabel("hello3".to_string()), + AkdLabel::from_utf8_str("hello3"), history_proof, + false, )?; - let history_proof = akd.key_history(&AkdLabel("hello4".to_string())).await?; + let history_proof = akd.key_history(&AkdLabel::from_utf8_str("hello4")).await?; let (root_hashes, previous_root_hashes) = get_key_history_hashes(&akd, &history_proof).await?; key_history_verify::( &vrf_pk, root_hashes, previous_root_hashes, - AkdLabel("hello4".to_string()), + AkdLabel::from_utf8_str("hello4"), history_proof, + false, )?; Ok(()) @@ -939,82 +986,70 @@ mod tests { let vrf = HardCodedAkdVRF {}; let mut akd = Directory::<_, _>::new::(&db, &vrf, false).await?; - akd.publish::( - vec![ - (AkdLabel("hello".to_string()), AkdValue("world".to_string())), - ( - AkdLabel("hello2".to_string()), - AkdValue("world2".to_string()), - ), - ], - false, - ) + akd.publish::(vec![ + ( + AkdLabel::from_utf8_str("hello"), + AkdValue::from_utf8_str("world"), + ), + ( + AkdLabel::from_utf8_str("hello2"), 
+ AkdValue::from_utf8_str("world2"), + ), + ]) .await?; - akd.publish::( - vec![ - (AkdLabel("hello".to_string()), AkdValue("world".to_string())), - ( - AkdLabel("hello2".to_string()), - AkdValue("world2".to_string()), - ), - ], - false, - ) + akd.publish::(vec![ + ( + AkdLabel::from_utf8_str("hello"), + AkdValue::from_utf8_str("world"), + ), + ( + AkdLabel::from_utf8_str("hello2"), + AkdValue::from_utf8_str("world2"), + ), + ]) .await?; - akd.publish::( - vec![ - ( - AkdLabel("hello".to_string()), - AkdValue("world3".to_string()), - ), - ( - AkdLabel("hello2".to_string()), - AkdValue("world4".to_string()), - ), - ], - false, - ) + akd.publish::(vec![ + ( + AkdLabel::from_utf8_str("hello"), + AkdValue::from_utf8_str("world3"), + ), + ( + AkdLabel::from_utf8_str("hello2"), + AkdValue::from_utf8_str("world4"), + ), + ]) .await?; - akd.publish::( - vec![ - ( - AkdLabel("hello3".to_string()), - AkdValue("world".to_string()), - ), - ( - AkdLabel("hello4".to_string()), - AkdValue("world2".to_string()), - ), - ], - false, - ) + akd.publish::(vec![ + ( + AkdLabel::from_utf8_str("hello3"), + AkdValue::from_utf8_str("world"), + ), + ( + AkdLabel::from_utf8_str("hello4"), + AkdValue::from_utf8_str("world2"), + ), + ]) .await?; - akd.publish::( - vec![( - AkdLabel("hello".to_string()), - AkdValue("world_updated".to_string()), - )], - false, - ) + akd.publish::(vec![( + AkdLabel::from_utf8_str("hello"), + AkdValue::from_utf8_str("world_updated"), + )]) .await?; - akd.publish::( - vec![ - ( - AkdLabel("hello3".to_string()), - AkdValue("world6".to_string()), - ), - ( - AkdLabel("hello4".to_string()), - AkdValue("world12".to_string()), - ), - ], - false, - ) + akd.publish::(vec![ + ( + AkdLabel::from_utf8_str("hello3"), + AkdValue::from_utf8_str("world6"), + ), + ( + AkdLabel::from_utf8_str("hello4"), + AkdValue::from_utf8_str("world12"), + ), + ]) .await?; let current_azks = akd.retrieve_current_azks().await?; @@ -1098,50 +1133,44 @@ mod tests { let akd = Directory::<_, 
_>::new::(&db, &vrf, false).await?; // Publish twice - akd.publish::( - vec![ - (AkdLabel("hello".to_string()), AkdValue("world".to_string())), - ( - AkdLabel("hello2".to_string()), - AkdValue("world2".to_string()), - ), - ], - false, - ) + akd.publish::(vec![ + ( + AkdLabel::from_utf8_str("hello"), + AkdValue::from_utf8_str("world"), + ), + ( + AkdLabel::from_utf8_str("hello2"), + AkdValue::from_utf8_str("world2"), + ), + ]) .await?; - akd.publish::( - vec![ - ( - AkdLabel("hello".to_string()), - AkdValue("world_2".to_string()), - ), - ( - AkdLabel("hello2".to_string()), - AkdValue("world2_2".to_string()), - ), - ], - false, - ) + akd.publish::(vec![ + ( + AkdLabel::from_utf8_str("hello"), + AkdValue::from_utf8_str("world_2"), + ), + ( + AkdLabel::from_utf8_str("hello2"), + AkdValue::from_utf8_str("world2_2"), + ), + ]) .await?; // Make the current azks a "checkpoint" to reset to later let checkpoint_azks = akd.retrieve_current_azks().await.unwrap(); // Publish for the third time - akd.publish::( - vec![ - ( - AkdLabel("hello".to_string()), - AkdValue("world_3".to_string()), - ), - ( - AkdLabel("hello2".to_string()), - AkdValue("world2_3".to_string()), - ), - ], - false, - ) + akd.publish::(vec![ + ( + AkdLabel::from_utf8_str("hello"), + AkdValue::from_utf8_str("world_3"), + ), + ( + AkdLabel::from_utf8_str("hello2"), + AkdValue::from_utf8_str("world2_3"), + ), + ]) .await?; // Reset the azks record back to previous epoch, to emulate an akd reader @@ -1154,7 +1183,7 @@ mod tests { // History proof should not contain the third epoch's update but still verify let history_proof = akd - .key_history::(&AkdLabel("hello".to_string())) + .key_history::(&AkdLabel::from_utf8_str("hello")) .await?; let (root_hashes, previous_root_hashes) = get_key_history_hashes(&akd, &history_proof).await?; @@ -1164,20 +1193,23 @@ mod tests { &vrf_pk, root_hashes, previous_root_hashes, - AkdLabel("hello".to_string()), + AkdLabel::from_utf8_str("hello"), history_proof, + false, )?; // 
Lookup proof should contain the checkpoint epoch's value and still verify - let lookup_proof = akd.lookup::(AkdLabel("hello".to_string())).await?; + let lookup_proof = akd + .lookup::(AkdLabel::from_utf8_str("hello")) + .await?; assert_eq!( - AkdValue("world_2".to_string()), + AkdValue::from_utf8_str("world_2"), lookup_proof.plaintext_value ); lookup_verify::( &vrf_pk, root_hash, - AkdLabel("hello".to_string()), + AkdLabel::from_utf8_str("hello"), lookup_proof, )?; @@ -1212,7 +1244,7 @@ mod tests { // create another read-only dir now that the AZKS exists in the storage layer, and try to publish which should fail let akd = Directory::<_, _>::new::(&db, &vrf, true).await?; - assert!(matches!(akd.publish::(vec![], true).await, Err(_))); + assert!(matches!(akd.publish::(vec![]).await, Err(_))); Ok(()) } @@ -1225,16 +1257,16 @@ mod tests { let writer = Directory::<_, _>::new::(&db, &vrf, false).await?; writer - .publish::( - vec![ - (AkdLabel("hello".to_string()), AkdValue("world".to_string())), - ( - AkdLabel("hello2".to_string()), - AkdValue("world2".to_string()), - ), - ], - false, - ) + .publish::(vec![ + ( + AkdLabel::from_utf8_str("hello"), + AkdValue::from_utf8_str("world"), + ), + ( + AkdLabel::from_utf8_str("hello2"), + AkdValue::from_utf8_str("world2"), + ), + ]) .await?; // reader will not write the AZKS but will be "polling" for AZKS changes @@ -1250,23 +1282,20 @@ mod tests { }); // verify a lookup proof, which will populate the cache - async_poll_helper_proof(&reader, AkdValue("world".to_string())).await?; + async_poll_helper_proof(&reader, AkdValue::from_utf8_str("world")).await?; // publish epoch 2 writer - .publish::( - vec![ - ( - AkdLabel("hello".to_string()), - AkdValue("world_2".to_string()), - ), - ( - AkdLabel("hello2".to_string()), - AkdValue("world2_2".to_string()), - ), - ], - false, - ) + .publish::(vec![ + ( + AkdLabel::from_utf8_str("hello"), + AkdValue::from_utf8_str("world_2"), + ), + ( + AkdLabel::from_utf8_str("hello2"), + 
AkdValue::from_utf8_str("world2_2"), + ), + ]) .await?; // assert that the change is picked up in a reasonable time-frame and the cache is flushed @@ -1274,7 +1303,7 @@ mod tests { tokio::time::timeout(tokio::time::Duration::from_secs(10), rx.recv()).await; assert!(matches!(notification, Ok(Some(())))); - async_poll_helper_proof(&reader, AkdValue("world_2".to_string())).await?; + async_poll_helper_proof(&reader, AkdValue::from_utf8_str("world_2")).await?; Ok(()) } @@ -1288,12 +1317,283 @@ mod tests { value: AkdValue, ) -> Result<(), AkdError> { // reader should read "hello" and this will populate the "cache" a log - let lookup_proof = reader.lookup(AkdLabel("hello".to_string())).await?; + let lookup_proof = reader.lookup(AkdLabel::from_utf8_str("hello")).await?; assert_eq!(value, lookup_proof.plaintext_value); let current_azks = reader.retrieve_current_azks().await?; let root_hash = reader.get_root_hash::(¤t_azks).await?; let pk = reader.get_public_key().await?; - lookup_verify::(&pk, root_hash, AkdLabel("hello".to_string()), lookup_proof)?; + lookup_verify::( + &pk, + root_hash, + AkdLabel::from_utf8_str("hello"), + lookup_proof, + )?; + Ok(()) + } + + #[tokio::test] + async fn test_limited_key_history() -> Result<(), AkdError> { + let db = AsyncInMemoryDatabase::new(); + let vrf = HardCodedAkdVRF {}; + // epoch 0 + let akd = Directory::<_, _>::new::(&db, &vrf, false).await?; + + // epoch 1 + akd.publish::(vec![ + ( + AkdLabel::from_utf8_str("hello"), + AkdValue::from_utf8_str("world"), + ), + ( + AkdLabel::from_utf8_str("hello2"), + AkdValue::from_utf8_str("world2"), + ), + ]) + .await?; + + // epoch 2 + akd.publish::(vec![ + ( + AkdLabel::from_utf8_str("hello"), + AkdValue::from_utf8_str("world"), + ), + ( + AkdLabel::from_utf8_str("hello2"), + AkdValue::from_utf8_str("world2"), + ), + ]) + .await?; + + // epoch 3 + akd.publish::(vec![ + ( + AkdLabel::from_utf8_str("hello"), + AkdValue::from_utf8_str("world3"), + ), + ( + AkdLabel::from_utf8_str("hello2"), + 
AkdValue::from_utf8_str("world4"), + ), + ]) + .await?; + + // epoch 4 + akd.publish::(vec![ + ( + AkdLabel::from_utf8_str("hello3"), + AkdValue::from_utf8_str("world"), + ), + ( + AkdLabel::from_utf8_str("hello4"), + AkdValue::from_utf8_str("world2"), + ), + ]) + .await?; + + // epoch 5 + akd.publish::(vec![( + AkdLabel::from_utf8_str("hello"), + AkdValue::from_utf8_str("world_updated"), + )]) + .await?; + + // epoch 6 + akd.publish::(vec![ + ( + AkdLabel::from_utf8_str("hello3"), + AkdValue::from_utf8_str("world6"), + ), + ( + AkdLabel::from_utf8_str("hello4"), + AkdValue::from_utf8_str("world12"), + ), + ]) + .await?; + + // epoch 7 + akd.publish::(vec![ + ( + AkdLabel::from_utf8_str("hello3"), + AkdValue::from_utf8_str("world7"), + ), + ( + AkdLabel::from_utf8_str("hello4"), + AkdValue::from_utf8_str("world13"), + ), + ]) + .await?; + + let vrf_pk = akd.get_public_key().await?; + + // "hello" was updated in epochs 1,2,3,5. Pull the latest item from the history (i.e. a lookup proof) + let history_proof = akd + .limited_key_history::(1, &AkdLabel::from_utf8_str("hello")) + .await?; + assert_eq!(1, history_proof.proofs.len()); + assert_eq!(5, history_proof.proofs[0].epoch); + + let (root_hashes, previous_root_hashes) = + get_key_history_hashes(&akd, &history_proof).await?; + key_history_verify::( + &vrf_pk, + root_hashes, + previous_root_hashes, + AkdLabel::from_utf8_str("hello"), + history_proof, + false, + )?; + + // Take the top 3 results, and check that we're getting the right epoch updates + let history_proof = akd + .limited_key_history::(3, &AkdLabel::from_utf8_str("hello")) + .await?; + assert_eq!(3, history_proof.proofs.len()); + assert_eq!(5, history_proof.proofs[0].epoch); + assert_eq!(3, history_proof.proofs[1].epoch); + assert_eq!(2, history_proof.proofs[2].epoch); + + let (root_hashes, previous_root_hashes) = + get_key_history_hashes(&akd, &history_proof).await?; + key_history_verify::( + &vrf_pk, + root_hashes, + previous_root_hashes, + 
AkdLabel::from_utf8_str("hello"), + history_proof, + false, + )?; + Ok(()) } + + #[tokio::test] + async fn test_tombstoned_key_history() -> Result<(), AkdError> { + let db = AsyncInMemoryDatabase::new(); + let vrf = HardCodedAkdVRF {}; + // epoch 0 + let akd = Directory::<_, _>::new::(&db, &vrf, false).await?; + + // epoch 1 + akd.publish::(vec![( + AkdLabel::from_utf8_str("hello"), + AkdValue::from_utf8_str("world"), + )]) + .await?; + + // epoch 2 + akd.publish::(vec![( + AkdLabel::from_utf8_str("hello"), + AkdValue::from_utf8_str("world2"), + )]) + .await?; + + // epoch 3 + akd.publish::(vec![( + AkdLabel::from_utf8_str("hello"), + AkdValue::from_utf8_str("world3"), + )]) + .await?; + + // epoch 4 + akd.publish::(vec![( + AkdLabel::from_utf8_str("hello"), + AkdValue::from_utf8_str("world4"), + )]) + .await?; + + // epoch 5 + akd.publish::(vec![( + AkdLabel::from_utf8_str("hello"), + AkdValue::from_utf8_str("world5"), + )]) + .await?; + + // Epochs 1-5, we're going to tombstone 1 & 2 + let vrf_pk = akd.get_public_key().await?; + + // tombstone epochs 1 & 2 + let tombstones = [ + crate::storage::types::ValueStateKey("hello".as_bytes().to_vec(), 1u64), + crate::storage::types::ValueStateKey("hello".as_bytes().to_vec(), 2u64), + ]; + db.tombstone_value_states(&tombstones).await?; + + let history_proof = akd + .key_history::(&AkdLabel::from_utf8_str("hello")) + .await?; + assert_eq!(5, history_proof.proofs.len()); + let (root_hashes, previous_root_hashes) = + get_key_history_hashes(&akd, &history_proof).await?; + + // If we request a proof with tombstones but without saying we're OK with tombstones, throw an err + let tombstones = key_history_verify::( + &vrf_pk, + root_hashes.clone(), + previous_root_hashes.clone(), + AkdLabel::from_utf8_str("hello"), + history_proof.clone(), + false, + ); + assert!(matches!(tombstones, Err(_))); + + // We should be able to verify tombstones assuming the client is accepting + // of tombstoned states + let tombstones = 
key_history_verify::( + &vrf_pk, + root_hashes, + previous_root_hashes, + AkdLabel::from_utf8_str("hello"), + history_proof, + true, + )?; + assert_eq!(false, tombstones[0]); + assert_eq!(false, tombstones[1]); + assert_eq!(false, tombstones[2]); + assert_eq!(true, tombstones[3]); + assert_eq!(true, tombstones[4]); + + Ok(()) + } + + // // Test coverage on issue #144, verification failures with small trees (<4 nodes) + // #[tokio::test] + // async fn test_simple_lookup_for_small_tree() -> Result<(), AkdError> { + // let db = AsyncInMemoryDatabase::new(); + // let vrf = HardCodedAkdVRF {}; + // // epoch 0 + // let akd = Directory::<_, _>::new::(&db, &vrf, false).await?; + + // let mut updates = vec![]; + // for i in 0..1 { + // updates.push(( + // AkdLabel(format!("hello{}", i).as_bytes().to_vec()), + // AkdValue(format!("hello{}", i).as_bytes().to_vec()), + // )); + // } + + // akd.publish::(updates).await?; + + // let target_label = AkdLabel(format!("hello{}", 0).as_bytes().to_vec()); + + // // retrieve the lookup proof + // let lookup_proof = akd.lookup(target_label.clone()).await?; + // // retrieve the root hash + // let current_azks = akd.retrieve_current_azks().await?; + // let root_hash = akd.get_root_hash::(¤t_azks).await?; + + // let vrf_pk = vrf.get_vrf_public_key().await?; + + // // perform the "traditional" AKD verification + // let akd_result = crate::client::lookup_verify::( + // &vrf_pk, + // root_hash, + // target_label.clone(), + // lookup_proof, + // ); + + // // check the two results to make sure they both verify + // assert!(matches!(akd_result, Ok(()))); + + // Ok(()) + // } } diff --git a/akd/src/ecvrf/ecvrf_impl.rs b/akd/src/ecvrf/ecvrf_impl.rs index ad93ea24..0bf26c17 100644 --- a/akd/src/ecvrf/ecvrf_impl.rs +++ b/akd/src/ecvrf/ecvrf_impl.rs @@ -206,7 +206,7 @@ impl VRFPublicKey { ) -> Result<(), VrfError> { // Initialization of VRF context by providing a curve - let name_hash_bytes = H::hash(uname.0.as_bytes()); + let name_hash_bytes = 
H::hash(&uname.0); let stale_bytes = if stale { &[0u8] } else { &[1u8] }; let hashed_label = H::merge(&[ diff --git a/akd/src/ecvrf/no_vrf.rs b/akd/src/ecvrf/no_vrf.rs index 025ebf52..83adfe38 100644 --- a/akd/src/ecvrf/no_vrf.rs +++ b/akd/src/ecvrf/no_vrf.rs @@ -82,7 +82,7 @@ pub trait VRFKeyStorage: Clone + Sync + Send { version: u64, ) -> Result { // this function will need to read the VRF key using some function - let name_hash_bytes = H::hash(uname.0.as_bytes()); + let name_hash_bytes = H::hash(uname); let mut stale_bytes = &[1u8]; if stale { stale_bytes = &[0u8]; diff --git a/akd/src/ecvrf/traits.rs b/akd/src/ecvrf/traits.rs index ee92ca2d..968e128e 100644 --- a/akd/src/ecvrf/traits.rs +++ b/akd/src/ecvrf/traits.rs @@ -69,7 +69,7 @@ pub trait VRFKeyStorage: Clone + Sync + Send { version: u64, ) -> Result { let key = self.get_vrf_private_key().await?; - let name_hash_bytes = H::hash(uname.0.as_bytes()); + let name_hash_bytes = H::hash(uname); let stale_bytes = if stale { &[0u8] } else { &[1u8] }; let hashed_label = H::merge(&[ diff --git a/akd/src/history_tree_node.rs b/akd/src/history_tree_node.rs index 78dcf8f6..247dc854 100644 --- a/akd/src/history_tree_node.rs +++ b/akd/src/history_tree_node.rs @@ -136,10 +136,10 @@ impl HistoryTreeNode { pub(crate) async fn get_from_storage( storage: &S, - key: NodeKey, + key: &NodeKey, current_epoch: u64, ) -> Result { - match storage.get::(key.clone()).await? { + match storage.get::(key).await? { DbRecord::HistoryTreeNode(node) => { // Resets a node's last_epoch value if the node in storage is ahead of the current // directory epoch. 
This could happen when a separate AKD process is in the middle @@ -160,11 +160,10 @@ impl HistoryTreeNode { pub(crate) async fn batch_get_from_storage( storage: &S, - keys: Vec, + keys: &[NodeKey], current_epoch: u64, ) -> Result, StorageError> { - let node_records: Vec = - storage.batch_get::(keys.clone()).await?; + let node_records: Vec = storage.batch_get::(keys).await?; let mut nodes = Vec::::new(); for (i, node) in node_records.into_iter().enumerate() { if let DbRecord::HistoryTreeNode(node) = node { @@ -247,12 +246,12 @@ impl HistoryTreeNode { if hashing { new_leaf.update_hash::<_, H>(storage, epoch).await?; let mut new_self: HistoryTreeNode = - HistoryTreeNode::get_from_storage(storage, NodeKey(self.label), epoch) + HistoryTreeNode::get_from_storage(storage, &NodeKey(self.label), epoch) .await?; new_self.update_hash::<_, H>(storage, epoch).await?; *self = new_self; } else { - *self = HistoryTreeNode::get_from_storage(storage, NodeKey(self.label), epoch) + *self = HistoryTreeNode::get_from_storage(storage, &NodeKey(self.label), epoch) .await?; } @@ -269,7 +268,8 @@ impl HistoryTreeNode { // in the tree and replaced with a new node whose label is equal to the longest common prefix. 
debug!("BEGIN get parent"); let mut parent = - HistoryTreeNode::get_from_storage(storage, NodeKey(self.parent), epoch).await?; + HistoryTreeNode::get_from_storage(storage, &NodeKey(self.parent), epoch) + .await?; debug!("BEGIN get direction at epoch {}", epoch); let self_dir_in_parent = parent.get_direction_at_ep(storage, self, epoch).await?; @@ -310,7 +310,7 @@ impl HistoryTreeNode { new_leaf.update_hash::<_, H>(storage, epoch).await?; self.update_hash::<_, H>(storage, epoch).await?; new_node = - HistoryTreeNode::get_from_storage(storage, NodeKey(new_node.label), epoch) + HistoryTreeNode::get_from_storage(storage, &NodeKey(new_node.label), epoch) .await?; new_node.update_hash::<_, H>(storage, epoch).await?; } @@ -320,7 +320,7 @@ impl HistoryTreeNode { parent.write_to_storage(storage).await?; debug!("BEGIN retrieve new self"); *self = - HistoryTreeNode::get_from_storage(storage, NodeKey(self.label), epoch).await?; + HistoryTreeNode::get_from_storage(storage, &NodeKey(self.label), epoch).await?; debug!("END insert single leaf (dir_self = Some)"); Ok(()) } @@ -339,7 +339,7 @@ impl HistoryTreeNode { debug!("BEGIN get child node from storage"); let mut child_node = - HistoryTreeNode::get_from_storage(storage, NodeKey(child_st.label), epoch) + HistoryTreeNode::get_from_storage(storage, &NodeKey(child_st.label), epoch) .await?; debug!("BEGIN insert single leaf helper"); child_node @@ -347,13 +347,13 @@ impl HistoryTreeNode { .await?; if hashing { debug!("BEGIN update hashes"); - *self = HistoryTreeNode::get_from_storage(storage, NodeKey(self.label), epoch) + *self = HistoryTreeNode::get_from_storage(storage, &NodeKey(self.label), epoch) .await?; self.update_hash::<_, H>(storage, epoch).await?; self.write_to_storage(storage).await?; } else { debug!("BEGIN retrieve self"); - *self = HistoryTreeNode::get_from_storage(storage, NodeKey(self.label), epoch) + *self = HistoryTreeNode::get_from_storage(storage, &NodeKey(self.label), epoch) .await?; } debug!("END insert single 
leaf (dir_self = None)"); @@ -432,7 +432,7 @@ impl HistoryTreeNode { } let parent = - &mut HistoryTreeNode::get_from_storage(storage, NodeKey(self.parent), epoch).await?; + &mut HistoryTreeNode::get_from_storage(storage, &NodeKey(self.parent), epoch).await?; if parent.get_latest_epoch() < epoch { let (_, dir_self, _) = parent.label.get_longest_common_prefix_and_dirs(self.label); parent @@ -440,7 +440,7 @@ impl HistoryTreeNode { .await?; parent.write_to_storage(storage).await?; *parent = - HistoryTreeNode::get_from_storage(storage, NodeKey(self.parent), epoch).await?; + HistoryTreeNode::get_from_storage(storage, &NodeKey(self.parent), epoch).await?; } match get_state_map(storage, parent, epoch).await { @@ -827,7 +827,7 @@ pub async fn get_empty_root( pub async fn get_leaf_node( storage: &S, label: NodeLabel, - value: &[u8], + value: &H::Digest, parent: NodeLabel, birth_epoch: u64, ) -> Result { @@ -841,7 +841,7 @@ pub async fn get_leaf_node( let mut new_state: HistoryNodeState = HistoryNodeState::new::(NodeStateKey(node.label, birth_epoch)); - new_state.value = from_digest::(H::merge(&[H::hash(&EMPTY_VALUE), H::hash(value)])); + new_state.value = from_digest::(H::merge(&[H::hash(&EMPTY_VALUE), *value])); set_state_map(storage, new_state).await?; @@ -884,7 +884,7 @@ pub(crate) async fn get_state_map( key: u64, ) -> Result { let state_key = get_state_map_key(node, key); - if let Ok(DbRecord::HistoryNodeState(state)) = storage.get::(state_key).await + if let Ok(DbRecord::HistoryNodeState(state)) = storage.get::(&state_key).await { Ok(state) } else { diff --git a/akd/src/lib.rs b/akd/src/lib.rs index 0c5bc9f0..50df4343 100644 --- a/akd/src/lib.rs +++ b/akd/src/lib.rs @@ -51,7 +51,7 @@ //! ## Adding key-value pairs to the akd //! To add key-value pairs to the akd, we assume that the types of keys and the corresponding values are String. //! After adding key-value pairs to the akd's data structure, it also needs to be committed. 
To do this, after running the setup, as in the previous step, -//! we use the `publish` function of an akd. The argument of publish is a vector of tuples of type (AkdLabel(String), AkdValue(String)). See below for example usage. +//! we use the `publish` function of an akd. The argument of publish is a vector of tuples of type (AkdLabel::from_utf8_str(String), AkdValue::from_utf8_str(String)). See below for example usage. //! ``` //! use winter_crypto::Hasher; //! use winter_crypto::hashers::Blake3_256; @@ -69,8 +69,8 @@ //! let vrf = HardCodedAkdVRF{}; //! let mut akd = Directory::<_, HardCodedAkdVRF>::new::>(&db, &vrf, false).await.unwrap(); //! // commit the latest changes -//! akd.publish::>(vec![(AkdLabel("hello".to_string()), AkdValue("world".to_string())), -//! (AkdLabel("hello2".to_string()), AkdValue("world2".to_string())),], false) +//! akd.publish::>(vec![(AkdLabel::from_utf8_str("hello"), AkdValue::from_utf8_str("world")), +//! (AkdLabel::from_utf8_str("hello2"), AkdValue::from_utf8_str("world2")),]) //! .await; //! }; //! ``` @@ -95,11 +95,11 @@ //! async { //! let vrf = HardCodedAkdVRF{}; //! let mut akd = Directory::<_, HardCodedAkdVRF>::new::>(&db, &vrf, false).await.unwrap(); -//! akd.publish::>(vec![(AkdLabel("hello".to_string()), AkdValue("world".to_string())), -//! (AkdLabel("hello2".to_string()), AkdValue("world2".to_string())),], false) +//! akd.publish::>(vec![(AkdLabel::from_utf8_str("hello"), AkdValue::from_utf8_str("world")), +//! (AkdLabel::from_utf8_str("hello2"), AkdValue::from_utf8_str("world2")),]) //! .await.unwrap(); //! // Generate latest proof -//! let lookup_proof = akd.lookup::>(AkdLabel("hello".to_string())).await; +//! let lookup_proof = akd.lookup::>(AkdLabel::from_utf8_str("hello")).await; //! }; //! ``` //! ## Verifying a lookup proof @@ -121,11 +121,11 @@ //! async { //! let vrf = HardCodedAkdVRF{}; //! let mut akd = Directory::<_, HardCodedAkdVRF>::new::>(&db, &vrf, false).await.unwrap(); -//! 
akd.publish::>(vec![(AkdLabel("hello".to_string()), AkdValue("world".to_string())), -//! (AkdLabel("hello2".to_string()), AkdValue("world2".to_string())),], false) +//! akd.publish::>(vec![(AkdLabel::from_utf8_str("hello"), AkdValue::from_utf8_str("world")), +//! (AkdLabel::from_utf8_str("hello2"), AkdValue::from_utf8_str("world2")),]) //! .await.unwrap(); //! // Generate latest proof -//! let lookup_proof = akd.lookup::>(AkdLabel("hello".to_string())).await.unwrap(); +//! let lookup_proof = akd.lookup::>(AkdLabel::from_utf8_str("hello")).await.unwrap(); //! let current_azks = akd.retrieve_current_azks().await.unwrap(); //! // Get the latest commitment, i.e. azks root hash //! let root_hash = akd.get_root_hash::>(¤t_azks).await.unwrap(); @@ -134,7 +134,7 @@ //! client::lookup_verify::>( //! &vrf_pk, //! root_hash, -//! AkdLabel("hello".to_string()), +//! AkdLabel::from_utf8_str("hello"), //! lookup_proof, //! ).unwrap(); //! }; @@ -161,11 +161,11 @@ //! async { //! let vrf = HardCodedAkdVRF{}; //! let mut akd = Directory::<_, HardCodedAkdVRF>::new::>(&db, &vrf, false).await.unwrap(); -//! akd.publish::>(vec![(AkdLabel("hello".to_string()), AkdValue("world".to_string())), -//! (AkdLabel("hello2".to_string()), AkdValue("world2".to_string())),], false) +//! akd.publish::>(vec![(AkdLabel::from_utf8_str("hello"), AkdValue::from_utf8_str("world")), +//! (AkdLabel::from_utf8_str("hello2"), AkdValue::from_utf8_str("world2")),]) //! .await.unwrap(); //! // Generate latest proof -//! let history_proof = akd.key_history::>(&AkdLabel("hello".to_string())).await; +//! let history_proof = akd.key_history::>(&AkdLabel::from_utf8_str("hello")).await; //! }; //! ``` //! ## Verifying a key history proof @@ -186,11 +186,11 @@ //! async { //! let vrf = HardCodedAkdVRF{}; //! let mut akd = Directory::<_, HardCodedAkdVRF>::new::>(&db, &vrf, false).await.unwrap(); -//! akd.publish::>(vec![(AkdLabel("hello".to_string()), AkdValue("world".to_string())), -//! 
(AkdLabel("hello2".to_string()), AkdValue("world2".to_string())),], false) +//! akd.publish::>(vec![(AkdLabel::from_utf8_str("hello"), AkdValue::from_utf8_str("world")), +//! (AkdLabel::from_utf8_str("hello2"), AkdValue::from_utf8_str("world2")),]) //! .await.unwrap(); //! // Generate latest proof -//! let history_proof = akd.key_history::>(&AkdLabel("hello".to_string())).await.unwrap(); +//! let history_proof = akd.key_history::>(&AkdLabel::from_utf8_str("hello")).await.unwrap(); //! let current_azks = akd.retrieve_current_azks().await.unwrap(); //! // Get the azks root hashes at the required epochs //! let (root_hashes, previous_root_hashes) = akd::directory::get_key_history_hashes::<_, Blake3_256, HardCodedAkdVRF>(&akd, &history_proof).await.unwrap(); @@ -199,8 +199,9 @@ //! &vrf_pk, //! root_hashes, //! previous_root_hashes, -//! AkdLabel("hello".to_string()), +//! AkdLabel::from_utf8_str("hello"), //! history_proof, +//! false, //! ).unwrap(); //! }; //! ``` @@ -225,12 +226,12 @@ //! let vrf = HardCodedAkdVRF{}; //! let mut akd = Directory::<_, HardCodedAkdVRF>::new::>(&db, &vrf, false).await.unwrap(); //! // Commit to the first epoch -//! akd.publish::>(vec![(AkdLabel("hello".to_string()), AkdValue("world".to_string())), -//! (AkdLabel("hello2".to_string()), AkdValue("world2".to_string())),], false) +//! akd.publish::>(vec![(AkdLabel::from_utf8_str("hello"), AkdValue::from_utf8_str("world")), +//! (AkdLabel::from_utf8_str("hello2"), AkdValue::from_utf8_str("world2")),]) //! .await.unwrap(); //! // Commit to the second epoch -//! akd.publish::>(vec![(AkdLabel("hello3".to_string()), AkdValue("world3".to_string())), -//! (AkdLabel("hello4".to_string()), AkdValue("world4".to_string())),], false) +//! akd.publish::>(vec![(AkdLabel::from_utf8_str("hello3"), AkdValue::from_utf8_str("world3")), +//! (AkdLabel::from_utf8_str("hello4"), AkdValue::from_utf8_str("world4")),]) //! .await.unwrap(); //! // Generate audit proof for the evolution from epoch 1 to epoch 2. //! 
let audit_proof = akd.audit::>(1u64, 2u64).await.unwrap(); @@ -256,12 +257,12 @@ //! let vrf = HardCodedAkdVRF{}; //! let mut akd = Directory::<_, HardCodedAkdVRF>::new::>(&db, &vrf, false).await.unwrap(); //! // Commit to the first epoch -//! akd.publish::>(vec![(AkdLabel("hello".to_string()), AkdValue("world".to_string())), -//! (AkdLabel("hello2".to_string()), AkdValue("world2".to_string())),], false) +//! akd.publish::>(vec![(AkdLabel::from_utf8_str("hello"), AkdValue::from_utf8_str("world")), +//! (AkdLabel::from_utf8_str("hello2"), AkdValue::from_utf8_str("world2")),]) //! .await.unwrap(); //! // Commit to the second epoch -//! akd.publish::>(vec![(AkdLabel("hello3".to_string()), AkdValue("world3".to_string())), -//! (AkdLabel("hello4".to_string()), AkdValue("world4".to_string())),], false) +//! akd.publish::>(vec![(AkdLabel::from_utf8_str("hello3"), AkdValue::from_utf8_str("world3")), +//! (AkdLabel::from_utf8_str("hello4"), AkdValue::from_utf8_str("world4")),]) //! .await.unwrap(); //! // Generate audit proof for the evolution from epoch 1 to epoch 2. //! 
let audit_proof = akd.audit::>(1u64, 2u64).await.unwrap(); @@ -311,20 +312,32 @@ #[cfg(feature = "rand")] extern crate rand; +// Due to the amount of types an implementing storage layer needs to access, +// it's quite unreasonable to expose them all at the crate root, and a storage +// implementer will simply need to import the necessary inner types which are +// a dependency of ths [`Storage`] trait anyways + pub mod append_only_zks; +pub mod auditor; +pub mod client; pub mod directory; +pub mod ecvrf; +pub mod errors; pub mod history_tree_node; pub mod node_state; pub mod proof_structs; pub mod serialization; pub mod storage; + mod utils; -pub mod auditor; -pub mod client; -pub mod ecvrf; -pub mod errors; +// ========== Type re-exports which are commonly used ========== // +pub use append_only_zks::Azks; +pub use directory::{Directory, EpochHash}; +pub use node_state::{Node, NodeLabel}; +pub use storage::types::{AkdLabel, AkdValue}; +// ========== Constants and type aliases ========== // #[cfg(any(test, feature = "public-tests"))] pub mod test_utils; #[cfg(test)] @@ -343,6 +356,12 @@ pub const EMPTY_LABEL: crate::node_state::NodeLabel = crate::node_state::NodeLab val: [1u8; 32], len: 0, }; +/// A "tombstone" is a false value in an AKD ValueState denoting that a real value has been removed (e.g. data rentention policies). +/// Should a tombstone be encountered, we have to assume that the hash of the value is correct, and we move forward without being able to +/// verify the raw value. We utilize an empty array to save space in the storage layer +/// +/// See [GitHub issue #130](https://github.com/novifinancial/akd/issues/130) for more context +pub const TOMBSTONE: &[u8] = &[]; /// This type is used to indicate a direction for a /// particular node relative to its parent. 
diff --git a/akd/src/serialization.rs b/akd/src/serialization.rs index 97fc165b..3187bb71 100644 --- a/akd/src/serialization.rs +++ b/akd/src/serialization.rs @@ -93,21 +93,21 @@ mod tests { let akd = Directory::<_, _>::new::>(&db, &vrf, false) .await .unwrap(); - akd.publish::>( - vec![ - (AkdLabel("hello".to_string()), AkdValue("world".to_string())), - ( - AkdLabel("hello2".to_string()), - AkdValue("world2".to_string()), - ), - ], - false, - ) + akd.publish::>(vec![ + ( + AkdLabel::from_utf8_str("hello"), + AkdValue::from_utf8_str("world"), + ), + ( + AkdLabel::from_utf8_str("hello2"), + AkdValue::from_utf8_str("world2"), + ), + ]) .await .unwrap(); // Generate latest proof let lookup_proof = akd - .lookup::>(AkdLabel("hello".to_string())) + .lookup::>(AkdLabel::from_utf8_str("hello")) .await .unwrap(); @@ -126,21 +126,21 @@ mod tests { let akd = Directory::<_, _>::new::>(&db, &vrf, false) .await .unwrap(); - akd.publish::>( - vec![ - (AkdLabel("hello".to_string()), AkdValue("world".to_string())), - ( - AkdLabel("hello2".to_string()), - AkdValue("world2".to_string()), - ), - ], - false, - ) + akd.publish::>(vec![ + ( + AkdLabel::from_utf8_str("hello"), + AkdValue::from_utf8_str("world"), + ), + ( + AkdLabel::from_utf8_str("hello2"), + AkdValue::from_utf8_str("world2"), + ), + ]) .await .unwrap(); // Generate latest proof let history_proof = akd - .key_history::>(&AkdLabel("hello".to_string())) + .key_history::>(&AkdLabel::from_utf8_str("hello")) .await .unwrap(); @@ -160,32 +160,29 @@ mod tests { .await .unwrap(); // Commit to the first epoch - akd.publish::>( - vec![ - (AkdLabel("hello".to_string()), AkdValue("world".to_string())), - ( - AkdLabel("hello2".to_string()), - AkdValue("world2".to_string()), - ), - ], - false, - ) + akd.publish::>(vec![ + ( + AkdLabel::from_utf8_str("hello"), + AkdValue::from_utf8_str("world"), + ), + ( + AkdLabel::from_utf8_str("hello2"), + AkdValue::from_utf8_str("world2"), + ), + ]) .await .unwrap(); // Commit to the second epoch - 
akd.publish::>( - vec![ - ( - AkdLabel("hello3".to_string()), - AkdValue("world3".to_string()), - ), - ( - AkdLabel("hello4".to_string()), - AkdValue("world4".to_string()), - ), - ], - false, - ) + akd.publish::>(vec![ + ( + AkdLabel::from_utf8_str("hello3"), + AkdValue::from_utf8_str("world3"), + ), + ( + AkdLabel::from_utf8_str("hello4"), + AkdValue::from_utf8_str("world4"), + ), + ]) .await .unwrap(); // Generate audit proof for the evolution from epoch 1 to epoch 2. diff --git a/akd/src/storage/memory.rs b/akd/src/storage/memory.rs index 12c04790..273d5a84 100644 --- a/akd/src/storage/memory.rs +++ b/akd/src/storage/memory.rs @@ -20,13 +20,17 @@ use log::{debug, error, info, trace, warn}; use std::collections::HashMap; use std::sync::Arc; +type Epoch = u64; +type UserValueMap = HashMap; +type UserStates = HashMap, UserValueMap>; + // ===== Basic In-Memory database ==== // /// This struct represents a basic in-memory database. #[derive(Debug)] pub struct AsyncInMemoryDatabase { db: Arc, DbRecord>>>, - user_info: Arc>>>, + user_info: Arc>, trans: Transaction, } @@ -100,12 +104,20 @@ impl Storage for AsyncInMemoryDatabase { } if let DbRecord::ValueState(value_state) = &record { - let mut guard = self.user_info.write().await; - let username = value_state.username.0.clone(); - guard - .entry(username) - .or_insert_with(Vec::new) - .push(value_state.clone()); + let mut u_guard = self.user_info.write().await; + let username = value_state.username.to_vec(); + match u_guard.get(&username) { + Some(old_states) => { + let mut new_states = old_states.clone(); + new_states.insert(value_state.epoch, value_state.clone()); + u_guard.insert(username, new_states); + } + None => { + let mut new_map = HashMap::new(); + new_map.insert(value_state.epoch, value_state.clone()); + u_guard.insert(username, new_map); + } + } } else { let mut guard = self.db.write().await; guard.insert(record.get_full_binary_id(), record); @@ -120,11 +132,19 @@ impl Storage for AsyncInMemoryDatabase { for 
record in records.into_iter() { if let DbRecord::ValueState(value_state) = &record { - let username = value_state.username.0.clone(); - u_guard - .entry(username) - .or_insert_with(Vec::new) - .push(value_state.clone()); + let username = value_state.username.to_vec(); + match u_guard.get(&username) { + Some(old_states) => { + let mut new_states = old_states.clone(); + new_states.insert(value_state.epoch, value_state.clone()); + u_guard.insert(username, new_states); + } + None => { + let mut new_map = HashMap::new(); + new_map.insert(value_state.epoch, value_state.clone()); + u_guard.insert(username, new_map); + } + } } else { guard.insert(record.get_full_binary_id(), record); } @@ -133,9 +153,9 @@ impl Storage for AsyncInMemoryDatabase { } /// Retrieve a stored record from the data layer - async fn get(&self, id: St::Key) -> Result { + async fn get(&self, id: &St::Key) -> Result { if self.is_transaction_active().await { - if let Some(result) = self.trans.get::(&id).await { + if let Some(result) = self.trans.get::(id).await { // there's a transacted item, return that one since it's "more up to date" return Ok(result); } @@ -144,15 +164,15 @@ impl Storage for AsyncInMemoryDatabase { } /// Retrieve a record from the data layer, ignoring any caching or transaction pending - async fn get_direct(&self, id: St::Key) -> Result { - let bin_id = St::get_full_binary_key_id(&id); + async fn get_direct(&self, id: &St::Key) -> Result { + let bin_id = St::get_full_binary_key_id(id); // if the request is for a value state, look in the value state set if St::data_type() == StorageType::ValueState { if let Ok(ValueStateKey(username, epoch)) = ValueState::key_from_full_binary(&bin_id) { let u_guard = self.user_info.read().await; if let Some(state) = (*u_guard).get(&username).cloned() { - if let Some(item) = state.iter().find(|&x| x.epoch == epoch) { - return Ok(DbRecord::ValueState(item.clone())); + if let Some(found) = state.get(&epoch) { + return 
Ok(DbRecord::ValueState(found.clone())); } } return Err(StorageError::NotFound(format!("ValueState {:?}", id))); @@ -179,10 +199,10 @@ impl Storage for AsyncInMemoryDatabase { /// Retrieve a batch of records by id async fn batch_get( &self, - ids: Vec, + ids: &[St::Key], ) -> Result, StorageError> { let mut map = Vec::new(); - for key in ids.into_iter() { + for key in ids.iter() { if let Ok(result) = self.get::(key).await { map.push(result); } @@ -191,11 +211,40 @@ impl Storage for AsyncInMemoryDatabase { Ok(map) } + async fn tombstone_value_states(&self, keys: &[ValueStateKey]) -> Result<(), StorageError> { + if keys.is_empty() { + return Ok(()); + } + + let data = self.batch_get::(keys).await?; + let mut new_data = vec![]; + for record in data { + if let DbRecord::ValueState(value_state) = record { + debug!( + "Tombstoning 0x{}", + hex::encode(value_state.username.to_vec()) + ); + + new_data.push(DbRecord::ValueState(ValueState { + plaintext_val: crate::AkdValue(crate::TOMBSTONE.to_vec()), + ..value_state + })); + } + } + + if !new_data.is_empty() { + debug!("Tombstoning {} entries", new_data.len()); + self.batch_set(new_data).await?; + } + + Ok(()) + } + /// Retrieve the user data for a given user async fn get_user_data(&self, username: &AkdLabel) -> Result { let guard = self.user_info.read().await; if let Some(result) = guard.get(&username.0) { - let mut results: Vec = result.to_vec(); + let mut results: Vec = result.values().cloned().collect::>(); // return ordered by epoch (from smallest -> largest) results.sort_by(|a, b| a.epoch.cmp(&b.epoch)); @@ -279,7 +328,7 @@ impl Storage for AsyncInMemoryDatabase { let mut map = HashMap::new(); for username in keys.iter() { if let Ok(result) = self.get_user_state(username, flag).await { - map.insert(AkdLabel(result.username.0.clone()), result.version); + map.insert(AkdLabel(result.username.to_vec()), result.version); } } Ok(map) @@ -294,7 +343,7 @@ impl Storage for AsyncInMemoryDatabase { .map(|epoch| 
crate::node_state::NodeStateKey(node_label, epoch)) .collect::>(); let data = self - .batch_get::(ids) + .batch_get::(&ids) .await?; let mut epochs = data .into_iter() @@ -334,7 +383,7 @@ pub struct AsyncInMemoryDbWithCache { cache: Arc, DbRecord>>>, stats: Arc>>, - user_info: Arc>>>, + user_info: Arc>, trans: Transaction, } @@ -479,12 +528,20 @@ impl Storage for AsyncInMemoryDbWithCache { } if let DbRecord::ValueState(value_state) = &record { - let mut guard = self.user_info.write().await; - let username = value_state.username.0.clone(); - guard - .entry(username) - .or_insert_with(Vec::new) - .push(value_state.clone()); + let mut u_guard = self.user_info.write().await; + let username = value_state.username.to_vec(); + match u_guard.get(&username) { + Some(old_states) => { + let mut new_states = old_states.clone(); + new_states.insert(value_state.epoch, value_state.clone()); + u_guard.insert(username, new_states); + } + None => { + let mut new_map = HashMap::new(); + new_map.insert(value_state.epoch, value_state.clone()); + u_guard.insert(username, new_map); + } + } } else { let mut stats = self.stats.write().await; let calls = stats.entry(String::from("calls_to_cache_set")).or_insert(0); @@ -505,11 +562,19 @@ impl Storage for AsyncInMemoryDbWithCache { for record in records.into_iter() { if let DbRecord::ValueState(value_state) = &record { - let username = value_state.username.0.clone(); - u_guard - .entry(username) - .or_insert_with(Vec::new) - .push(value_state.clone()); + let username = value_state.username.to_vec(); + match u_guard.get(&username) { + Some(old_states) => { + let mut new_states = old_states.clone(); + new_states.insert(value_state.epoch, value_state.clone()); + u_guard.insert(username, new_states); + } + None => { + let mut new_map = HashMap::new(); + new_map.insert(value_state.epoch, value_state.clone()); + u_guard.insert(username, new_map); + } + } } else { *calls += 1; guard.insert(record.get_full_binary_id(), record); @@ -518,9 +583,9 @@ 
impl Storage for AsyncInMemoryDbWithCache { Ok(()) } - async fn get(&self, id: St::Key) -> Result { + async fn get(&self, id: &St::Key) -> Result { if self.is_transaction_active().await { - if let Some(result) = self.trans.get::(&id).await { + if let Some(result) = self.trans.get::(id).await { // there's a transacted item, return that one since it's "more up to date" return Ok(result); } @@ -529,15 +594,15 @@ impl Storage for AsyncInMemoryDbWithCache { } /// Retrieve a record from the data layer, ignoring any caching or transaction pending - async fn get_direct(&self, id: St::Key) -> Result { - let bin_id = St::get_full_binary_key_id(&id); + async fn get_direct(&self, id: &St::Key) -> Result { + let bin_id = St::get_full_binary_key_id(id); // if the request is for a value state, look in the value state set if St::data_type() == StorageType::ValueState { if let Ok(ValueStateKey(username, epoch)) = ValueState::key_from_full_binary(&bin_id) { let u_guard = self.user_info.read().await; if let Some(state) = (*u_guard).get(&username).cloned() { - if let Some(item) = state.iter().find(|&x| x.epoch == epoch) { - return Ok(DbRecord::ValueState(item.clone())); + if let Some(found) = state.get(&epoch) { + return Ok(DbRecord::ValueState(found.clone())); } } return Err(StorageError::NotFound(format!("ValueState {:?}", id))); @@ -569,17 +634,16 @@ impl Storage for AsyncInMemoryDbWithCache { } } - /// Flush the caching of objects (if present) async fn flush_cache(&self) { // no-op } async fn batch_get( &self, - ids: Vec, + ids: &[St::Key], ) -> Result, StorageError> { let mut map = Vec::new(); - for key in ids.into_iter() { + for key in ids.iter() { if let Ok(result) = self.get::(key).await { map.push(result); } @@ -588,10 +652,33 @@ impl Storage for AsyncInMemoryDbWithCache { Ok(map) } + async fn tombstone_value_states(&self, keys: &[ValueStateKey]) -> Result<(), StorageError> { + if keys.is_empty() { + return Ok(()); + } + + let data = self.batch_get::(keys).await?; + let mut 
new_data = vec![]; + for record in data { + if let DbRecord::ValueState(value_state) = record { + new_data.push(DbRecord::ValueState(ValueState { + plaintext_val: crate::AkdValue(crate::TOMBSTONE.to_vec()), + ..value_state + })); + } + } + + if !new_data.is_empty() { + self.batch_set(new_data).await?; + } + + Ok(()) + } + async fn get_user_data(&self, username: &AkdLabel) -> Result { let guard = self.user_info.read().await; if let Some(result) = guard.get(&username.0) { - let mut results: Vec = result.to_vec(); + let mut results: Vec = result.values().cloned().collect(); // return ordered by epoch (from smallest -> largest) results.sort_by(|a, b| a.epoch.cmp(&b.epoch)); @@ -674,7 +761,7 @@ impl Storage for AsyncInMemoryDbWithCache { let mut map = HashMap::new(); for username in keys.iter() { if let Ok(result) = self.get_user_state(username, flag).await { - map.insert(AkdLabel(result.username.0.clone()), result.version); + map.insert(AkdLabel(result.username.to_vec()), result.version); } } Ok(map) @@ -689,7 +776,7 @@ impl Storage for AsyncInMemoryDbWithCache { .map(|epoch| crate::node_state::NodeStateKey(node_label, epoch)) .collect::>(); let data = self - .batch_get::(ids) + .batch_get::(&ids) .await?; let mut epochs = data .into_iter() diff --git a/akd/src/storage/mod.rs b/akd/src/storage/mod.rs index 53e7d64b..e669d130 100644 --- a/akd/src/storage/mod.rs +++ b/akd/src/storage/mod.rs @@ -104,14 +104,21 @@ pub trait Storage: Clone { async fn batch_set(&self, records: Vec) -> Result<(), StorageError>; /// Retrieve a stored record from the data layer - async fn get(&self, id: St::Key) -> Result; + async fn get(&self, id: &St::Key) -> Result; /// Retrieve a record from the data layer, ignoring any caching or transaction pending - async fn get_direct(&self, id: St::Key) -> Result; + async fn get_direct(&self, id: &St::Key) -> Result; /// Flush the caching of objects (if present) async fn flush_cache(&self); + /// Convert the given value state's into tombstones, 
replacing the plaintext value with + /// the tombstone key array + async fn tombstone_value_states( + &self, + keys: &[types::ValueStateKey], + ) -> Result<(), StorageError>; + /// Retrieve the last epoch <= ```epoch_in_question``` where the node with ```node_key``` /// was edited async fn get_epoch_lte_epoch( @@ -121,10 +128,8 @@ pub trait Storage: Clone { ) -> Result; /// Retrieve a batch of records by id - async fn batch_get( - &self, - ids: Vec, - ) -> Result, StorageError>; + async fn batch_get(&self, ids: &[St::Key]) + -> Result, StorageError>; /* User data searching */ diff --git a/akd/src/storage/tests.rs b/akd/src/storage/tests.rs index 5bb4d605..3908e1ca 100644 --- a/akd/src/storage/tests.rs +++ b/akd/src/storage/tests.rs @@ -51,6 +51,7 @@ pub async fn run_test_cases_for_storage_impl(db: &S) { test_user_data(db).await; test_transactions(db).await; test_batch_get_items(db).await; + test_tombstoning_data(db).await.unwrap(); } // *** New Test Helper Functions *** // @@ -65,7 +66,7 @@ async fn test_get_and_set_item(storage: &Ns) { assert_eq!(Ok(()), set_result); let get_result = storage - .get::(crate::append_only_zks::DEFAULT_AZKS_KEY) + .get::(&crate::append_only_zks::DEFAULT_AZKS_KEY) .await; if let Ok(DbRecord::Azks(got_azks)) = get_result { assert_eq!(got_azks.latest_epoch, azks.latest_epoch); @@ -95,7 +96,7 @@ async fn test_get_and_set_item(storage: &Ns) { let set_result = storage.set(DbRecord::HistoryTreeNode(node2.clone())).await; assert_eq!(Ok(()), set_result); - let get_result = storage.get::(key).await; + let get_result = storage.get::(&key).await; if let Ok(DbRecord::HistoryTreeNode(got_node)) = get_result { assert_eq!(got_node.label, node.label); assert_eq!(got_node.parent, node.parent); @@ -106,7 +107,7 @@ async fn test_get_and_set_item(storage: &Ns) { panic!("Failed to retrieve History Tree Node"); } - let get_result = storage.get::(key2).await; + let get_result = storage.get::(&key2).await; if let Err(err) = get_result { panic!("Failed to 
retrieve history tree node (2) {:?}", err) } @@ -123,7 +124,7 @@ async fn test_get_and_set_item(storage: &Ns) { .await; assert_eq!(Ok(()), set_result); - let get_result = storage.get::(key).await; + let get_result = storage.get::(&key).await; if let Ok(DbRecord::HistoryNodeState(got_state)) = get_result { assert_eq!(got_state.value, node_state.value); assert_eq!(got_state.child_states, node_state.child_states); @@ -133,18 +134,18 @@ async fn test_get_and_set_item(storage: &Ns) { } // === ValueState storage === // - let key = ValueStateKey("test".to_string(), 1); + let key = ValueStateKey("test".as_bytes().to_vec(), 1); let value = ValueState { - username: AkdLabel("test".to_string()), + username: AkdLabel::from_utf8_str("test"), epoch: 1, label: NodeLabel::new(byte_arr_from_u64(1), 1), version: 1, - plaintext_val: AkdValue("abc123".to_string()), + plaintext_val: AkdValue::from_utf8_str("abc123"), }; let set_result = storage.set(DbRecord::ValueState(value.clone())).await; assert_eq!(Ok(()), set_result); - let get_result = storage.get::(key).await; + let get_result = storage.get::(&key).await; if let Ok(DbRecord::ValueState(got_state)) = get_result { assert_eq!(got_state.username, value.username); assert_eq!(got_state.epoch, value.epoch); @@ -157,15 +158,14 @@ async fn test_get_and_set_item(storage: &Ns) { } async fn test_batch_get_items(storage: &Ns) { - let mut rand_users: Vec = vec![]; + let mut rand_users: Vec> = vec![]; for _ in 0..20 { - rand_users.push( - thread_rng() - .sample_iter(&Alphanumeric) - .take(30) - .map(char::from) - .collect(), - ); + let str: String = thread_rng() + .sample_iter(&Alphanumeric) + .take(30) + .map(char::from) + .collect(); + rand_users.push(str.as_bytes().to_vec()); } let mut data = Vec::new(); @@ -192,7 +192,7 @@ async fn test_batch_get_items(storage: &Ns) { let toc: Duration = Instant::now() - tic; println!("Storage batch op: {} ms", toc.as_millis()); let got = storage - .get::(ValueStateKey(rand_users[0].clone(), 10)) + 
.get::(&ValueStateKey(rand_users[0].clone(), 10)) .await; if got.is_err() { panic!("Failed to retrieve a user after batch insert"); @@ -202,7 +202,7 @@ async fn test_batch_get_items(storage: &Ns) { .iter() .map(|user| ValueStateKey(user.clone(), 1)) .collect(); - let got_all = storage.batch_get::(keys).await; + let got_all = storage.batch_get::(&keys).await; match got_all { Err(_) => panic!("Failed to retrieve batch of user at specific epochs"), Ok(lst) if lst.len() != rand_users.len() => { @@ -321,15 +321,14 @@ async fn test_batch_get_items(storage: &Ns) { } async fn test_transactions(storage: &S) { - let mut rand_users: Vec = vec![]; + let mut rand_users: Vec> = vec![]; for _ in 0..20 { - rand_users.push( - thread_rng() - .sample_iter(&Alphanumeric) - .take(30) - .map(char::from) - .collect(), - ); + let str: String = thread_rng() + .sample_iter(&Alphanumeric) + .take(30) + .map(char::from) + .collect(); + rand_users.push(str.as_bytes().to_vec()); } let mut data = Vec::new(); @@ -370,7 +369,7 @@ async fn test_transactions(storage: &S) { let toc: Duration = Instant::now() - tic; println!("Storage batch op: {} ms", toc.as_millis()); let got = storage - .get::(ValueStateKey(rand_users[0].clone(), 10)) + .get::(&ValueStateKey(rand_users[0].clone(), 10)) .await; if got.is_err() { panic!("Failed to retrieve a user after batch insert"); @@ -384,7 +383,7 @@ async fn test_transactions(storage: &S) { println!("Transactional storage batch op: {} ms", toc.as_millis()); let got = storage - .get::(ValueStateKey(rand_users[0].clone(), 10 + 10000)) + .get::(&ValueStateKey(rand_users[0].clone(), 10 + 10000)) .await; if got.is_err() { panic!("Failed to retrieve a user after batch insert"); @@ -392,16 +391,20 @@ async fn test_transactions(storage: &S) { } async fn test_user_data(storage: &S) { - let rand_user: String = thread_rng() + let rand_user = thread_rng() .sample_iter(&Alphanumeric) .take(30) .map(char::from) - .collect(); - let rand_value: String = thread_rng() + 
.collect::() + .as_bytes() + .to_vec(); + let rand_value = thread_rng() .sample_iter(&Alphanumeric) .take(1028) .map(char::from) - .collect(); + .collect::() + .as_bytes() + .to_vec(); let mut sample_state = ValueState { plaintext_val: AkdValue(rand_value.clone()), version: 1u64, @@ -413,7 +416,7 @@ async fn test_user_data(storage: &S) { username: AkdLabel(rand_user), }; let mut sample_state_2 = sample_state.clone(); - sample_state_2.username = AkdLabel("test_user".to_string()); + sample_state_2.username = AkdLabel::from_utf8_str("test_user"); let result = storage .set(DbRecord::ValueState(sample_state.clone())) @@ -489,7 +492,7 @@ async fn test_user_data(storage: &S) { ); let specifc_result = storage - .get::(ValueStateKey(sample_state.username.0.clone(), 123)) + .get::(&ValueStateKey(sample_state.username.to_vec(), 123)) .await; if let Ok(DbRecord::ValueState(state)) = specifc_result { assert_eq!( @@ -583,3 +586,93 @@ async fn test_user_data(storage: &S) { let data = storage.get_user_data(&sample_state_2.username).await; assert_eq!(4, data.unwrap().states.len()); } + +async fn test_tombstoning_data( + storage: &S, +) -> Result<(), crate::errors::AkdError> { + let rand_user = thread_rng() + .sample_iter(&Alphanumeric) + .take(30) + .map(char::from) + .collect::() + .as_bytes() + .to_vec(); + let rand_value = rand_user.clone(); + + let mut sample_state = ValueState { + plaintext_val: AkdValue(rand_value.clone()), + version: 1u64, + label: NodeLabel { + val: byte_arr_from_u64(1), + len: 1u32, + }, + epoch: 1u64, + username: AkdLabel(rand_user.clone()), + }; + let mut sample_state2 = sample_state.clone(); + sample_state2.username = AkdLabel::from_utf8_str("tombstone_test_user"); + + // Load up a bunch of data into the storage layer + for i in 0..5 { + sample_state.version = i; + sample_state.epoch = i; + sample_state2.version = i; + sample_state2.epoch = i; + + assert_eq!( + Ok(()), + storage + .set(DbRecord::ValueState(sample_state.clone())) + .await + ); + 
assert_eq!( + Ok(()), + storage + .set(DbRecord::ValueState(sample_state2.clone())) + .await + ); + } + + let data = storage.get_user_data(&sample_state.username).await.unwrap(); + assert_eq!(5, data.states.len()); + let data = storage + .get_user_data(&sample_state2.username) + .await + .unwrap(); + assert_eq!(5, data.states.len()); + + let keys_to_tombstone = [ + ValueStateKey("tombstone_test_user".as_bytes().to_vec(), 0u64), + ValueStateKey("tombstone_test_user".as_bytes().to_vec(), 1u64), + ValueStateKey(sample_state.username.to_vec(), 0u64), + ValueStateKey(sample_state.username.to_vec(), 1u64), + ValueStateKey(sample_state.username.to_vec(), 2u64), + ]; + + // tombstone the given states + storage.tombstone_value_states(&keys_to_tombstone).await?; + + for label in [ + AkdLabel::from_utf8_str("tombstone_test_user"), + AkdLabel(rand_user), + ] { + for version in 0..5 { + let key = ValueStateKey(label.to_vec(), version); + let got = storage.get::(&key).await?; + + if let DbRecord::ValueState(value_state) = got { + assert_eq!(version, value_state.epoch); + if keys_to_tombstone.contains(&key) { + // should be a tombstone + assert_eq!(crate::TOMBSTONE.to_vec(), value_state.plaintext_val.0); + } else { + // should NOT be a tombstone + assert_ne!(crate::TOMBSTONE.to_vec(), value_state.plaintext_val.0); + } + } else { + panic!("Unable to retrieve value state {:?}", key); + } + } + } + Ok(()) +} diff --git a/akd/src/storage/transaction.rs b/akd/src/storage/transaction.rs index f4be91df..d67abd3e 100644 --- a/akd/src/storage/transaction.rs +++ b/akd/src/storage/transaction.rs @@ -214,18 +214,18 @@ mod tests { key: NodeStateKey(NodeLabel::new(byte_arr_from_u64(1), 1), 2), }); let value1 = DbRecord::ValueState(ValueState { - username: AkdLabel("test".to_string()), + username: AkdLabel::from_utf8_str("test"), epoch: 1, label: NodeLabel::new(byte_arr_from_u64(1), 1), version: 1, - plaintext_val: AkdValue("abc123".to_string()), + plaintext_val: 
AkdValue::from_utf8_str("abc123"), }); let value2 = DbRecord::ValueState(ValueState { - username: AkdLabel("test".to_string()), + username: AkdLabel::from_utf8_str("test"), epoch: 2, label: NodeLabel::new(byte_arr_from_u64(1), 1), version: 2, - plaintext_val: AkdValue("abc1234".to_string()), + plaintext_val: AkdValue::from_utf8_str("abc1234"), }); let records = vec![azks, node1, node2, node_state1, node_state2, value1, value2]; diff --git a/akd/src/storage/types.rs b/akd/src/storage/types.rs index 1fbe823f..91c89504 100644 --- a/akd/src/storage/types.rs +++ b/akd/src/storage/types.rs @@ -7,11 +7,10 @@ //! Various storage and representation related types -use crate::append_only_zks::Azks; use crate::history_tree_node::{HistoryTreeNode, NodeType}; -use crate::node_state::{HistoryChildState, HistoryNodeState, NodeLabel, NodeStateKey}; +use crate::node_state::{HistoryChildState, HistoryNodeState, NodeStateKey}; use crate::storage::Storable; -use crate::ARITY; +use crate::{Azks, NodeLabel, ARITY}; use std::convert::TryInto; /// Various elements that can be stored @@ -30,18 +29,45 @@ pub enum StorageType { /// The keys for this key-value store #[derive(Debug, Clone, Hash, Eq, PartialEq)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))] -pub struct AkdLabel(pub String); +pub struct AkdLabel(pub Vec); +impl AkdLabel { + /// Build an [`AkdLabel`] struct from an UTF8 string + pub fn from_utf8_str(value: &str) -> Self { + Self(value.as_bytes().to_vec()) + } +} + +impl core::ops::Deref for AkdLabel { + type Target = [u8]; + fn deref(&self) -> &Self::Target { + &self.0 + } +} /// The types of value used in the key-value pairs of a AKD #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))] #[cfg_attr(feature = "serde", serde(bound = ""))] -pub struct AkdValue(pub String); +pub struct AkdValue(pub Vec); + +impl AkdValue { + /// Build an [`AkdValue`] struct from an UTF8 string + pub fn 
from_utf8_str(value: &str) -> Self { + Self(value.as_bytes().to_vec()) + } +} + +impl core::ops::Deref for AkdValue { + type Target = [u8]; + fn deref(&self) -> &Self::Target { + &self.0 + } +} /// State for a value at a given version for that key #[derive(Debug, Clone, Hash, Eq, PartialEq)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))] -pub struct ValueStateKey(pub String, pub u64); +pub struct ValueStateKey(pub Vec, pub u64); /// The state of the value for a given key, starting at a particular epoch. #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -68,13 +94,13 @@ impl crate::storage::Storable for ValueState { } fn get_id(&self) -> ValueStateKey { - ValueStateKey(self.username.0.clone(), self.epoch) + ValueStateKey(self.username.to_vec(), self.epoch) } fn get_full_binary_key_id(key: &ValueStateKey) -> Vec { let mut result = vec![StorageType::ValueState as u8]; result.extend_from_slice(&key.1.to_le_bytes()); - result.extend_from_slice(key.0.as_bytes()); + result.extend_from_slice(&key.0); result } @@ -85,11 +111,7 @@ impl crate::storage::Storable for ValueState { } let epoch_bytes: [u8; 8] = bin[1..=8].try_into().expect("Slice with incorrect length"); let epoch = u64::from_le_bytes(epoch_bytes); - if let Ok(username) = std::str::from_utf8(&bin[9..]) { - Ok(ValueStateKey(username.to_string(), epoch)) - } else { - Err("Invalid string format".to_string()) - } + Ok(ValueStateKey(bin[9..].to_vec(), epoch)) } } @@ -247,8 +269,8 @@ impl DbRecord { /// Build a user state from the properties pub fn build_user_state( - username: String, - plaintext_val: String, + username: Vec, + plaintext_val: Vec, version: u64, label_len: u32, label_val: [u8; 32], diff --git a/akd/src/tests.rs b/akd/src/tests.rs index b191a8b3..13af90b4 100644 --- a/akd/src/tests.rs +++ b/akd/src/tests.rs @@ -378,7 +378,7 @@ async fn test_insert_single_leaf_root() -> Result<(), AkdError> { let new_leaf = get_leaf_node::( &db, NodeLabel::new(byte_arr_from_u64(0b0u64), 1u32), 
- &[0u8], + &Blake3::hash(&[0u8]), NodeLabel::root(), 0, ) @@ -387,7 +387,7 @@ async fn test_insert_single_leaf_root() -> Result<(), AkdError> { let leaf_1 = get_leaf_node::( &db, NodeLabel::new(byte_arr_from_u64(0b1u64 << 63), 1u32), - &[1u8], + &Blake3::hash(&[1u8]), NodeLabel::root(), 0, ) @@ -434,7 +434,7 @@ async fn test_insert_single_leaf_below_root() -> Result<(), AkdError> { let new_leaf = get_leaf_node::( &db, NodeLabel::new(byte_arr_from_u64(0b00u64), 2u32), - &[0u8], + &Blake3::hash(&[0u8]), NodeLabel::root(), 1, ) @@ -443,7 +443,7 @@ async fn test_insert_single_leaf_below_root() -> Result<(), AkdError> { let leaf_1 = get_leaf_node::( &db, NodeLabel::new(byte_arr_from_u64(0b11u64 << 62), 2u32), - &[1u8], + &Blake3::hash(&[1u8]), NodeLabel::root(), 2, ) @@ -452,7 +452,7 @@ async fn test_insert_single_leaf_below_root() -> Result<(), AkdError> { let leaf_2 = get_leaf_node::( &db, NodeLabel::new(byte_arr_from_u64(0b10u64 << 62), 2u32), - &[1u8, 1u8], + &Blake3::hash(&[1u8, 1u8]), NodeLabel::root(), 3, ) @@ -519,7 +519,7 @@ async fn test_insert_single_leaf_below_root_both_sides() -> Result<(), AkdError> let new_leaf = get_leaf_node::( &db, NodeLabel::new(byte_arr_from_u64(0b000u64), 3u32), - &[0u8], + &Blake3::hash(&[0u8]), NodeLabel::root(), 0, ) @@ -528,7 +528,7 @@ async fn test_insert_single_leaf_below_root_both_sides() -> Result<(), AkdError> let leaf_1 = get_leaf_node::( &db, NodeLabel::new(byte_arr_from_u64(0b111u64 << 61), 3u32), - &[1u8], + &Blake3::hash(&[1u8]), NodeLabel::root(), 0, ) @@ -537,7 +537,7 @@ async fn test_insert_single_leaf_below_root_both_sides() -> Result<(), AkdError> let leaf_2 = get_leaf_node::( &db, NodeLabel::new(byte_arr_from_u64(0b100u64 << 61), 3u32), - &[1u8, 1u8], + &Blake3::hash(&[1u8, 1u8]), NodeLabel::root(), 0, ) @@ -546,7 +546,7 @@ async fn test_insert_single_leaf_below_root_both_sides() -> Result<(), AkdError> let leaf_3 = get_leaf_node::( &db, NodeLabel::new(byte_arr_from_u64(0b010u64 << 61), 3u32), - &[0u8, 1u8], + 
&Blake3::hash(&[0u8, 1u8]), NodeLabel::root(), 0, ) @@ -631,7 +631,7 @@ async fn test_insert_single_leaf_full_tree() -> Result<(), AkdError> { let new_leaf = get_leaf_node::( &db, NodeLabel::new(byte_arr_from_u64(leaf_u64), 3u32), - &leaf_u64.to_be_bytes(), + &Blake3::hash(&leaf_u64.to_be_bytes()), NodeLabel::root(), 7 - i, ) diff --git a/akd/src/utils.rs b/akd/src/utils.rs index 65f1be94..591fae43 100644 --- a/akd/src/utils.rs +++ b/akd/src/utils.rs @@ -52,3 +52,9 @@ pub(crate) fn empty_node_hash() -> H::Digest { pub(crate) fn empty_node_hash_no_label() -> H::Digest { H::hash(&EMPTY_VALUE) } + +// FIXME: Make a real commitment here, alongwith a blinding factor. See issue #123 +/// Gets the bytes for a value. +pub(crate) fn value_to_bytes(value: &crate::AkdValue) -> H::Digest { + H::hash(value) +} diff --git a/akd_client/src/tests.rs b/akd_client/src/tests.rs index c4b6e022..fb1a8a54 100644 --- a/akd_client/src/tests.rs +++ b/akd_client/src/tests.rs @@ -8,9 +8,6 @@ //! This crate contains the tests for the client library which make sure that the //! 
base AKD library and this "lean" client result in the same outputs -#[cfg(feature = "nostd")] -use crate::alloc::string::ToString; - use akd::ecvrf::HardCodedAkdVRF; #[cfg(feature = "nostd")] @@ -21,7 +18,8 @@ use alloc::vec; use alloc::vec::Vec; use akd::errors::{AkdError, StorageError}; -use akd::storage::types::{AkdLabel, AkdValue}; +use akd::storage::Storage; +use akd::{AkdLabel, AkdValue}; use crate::hash::DIGEST_BYTES; use winter_math::fields::f128::BaseElement; @@ -38,7 +36,7 @@ use winter_crypto::hashers::Sha3_256; type Hash = Sha3_256; type InMemoryDb = akd::storage::memory::AsyncInMemoryDatabase; -type Directory = akd::directory::Directory; +type Directory = akd::Directory; // =================================== // Test helpers @@ -91,7 +89,7 @@ fn convert_label(proof: akd::node_state::NodeLabel) -> crate::types::NodeLabel { } } -fn convert_node(node: akd::node_state::Node) -> crate::types::Node +fn convert_node(node: akd::Node) -> crate::types::Node where H: winter_crypto::Hasher, { @@ -102,9 +100,9 @@ where } fn convert_layer_proof( - parent: akd::node_state::NodeLabel, + parent: akd::NodeLabel, direction: akd::Direction, - sibling: akd::node_state::Node, + sibling: akd::Node, ) -> crate::types::LayerProof where H: winter_crypto::Hasher, @@ -159,7 +157,7 @@ where crate::types::LookupProof { epoch: proof.epoch, version: proof.version, - plaintext_value: proof.plaintext_value.0.as_bytes().to_vec(), + plaintext_value: proof.plaintext_value.to_vec(), exisitence_vrf_proof: proof.exisitence_vrf_proof.clone(), existence_proof: convert_membership_proof(&proof.existence_proof), marker_vrf_proof: proof.marker_vrf_proof.clone(), @@ -179,7 +177,7 @@ where for proof in &history_proof.proofs { let update_proof = crate::types::UpdateProof { epoch: proof.epoch, - plaintext_value: proof.plaintext_value.0.as_bytes().to_vec(), + plaintext_value: proof.plaintext_value.to_vec(), version: proof.version, existence_vrf_proof: proof.existence_vrf_proof.clone(), existence_at_ep: 
convert_membership_proof(&proof.existence_at_ep), @@ -225,14 +223,14 @@ async fn test_simple_lookup() -> Result<(), AkdError> { let mut updates = vec![]; for i in 0..15 { updates.push(( - AkdLabel(format!("hello{}", i)), - AkdValue(format!("hello{}", i)), + AkdLabel(format!("hello{}", i).as_bytes().to_vec()), + AkdValue(format!("hello{}", i).as_bytes().to_vec()), )); } - akd.publish::(updates, true).await?; + akd.publish::(updates).await?; - let target_label = AkdLabel(format!("hello{}", 10)); + let target_label = AkdLabel(format!("hello{}", 10).as_bytes().to_vec()); // retrieve the lookup proof let lookup_proof = akd.lookup(target_label.clone()).await?; @@ -247,7 +245,7 @@ async fn test_simple_lookup() -> Result<(), AkdError> { let akd_result = akd::client::lookup_verify::(&vrf_pk, root_hash, target_label.clone(), lookup_proof); - let target_label_bytes = target_label.0.as_bytes().to_vec(); + let target_label_bytes = target_label.to_vec(); let lean_result = crate::verify::lookup_verify( &vrf_pk.to_bytes(), @@ -263,20 +261,72 @@ async fn test_simple_lookup() -> Result<(), AkdError> { Ok(()) } +// #[tokio::test] +// async fn test_simple_lookup_for_small_tree() -> Result<(), AkdError> { +// let db = InMemoryDb::new(); +// let vrf = HardCodedAkdVRF {}; +// let akd = Directory::new::(&db, &vrf, false).await?; + +// let mut updates = vec![]; +// for i in 0..1 { +// updates.push(( +// AkdLabel(format!("hello{}", i).as_bytes().to_vec()), +// AkdValue(format!("hello{}", i).as_bytes().to_vec()), +// )); +// } + +// akd.publish::(updates).await?; + +// let target_label = AkdLabel(format!("hello{}", 0).as_bytes().to_vec()); + +// // retrieve the lookup proof +// let lookup_proof = akd.lookup(target_label.clone()).await?; +// // retrieve the root hash +// let current_azks = akd.retrieve_current_azks().await?; +// let root_hash = akd.get_root_hash::(¤t_azks).await?; + +// // create the "lean" lookup proof version +// let internal_lookup_proof = 
convert_lookup_proof::(&lookup_proof); + +// let vrf_pk = vrf.get_vrf_public_key().await?; + +// // perform the "traditional" AKD verification +// let akd_result = +// akd::client::lookup_verify::(&vrf_pk, root_hash, target_label.clone(), lookup_proof); + +// let target_label_bytes = target_label.to_vec(); +// let lean_result = crate::verify::lookup_verify( +// &vrf_pk.to_bytes(), +// to_digest::(root_hash), +// target_label_bytes, +// internal_lookup_proof, +// ) +// .map_err(|i_err| AkdError::AzksNotFound(format!("Internal: {:?}", i_err))); + +// // check the two results to make sure they both verify +// assert!(matches!(akd_result, Ok(()))); +// assert!(matches!(lean_result, Ok(()))); + +// Ok(()) +// } + #[tokio::test] async fn test_history_proof_multiple_epochs() -> Result<(), AkdError> { let db = InMemoryDb::new(); let vrf = HardCodedAkdVRF {}; let akd = Directory::new::(&db, &vrf, false).await?; let vrf_pk = akd.get_public_key().await.unwrap(); - let key = AkdLabel("label".to_string()); - let key_bytes = key.0.as_bytes().to_vec(); + let key = AkdLabel::from_utf8_str("label"); + let key_bytes = key.to_vec(); const EPOCHS: usize = 10; // publishes key versions in multiple epochs for epoch in 1..=EPOCHS { - let data = vec![(key.clone(), AkdValue(format!("value{}", epoch)))]; - akd.publish::(data, true).await?; + let data = vec![( + key.clone(), + AkdValue(format!("value{}", epoch).as_bytes().to_vec()), + )]; + akd.publish::(data).await?; } // retrieves and verifies history proofs for the key @@ -298,6 +348,7 @@ async fn test_history_proof_multiple_epochs() -> Result<(), AkdError> { previous_root_hashes.clone(), key.clone(), proof.clone(), + false, ); let lean_result = crate::verify::key_history_verify( &vrf_pk.to_bytes(), @@ -305,9 +356,10 @@ async fn test_history_proof_multiple_epochs() -> Result<(), AkdError> { to_digest_vec_opt::(previous_root_hashes.clone()), key_bytes.clone(), internal_proof.clone(), + false, ); - assert!(matches!(akd_result, Ok(())), 
"{:?}", akd_result); - assert!(matches!(lean_result, Ok(())), "{:?}", lean_result); + assert!(matches!(akd_result, Ok(_)), "{:?}", akd_result); + assert!(matches!(lean_result, Ok(_)), "{:?}", lean_result); } // corrupts root_hashes[0] and verifies both traditional and lean history verification fail @@ -320,6 +372,7 @@ async fn test_history_proof_multiple_epochs() -> Result<(), AkdError> { previous_root_hashes.clone(), key.clone(), proof.clone(), + false, ); // performs "lean" history verification let lean_result = crate::verify::key_history_verify( @@ -328,6 +381,7 @@ async fn test_history_proof_multiple_epochs() -> Result<(), AkdError> { to_digest_vec_opt::(previous_root_hashes), key_bytes, internal_proof, + false, ); assert!(akd_result.is_err(), "{:?}", akd_result); assert!(lean_result.is_err(), "{:?}", lean_result); @@ -341,11 +395,11 @@ async fn test_history_proof_single_epoch() -> Result<(), AkdError> { let vrf = HardCodedAkdVRF {}; let akd = Directory::new::(&db, &vrf, false).await?; let vrf_pk = akd.get_public_key().await.unwrap(); - let key = AkdLabel("label".to_string()); - let key_bytes = key.0.as_bytes().to_vec(); + let key = AkdLabel::from_utf8_str("label"); + let key_bytes = key.to_vec(); // publishes single key-value - akd.publish::(vec![(key.clone(), AkdValue("value".to_string()))], true) + akd.publish::(vec![(key.clone(), AkdValue::from_utf8_str("value"))]) .await?; // retrieves and verifies history proofs for the key @@ -364,6 +418,7 @@ async fn test_history_proof_single_epoch() -> Result<(), AkdError> { previous_root_hashes.clone(), key, proof, + false, ); let lean_result = crate::verify::key_history_verify( &vrf_pk.to_bytes(), @@ -371,51 +426,130 @@ async fn test_history_proof_single_epoch() -> Result<(), AkdError> { to_digest_vec_opt::(previous_root_hashes), key_bytes, internal_proof, + false, ); - assert!(matches!(akd_result, Ok(())), "{:?}", akd_result); - assert!(matches!(lean_result, Ok(())), "{:?}", lean_result); + 
assert!(matches!(akd_result, Ok(_)), "{:?}", akd_result); + assert!(matches!(lean_result, Ok(_)), "{:?}", lean_result); Ok(()) } -// NOTE: There is a problem with "small" AKD trees that appears to only manifest with -// SHA3 256 hashing - -// #[tokio::test] -// async fn test_simple_lookup_for_small_tree() -> Result<(), AkdError> { -// let db = InMemoryDb::new(); -// let mut akd = Directory::new::(&db).await?; - -// let mut updates = vec![]; -// for i in 0..1 { -// updates.push(( -// AkdLabel(format!("hello{}", i)), -// AkdValue(format!("hello{}", i)), -// )); -// } - -// akd.publish::(updates, true).await?; +#[tokio::test] +async fn test_tombstoned_key_history() -> Result<(), AkdError> { + let db = InMemoryDb::new(); + let vrf = HardCodedAkdVRF {}; + // epoch 0 + let akd = Directory::new::(&db, &vrf, false).await?; -// let target_label = AkdLabel(format!("hello{}", 0)); + // epoch 1 + akd.publish::(vec![( + AkdLabel::from_utf8_str("hello"), + AkdValue::from_utf8_str("world"), + )]) + .await?; + + // epoch 2 + akd.publish::(vec![( + AkdLabel::from_utf8_str("hello"), + AkdValue::from_utf8_str("world2"), + )]) + .await?; + + // epoch 3 + akd.publish::(vec![( + AkdLabel::from_utf8_str("hello"), + AkdValue::from_utf8_str("world3"), + )]) + .await?; + + // epoch 4 + akd.publish::(vec![( + AkdLabel::from_utf8_str("hello"), + AkdValue::from_utf8_str("world4"), + )]) + .await?; + + // epoch 5 + akd.publish::(vec![( + AkdLabel::from_utf8_str("hello"), + AkdValue::from_utf8_str("world5"), + )]) + .await?; + + // Epochs 1-5, we're going to tombstone 1 & 2 + let vrf_pk = akd.get_public_key().await?; + + // tombstone epochs 1 & 2 + let tombstones = [ + akd::storage::types::ValueStateKey("hello".as_bytes().to_vec(), 1u64), + akd::storage::types::ValueStateKey("hello".as_bytes().to_vec(), 2u64), + ]; + db.tombstone_value_states(&tombstones).await?; + + let history_proof = akd + .key_history::(&AkdLabel::from_utf8_str("hello")) + .await?; + assert_eq!(5, 
history_proof.proofs.len()); + let (root_hashes, previous_root_hashes) = + akd::directory::get_key_history_hashes(&akd, &history_proof).await?; -// // retrieve the lookup proof -// let lookup_proof = akd.lookup(target_label.clone()).await?; -// // retrieve the root hash -// let current_azks = akd.retrieve_current_azks().await?; -// let root_hash = akd.get_root_hash::(¤t_azks).await?; + // If we request a proof with tombstones but without saying we're OK with tombstones, throw an err + // check main client output + let tombstones = akd::client::key_history_verify::( + &vrf_pk, + root_hashes.clone(), + previous_root_hashes.clone(), + AkdLabel::from_utf8_str("hello"), + history_proof.clone(), + false, + ); + assert!(matches!(tombstones, Err(_))); -// // create the "lean" lookup proof version -// let internal_lookup_proof = convert_lookup_proof::(&lookup_proof); + // check lean client output + let internal_proof = convert_history_proof::(&history_proof); + let tombstones = crate::verify::key_history_verify( + &vrf_pk.to_bytes(), + to_digest_vec::(root_hashes.clone()), + to_digest_vec_opt::(previous_root_hashes.clone()), + AkdLabel::from_utf8_str("hello").to_vec(), + internal_proof, + false, + ); + assert!(matches!(tombstones, Err(_))); -// // perform the "traditional" AKD verification -// let akd_result = -// akd::client::lookup_verify::(root_hash, target_label, lookup_proof); + // We should be able to verify tombstones assuming the client is accepting + // of tombstoned states + // check main client output + let tombstones = akd::client::key_history_verify::( + &vrf_pk, + root_hashes.clone(), + previous_root_hashes.clone(), + AkdLabel::from_utf8_str("hello"), + history_proof.clone(), + true, + )?; + assert_eq!(false, tombstones[0]); + assert_eq!(false, tombstones[1]); + assert_eq!(false, tombstones[2]); + assert_eq!(true, tombstones[3]); + assert_eq!(true, tombstones[4]); + + // check lean client output + let internal_proof = convert_history_proof::(&history_proof); + 
let tombstones = crate::verify::key_history_verify( + &vrf_pk.to_bytes(), + to_digest_vec::(root_hashes), + to_digest_vec_opt::(previous_root_hashes), + AkdLabel::from_utf8_str("hello").to_vec(), + internal_proof, + true, + ) + .map_err(|i_err| AkdError::Storage(StorageError::Other(format!("Internal: {:?}", i_err))))?; -// let lean_result = -// crate::verify::lookup_verify(to_digest::(root_hash), vec![], internal_lookup_proof) -// .map_err(|i_err| AkdError::AzksNotFound(format!("Internal: {:?}", i_err))); -// // check the two results to make sure they both verify -// assert!(matches!(akd_result, Ok(()))); -// assert!(matches!(lean_result, Ok(()))); + assert_eq!(false, tombstones[0]); + assert_eq!(false, tombstones[1]); + assert_eq!(false, tombstones[2]); + assert_eq!(true, tombstones[3]); + assert_eq!(true, tombstones[4]); -// Ok(()) -// } + Ok(()) +} diff --git a/akd_client/src/types.rs b/akd_client/src/types.rs index 6ba0f547..4840a07d 100644 --- a/akd_client/src/types.rs +++ b/akd_client/src/types.rs @@ -41,6 +41,13 @@ pub const EMPTY_LABEL: NodeLabel = NodeLabel { len: 0, }; +/// A "tombstone" is a false value in an AKD ValueState denoting that a real value has been removed (e.g. data rentention policies). +/// Should a tombstone be encountered, we have to assume that the hash of the value is correct, and we move forward without being able to +/// verify the raw value. We utilize an empty array to save space in the storage layer +/// +/// See [GitHub issue #130](https://github.com/novifinancial/akd/issues/130) for more context +pub const TOMBSTONE: &[u8] = &[]; + // ============================================ // Structs // ============================================ diff --git a/akd_client/src/utils.rs b/akd_client/src/utils.rs index 86f4dc1b..0e2d3bba 100644 --- a/akd_client/src/utils.rs +++ b/akd_client/src/utils.rs @@ -7,7 +7,16 @@ //! 
Utility functions +#[cfg(feature = "nostd")] +use alloc::vec::Vec; + /// Retrieve the marker version pub(crate) fn get_marker_version(version: u64) -> u64 { 64u64 - (version.leading_zeros() as u64) - 1u64 } + +// FIXME: Make a real commitment here, alongwith a blinding factor. See issue #123 +/// Gets the bytes for a value. +pub(crate) fn value_to_bytes(value: &crate::AkdValue) -> crate::Digest { + crate::hash::hash(value) +} diff --git a/akd_client/src/verify.rs b/akd_client/src/verify.rs index 96d867c0..564c6d48 100644 --- a/akd_client/src/verify.rs +++ b/akd_client/src/verify.rs @@ -100,6 +100,11 @@ fn verify_nonmembership( Ok(verified) } +fn hash_plaintext_value(value: &AkdValue) -> Digest { + let single_hash = crate::utils::value_to_bytes(value); + merge(&[hash(&EMPTY_VALUE), single_hash]) +} + /// This function is called to verify that a given NodeLabel is indeed /// the VRF for a given version (fresh or stale) for a username. /// Hence, it also takes as input the server's public key. @@ -125,7 +130,6 @@ pub fn lookup_verify( ) -> Result<(), VerificationError> { let _epoch = proof.epoch; - // let _plaintext_value = proof.plaintext_value; #[cfg(feature = "vrf")] let version = proof.version; @@ -135,6 +139,14 @@ pub fn lookup_verify( let marker_proof = proof.marker_proof; let freshness_proof = proof.freshness_proof; + if hash_plaintext_value(&proof.plaintext_value) != existence_proof.hash_val { + return Err(verify_error!( + LookupProof, + bool, + "Hash of plaintext value did not match existence proof hash".to_string() + )); + } + #[cfg(feature = "vrf")] { let fresh_label = existence_proof.label; @@ -183,25 +195,32 @@ pub fn lookup_verify( } /// Verifies a key history proof, given the corresponding sequence of hashes. +/// Returns a vector of whether the validity of a hash could be verified. +/// When false, the value <=> hash validity at the position could not be +/// verified because the value has been removed ("tombstoned") from the storage layer. 
pub fn key_history_verify( vrf_public_key: &[u8], root_hashes: Vec, previous_root_hashes: Vec>, akd_key: AkdLabel, proof: HistoryProof, -) -> Result<(), VerificationError> { + allow_tombstones: bool, +) -> Result, VerificationError> { + let mut tombstones = vec![]; for (count, update_proof) in proof.proofs.into_iter().enumerate() { let root_hash = root_hashes[count]; let previous_root_hash = previous_root_hashes[count]; - verify_single_update_proof( + let is_tombstone = verify_single_update_proof( root_hash, vrf_public_key, previous_root_hash, update_proof, &akd_key, + allow_tombstones, )?; + tombstones.push(is_tombstone); } - Ok(()) + Ok(tombstones) } /// Verifies a single update proof @@ -211,7 +230,8 @@ fn verify_single_update_proof( previous_root_hash: Option, proof: UpdateProof, uname: &AkdLabel, -) -> Result<(), VerificationError> { + allow_tombstone: bool, +) -> Result { let epoch = proof.epoch; let _plaintext_value = &proof.plaintext_value; let version = proof.version; @@ -222,6 +242,29 @@ fn verify_single_update_proof( let previous_val_stale_at_ep = &proof.previous_val_stale_at_ep; let non_existence_before_ep = &proof.non_existence_before_ep; + let (is_tombstone, value_hash_valid) = match (allow_tombstone, &proof.plaintext_value) { + (true, bytes) if bytes == crate::TOMBSTONE => { + // A tombstone was encountered, we need to just take the + // hash of the value at "face value" since we don't have + // the real value available + (true, true) + } + (_, bytes) => { + // No tombstone so hash the value found, and compare to the existence proof's value + ( + false, + hash_plaintext_value(bytes) == existence_at_ep.hash_val, + ) + } + }; + if !value_hash_valid { + return Err(verify_error!( + HistoryProof, + bool, + "Hash of plaintext value did not match existence proof hash".to_string() + )); + } + // ***** PART 1 *************************** // Verify the VRF and membership proof for the corresponding label for the version being updated to. 
#[cfg(feature = "vrf")] @@ -341,5 +384,8 @@ fn verify_single_update_proof( uname, ver, epoch-1), error_type: VerificationErrorType::HistoryProof}); } } - Ok(()) + + // return indicator of if the value <=> hash mapping was verified + // or if the hash was simply taken at face-value. True = hash mapping verified + Ok(is_tombstone) } diff --git a/akd_mysql/src/mysql.rs b/akd_mysql/src/mysql.rs index 427469eb..eae6c82d 100644 --- a/akd_mysql/src/mysql.rs +++ b/akd_mysql/src/mysql.rs @@ -10,11 +10,12 @@ use crate::mysql_storables::MySqlStorable; use akd::errors::StorageError; use akd::history_tree_node::HistoryTreeNode; -use akd::node_state::NodeLabel; +use akd::node_state::HistoryNodeState; use akd::storage::types::{ AkdLabel, DbRecord, KeyData, StorageType, ValueState, ValueStateRetrievalFlag, }; use akd::storage::{Storable, Storage}; +use akd::NodeLabel; use async_trait::async_trait; use log::{debug, error, info, warn}; use mysql_async::prelude::*; @@ -487,9 +488,9 @@ impl<'a> AsyncMySqlDatabase { let head = &records[0]; let statement = |i: usize| -> String { match &head { - DbRecord::Azks(_) => DbRecord::set_batch_statement::(i), + DbRecord::Azks(_) => DbRecord::set_batch_statement::(i), DbRecord::HistoryNodeState(_) => { - DbRecord::set_batch_statement::(i) + DbRecord::set_batch_statement::(i) } DbRecord::HistoryTreeNode(_) => DbRecord::set_batch_statement::(i), DbRecord::ValueState(_) => { @@ -904,18 +905,21 @@ impl Storage for AsyncMySqlDatabase { } /// Retrieve a stored record from the data layer - async fn get(&self, id: St::Key) -> core::result::Result { + async fn get( + &self, + id: &St::Key, + ) -> core::result::Result { // we're in a transaction, meaning the object _might_ be newer and therefore we should try and read if from the transaction // log instead of the raw storage layer if self.is_transaction_active().await { - if let Some(result) = self.trans.get::(&id).await { + if let Some(result) = self.trans.get::(id).await { return Ok(result); } } // check 
for a cache hit if let Some(cache) = &self.cache { - if let Some(result) = cache.hit_test::(&id).await { + if let Some(result) = cache.hit_test::(id).await { return Ok(result); } } @@ -931,7 +935,7 @@ impl Storage for AsyncMySqlDatabase { async fn get_direct( &self, - id: St::Key, + id: &St::Key, ) -> core::result::Result { *(self.num_reads.write().await) += 1; self.record_call_stats( @@ -947,7 +951,7 @@ impl Storage for AsyncMySqlDatabase { let mut conn = self.get_connection().await?; let statement = DbRecord::get_specific_statement::(); - let params = DbRecord::get_specific_params::(&id); + let params = DbRecord::get_specific_params::(id); let out = match params { Some(p) => match conn.exec_first(statement, p).await { Err(err) => Err(err), @@ -996,7 +1000,7 @@ impl Storage for AsyncMySqlDatabase { /// Retrieve a batch of records by id async fn batch_get( &self, - ids: Vec, + ids: &[St::Key], ) -> core::result::Result, StorageError> { let mut map = Vec::new(); @@ -1164,6 +1168,47 @@ impl Storage for AsyncMySqlDatabase { Ok(map) } + async fn tombstone_value_states( + &self, + keys: &[akd::storage::types::ValueStateKey], + ) -> core::result::Result<(), StorageError> { + // NOTE: This might be optimizable in the future where we could use a SQL statement such as + // + // UPDATE `users` + // SET `data` = TOMBSTONE + // WHERE key in (set) + // + // However, the problem comes from managing an active transaction and cache (if there is one) + // since we may need to batch load nodes anyways in order to get the other properties + // which might need to be set. We could write everything to SQL, and after-the-fact update + // the active transaction and caches with replacing nodes which were updated? 
Anyways it's a + // relatively minor improvement here, due to proper use of batch operations + + if keys.is_empty() { + return Ok(()); + } + + let data = self.batch_get::(keys).await?; + let mut new_data = vec![]; + for record in data { + if let DbRecord::ValueState(value_state) = record { + new_data.push(DbRecord::ValueState(ValueState { + epoch: value_state.epoch, + label: value_state.label, + plaintext_val: akd::AkdValue(akd::TOMBSTONE.to_vec()), + username: value_state.username, + version: value_state.version, + })); + } + } + if !new_data.is_empty() { + debug!("Tombstoning {} entries", new_data.len()); + self.batch_set(new_data).await?; + } + + Ok(()) + } + async fn get_user_data( &self, username: &AkdLabel, @@ -1580,7 +1625,7 @@ impl Storage for AsyncMySqlDatabase { async fn get_epoch_lte_epoch( &self, - node_label: akd::node_state::NodeLabel, + node_label: NodeLabel, epoch_in_question: u64, ) -> core::result::Result { *(self.num_reads.write().await) += 1; diff --git a/akd_mysql/src/mysql_storables.rs b/akd_mysql/src/mysql_storables.rs index c9e531fb..dab89aaf 100644 --- a/akd_mysql/src/mysql_storables.rs +++ b/akd_mysql/src/mysql_storables.rs @@ -10,6 +10,7 @@ use std::convert::TryInto; use akd::history_tree_node::{HistoryTreeNode, NodeKey}; +use akd::node_state::{HistoryChildState, HistoryNodeState, NodeStateKey}; use akd::storage::types::{DbRecord, StorageType}; use akd::storage::Storable; use akd::ARITY; @@ -131,7 +132,7 @@ impl MySqlStorable for DbRecord { StorageType::Azks => format!("INSERT INTO `{}` (`key`, {}) VALUES (:key, :epoch, :num_nodes) as new ON DUPLICATE KEY UPDATE `epoch` = new.epoch, `num_nodes` = new.num_nodes", TABLE_AZKS, SELECT_AZKS_DATA), StorageType::HistoryNodeState => format!("INSERT INTO `{}` ({}) VALUES {} as new ON DUPLICATE KEY UPDATE `value` = new.value, `child_states` = new.child_states", TABLE_HISTORY_NODE_STATES, SELECT_HISTORY_NODE_STATE_DATA, parts), StorageType::HistoryTreeNode => format!("INSERT INTO `{}` ({}) VALUES {} 
as new ON DUPLICATE KEY UPDATE `label_len` = new.label_len, `label_val` = new.label_val, `birth_epoch` = new.birth_epoch, `last_epoch` = new.last_epoch, `parent_label_len` = new.parent_label_len, `parent_label_val` = new.parent_label_val, `node_type` = new.node_type", TABLE_HISTORY_TREE_NODES, SELECT_HISTORY_TREE_NODE_DATA, parts), - StorageType::ValueState => format!("INSERT INTO `{}` ({}) VALUES {}", TABLE_USER, SELECT_USER_DATA, parts), + StorageType::ValueState => format!("INSERT INTO `{}` ({}) VALUES {} as new ON DUPLICATE KEY UPDATE `data` = new.data, `node_label_val` = new.node_label_val, `node_label_len` = new.node_label_len, `version` = new.version", TABLE_USER, SELECT_USER_DATA, parts), } } @@ -394,8 +395,8 @@ impl MySqlStorable for DbRecord { // Since these are constructed from a safe key, they should never fail // so we'll leave the unwrap to simplify let bin = St::get_full_binary_key_id(key); - let back: akd::node_state::NodeStateKey = - akd::node_state::HistoryNodeState::key_from_full_binary(&bin).unwrap(); + let back: NodeStateKey = + HistoryNodeState::key_from_full_binary(&bin).unwrap(); vec![ (format!("label_len{}", idx), Value::from(back.0.len)), (format!("label_val{}", idx), Value::from(back.0.val)), @@ -483,7 +484,7 @@ impl MySqlStorable for DbRecord { let value_vec: Vec = value; let label_val_vec: Vec = label_val; let child_states_bin_vec: Vec = child_states; - let child_states_decoded: [Option; ARITY] = + let child_states_decoded: [Option; ARITY] = bincode::deserialize(&child_states_bin_vec).map_err( |serialization_err| { MySqlError::Other( diff --git a/integration_tests/src/test_util.rs b/integration_tests/src/test_util.rs index 24e555b2..fbda8baa 100644 --- a/integration_tests/src/test_util.rs +++ b/integration_tests/src/test_util.rs @@ -7,9 +7,9 @@ extern crate thread_id; // License, Version 2.0 found in the LICENSE-APACHE file in the root directory // of this source tree. 
-use akd::directory::Directory; use akd::ecvrf::VRFKeyStorage; use akd::storage::types::{AkdLabel, AkdValue}; +use akd::Directory; use log::{info, Level, Metadata, Record}; use once_cell::sync::OnceCell; use rand::distributions::Alphanumeric; @@ -159,10 +159,13 @@ pub(crate) async fn directory_test_suite< for i in 1..=3 { let mut data = Vec::new(); for value in users.iter() { - data.push((AkdLabel(value.clone()), AkdValue(format!("{}", i)))); + data.push(( + AkdLabel::from_utf8_str(value), + AkdValue(format!("{}", i).as_bytes().to_vec()), + )); } - if let Err(error) = dir.publish::(data, true).await { + if let Err(error) = dir.publish::(data).await { panic!("Error publishing batch {:?}", error); } else { info!("Published epoch {}", i); @@ -174,7 +177,7 @@ pub(crate) async fn directory_test_suite< let root_hash = dir.get_root_hash::(&azks).await.unwrap(); for user in users.iter().choose_multiple(&mut rng, 10) { - let key = AkdLabel(user.clone()); + let key = AkdLabel::from_utf8_str(user); match dir.lookup::(key.clone()).await { Err(error) => panic!("Error looking up user information {:?}", error), Ok(proof) => { @@ -191,7 +194,7 @@ pub(crate) async fn directory_test_suite< // Perform 2 random history proofs on the published material for user in users.iter().choose_multiple(&mut rng, 2) { - let key = AkdLabel(user.clone()); + let key = AkdLabel::from_utf8_str(user); match dir.key_history::(&key).await { Err(error) => panic!("Error performing key history retrieval {:?}", error), Ok(proof) => { @@ -206,6 +209,7 @@ pub(crate) async fn directory_test_suite< previous_root_hashes, key, proof, + false, ) { panic!("History proof failed to verify {:?}", error); } @@ -275,10 +279,13 @@ pub(crate) async fn test_lookups(data, true).await { + if let Err(error) = dir.publish::(data).await { panic!("Error publishing batch {:?}", error); } else { info!("Published epoch {}", i); @@ -292,7 +299,7 @@ pub(crate) async fn test_lookups, bool), + PublishBatch(Vec<(String, String)>), 
Lookup(String), KeyHistory(String), Audit(u64, u64), @@ -69,7 +68,10 @@ where (DirectoryCommand::Publish(a, b), Some(response)) => { let tic = Instant::now(); match directory - .publish::(vec![(AkdLabel(a.clone()), AkdValue(b.clone()))], false) + .publish::(vec![( + AkdLabel::from_utf8_str(&a), + AkdValue::from_utf8_str(&b), + )]) .await { Ok(EpochHash(epoch, hash)) => { @@ -90,16 +92,20 @@ where } } } - (DirectoryCommand::PublishBatch(batches, with_trans), Some(response)) => { + (DirectoryCommand::PublishBatch(batches), Some(response)) => { let tic = Instant::now(); let len = batches.len(); match directory .publish::( batches .into_iter() - .map(|(key, value)| (AkdLabel(key), AkdValue(value))) + .map(|(key, value)| { + ( + AkdLabel::from_utf8_str(&key), + AkdValue::from_utf8_str(&value), + ) + }) .collect(), - with_trans, ) .await { @@ -115,7 +121,7 @@ where } } (DirectoryCommand::Lookup(a), Some(response)) => { - match directory.lookup::(AkdLabel(a.clone())).await { + match directory.lookup::(AkdLabel::from_utf8_str(&a)).await { Ok(proof) => { let hash = get_root_hash::<_, H, V>(directory, None).await; match hash { @@ -124,7 +130,7 @@ where let verification = akd::client::lookup_verify::( &vrf_pk, root_hash, - AkdLabel(a.clone()), + AkdLabel::from_utf8_str(&a), proof, ); if verification.is_err() { @@ -151,7 +157,10 @@ where } } (DirectoryCommand::KeyHistory(a), Some(response)) => { - match directory.key_history::(&AkdLabel(a.clone())).await { + match directory + .key_history::(&AkdLabel::from_utf8_str(&a)) + .await + { Ok(_proof) => { let msg = format!("GOT KEY HISTORY FOR '{}'", a); response.send(Ok(msg)).unwrap(); diff --git a/poc/src/main.rs b/poc/src/main.rs index bed9300b..ceb96c64 100644 --- a/poc/src/main.rs +++ b/poc/src/main.rs @@ -8,9 +8,9 @@ // License, Version 2.0 found in the LICENSE-APACHE file in the root directory // of this source tree. 
-use akd::directory::Directory; use akd::ecvrf::HardCodedAkdVRF; use akd::storage::Storage; +use akd::Directory; use akd_mysql::mysql::{AsyncMySqlDatabase, MySqlCacheOptions}; use clap::arg_enum; use commands::Command; @@ -67,7 +67,6 @@ enum OtherMode { BenchPublish { num_users: u64, num_updates_per_user: u64, - use_transactions: Option, }, #[structopt(about = "Benchmark lookup API")] BenchLookup { @@ -229,8 +228,8 @@ async fn process_input( let mut data = Vec::new(); for value in values.iter() { let state = akd::storage::types::DbRecord::build_user_state( - value.clone(), - value.clone(), + value.as_bytes().to_vec(), + value.as_bytes().to_vec(), 1u64, 1u32, [1u8; 32], @@ -255,14 +254,7 @@ async fn process_input( OtherMode::BenchPublish { num_users, num_updates_per_user, - use_transactions, } => { - let use_trans = if let Some(tr) = use_transactions { - *tr - } else { - true - }; - println!("======= Benchmark operation requested ======= "); println!( "Beginning PUBLISH benchmark of {} users with {} updates/user", @@ -298,7 +290,7 @@ async fn process_input( .collect(); let (rpc_tx, rpc_rx) = tokio::sync::oneshot::channel(); let rpc = directory_host::Rpc( - directory_host::DirectoryCommand::PublishBatch(user_data, use_trans), + directory_host::DirectoryCommand::PublishBatch(user_data), Some(rpc_tx), ); let sent = tx.clone().send(rpc).await; @@ -362,7 +354,7 @@ async fn process_input( info!("Inserting {} users", num_users); let (rpc_tx, rpc_rx) = tokio::sync::oneshot::channel(); let rpc = directory_host::Rpc( - directory_host::DirectoryCommand::PublishBatch(user_data.clone(), true), + directory_host::DirectoryCommand::PublishBatch(user_data.clone()), Some(rpc_tx), ); let sent = tx.clone().send(rpc).await;