From 9ba88743019b8ac9ee4fb7001803496bb506a016 Mon Sep 17 00:00:00 2001 From: Simonas Kazlauskas Date: Wed, 17 Apr 2024 14:50:58 +0000 Subject: [PATCH] prefetch: claim.sweat::claim (#11094) --- core/store/src/config.rs | 17 ++- .../src/trie/prefetching_trie_storage.rs | 56 ++++++--- runtime/runtime/src/prefetch.rs | 111 ++++++++++++++++-- 3 files changed, 150 insertions(+), 34 deletions(-) diff --git a/core/store/src/config.rs b/core/store/src/config.rs index c0de506e8e4..9039dcb5c75 100644 --- a/core/store/src/config.rs +++ b/core/store/src/config.rs @@ -273,11 +273,18 @@ impl Default for StoreConfig { "oracle.sweat".to_owned(), "sweat_the_oracle.testnet".to_owned(), ], - claim_sweat_prefetch_config: vec![PrefetchConfig { - receiver: "claim.sweat".to_owned(), - sender: "token.sweat".to_owned(), - method_name: "record_batch_for_hold".to_owned(), - }], + claim_sweat_prefetch_config: vec![ + PrefetchConfig { + receiver: "claim.sweat".to_owned(), + sender: "token.sweat".to_owned(), + method_name: "record_batch_for_hold".to_owned(), + }, + PrefetchConfig { + receiver: "claim.sweat".to_owned(), + sender: String::new(), + method_name: "claim".to_owned(), + }, + ], kaiching_prefetch_config: vec![PrefetchConfig { receiver: "earn.kaiching".to_owned(), sender: "wallet.kaiching".to_owned(), diff --git a/core/store/src/trie/prefetching_trie_storage.rs b/core/store/src/trie/prefetching_trie_storage.rs index 4ea5ebf8095..8cb4b53264c 100644 --- a/core/store/src/trie/prefetching_trie_storage.rs +++ b/core/store/src/trie/prefetching_trie_storage.rs @@ -78,6 +78,9 @@ pub struct PrefetchApi { /// multiple times. pub(crate) prefetching: PrefetchStagingArea, + store: Store, + shard_cache: TrieCache, + pub enable_receipt_prefetching: bool, /// Configured accounts will be prefetched as SWEAT token account, if predecessor is listed as receiver. pub sweat_prefetch_receivers: Vec, @@ -421,17 +424,12 @@ impl PrefetchApi { claim_sweat_prefetch_config, kaiching_prefetch_config, shard_uid, + store, + shard_cache, }; let (shutdown_tx, shutdown_rx) = crossbeam::channel::bounded(1); let handles = (0..NUM_IO_THREADS) - .map(|_| { - this.start_io_thread( - store.clone(), - shard_cache.clone(), - shard_uid, - shutdown_rx.clone(), - ) - }) + .map(|_| this.start_io_thread(shard_uid, shutdown_rx.clone())) .collect(); let handle = PrefetchingThreadsHandle { shutdown_channel: Some(shutdown_tx), handles }; (this, handle) @@ -448,15 +446,26 @@ impl PrefetchApi { }) } + pub fn make_storage(&self) -> Rc { + Rc::new(TriePrefetchingStorage::new( + self.store.clone(), + self.shard_uid, + self.shard_cache.clone(), + self.prefetching.clone(), + )) + } + pub fn start_io_thread( &self, - store: Store, - shard_cache: TrieCache, shard_uid: ShardUId, shutdown_rx: crossbeam::channel::Receiver<()>, ) -> thread::JoinHandle<()> { - let prefetcher_storage = - TriePrefetchingStorage::new(store, shard_uid, shard_cache, self.prefetching.clone()); + let prefetcher_storage = TriePrefetchingStorage::new( + self.store.clone(), + self.shard_uid, + self.shard_cache.clone(), + self.prefetching.clone(), + ); let work_queue = self.work_queue_rx.clone(); let metric_prefetch_sent = metrics::PREFETCH_SENT.with_label_values(&[&shard_uid.shard_id.to_string()]); @@ -482,13 +491,22 @@ impl PrefetchApi { Trie::new(Rc::new(prefetcher_storage.clone()), trie_root, None); let storage_key = trie_key.to_vec(); metric_prefetch_sent.inc(); - if let Ok(_maybe_value) = prefetcher_trie.get(&storage_key) { - near_o11y::io_trace!(count: "prefetch"); - } else { - // This may happen in rare occasions and can be ignored safely. - // See comments in `TriePrefetchingStorage::retrieve_raw_bytes`. - near_o11y::io_trace!(count: "prefetch_failure"); - metric_prefetch_fail.inc(); + match prefetcher_trie.get(&storage_key) { + Ok(_maybe_value) => { + near_o11y::io_trace!(count: "prefetch"); + } + Err(e) => { + tracing::debug!( + target: "store::trie::prefetch", + message = "prefetching failure", + error = %e, + key = ?trie_key + ); + // This may happen in rare occasions and can be ignored safely. + // See comments in `TriePrefetchingStorage::retrieve_raw_bytes`. + near_o11y::io_trace!(count: "prefetch_failure"); + metric_prefetch_fail.inc(); + } } } } diff --git a/runtime/runtime/src/prefetch.rs b/runtime/runtime/src/prefetch.rs index b37171e01a5..99355783174 100644 --- a/runtime/runtime/src/prefetch.rs +++ b/runtime/runtime/src/prefetch.rs @@ -41,6 +41,7 @@ //! in the prefetcher. Implementation details for most limits are in //! `core/store/src/trie/prefetching_trie_storage.rs` +use borsh::BorshSerialize as _; use near_o11y::metrics::prometheus; use near_o11y::metrics::prometheus::core::GenericCounter; use near_primitives::receipt::{Receipt, ReceiptEnum}; @@ -151,12 +152,31 @@ impl TriePrefetcher { } } - if self.prefetch_api.claim_sweat_prefetch_config.iter().any(|cfg| { - cfg.sender == receipt.predecessor_id.as_str() - && cfg.receiver == account_id.as_str() - && cfg.method_name == fn_call.method_name - }) { - self.prefetch_claim_sweat(account_id.clone(), &fn_call.args)?; + let claim_sweat_cfg = &self.prefetch_api.claim_sweat_prefetch_config; + if fn_call.method_name == "record_batch_for_hold" { + let config = claim_sweat_cfg.iter().find(|cfg| { + cfg.receiver == account_id.as_str() + && cfg.method_name == fn_call.method_name + && cfg.sender == receipt.predecessor_id.as_str() + }); + if config.is_some() { + self.prefetch_claim_sweat_record_batch_for_hold( + account_id.clone(), + &fn_call.args, + )? + } + } + if fn_call.method_name == "claim" { + let config = claim_sweat_cfg.iter().find(|cfg| { + cfg.receiver == account_id.as_str() + && cfg.method_name == fn_call.method_name + }); + if config.is_some() { + self.prefetch_claim_sweat_claim( + account_id.clone(), + receipt.predecessor_id.clone(), + )? + } } if self.prefetch_api.kaiching_prefetch_config.iter().any(|cfg| { @@ -223,11 +243,11 @@ impl TriePrefetcher { match res { Err(PrefetchError::QueueFull) => { self.prefetch_queue_full.inc(); - debug!(target: "prefetcher", "I/O scheduler input queue is full, dropping prefetch request"); + debug!(target: "runtime::prefetch", "I/O scheduler input queue is full, dropping prefetch request"); } Err(PrefetchError::QueueDisconnected) => { // This shouldn't have happened, hence logging warning here - warn!(target: "prefetcher", "I/O scheduler input queue is disconnected, dropping prefetch request"); + warn!(target: "runtime::prefetch", "I/O scheduler input queue is disconnected, dropping prefetch request"); } Ok(()) => self.prefetch_enqueued.inc(), }; @@ -272,10 +292,14 @@ impl TriePrefetcher { Ok(()) } - /// Prefetcher tuned for claim.sweat contract calls. + /// Prefetcher tuned for claim.sweat::record_batch_for_hold contract calls. /// /// Remove after #10965 reaches mainnet. - fn prefetch_claim_sweat(&self, account_id: AccountId, arg: &[u8]) -> Result<(), PrefetchError> { + fn prefetch_claim_sweat_record_batch_for_hold( + &self, + account_id: AccountId, + arg: &[u8], + ) -> Result<(), PrefetchError> { let Ok(json) = serde_json::de::from_slice::(arg) else { return Ok(()); }; @@ -302,6 +326,73 @@ impl TriePrefetcher { Ok(()) } + /// Prefetcher tuned for claim.sweat::claim contract calls. + /// + /// Remove after #10965 reaches mainnet. + fn prefetch_claim_sweat_claim( + &self, + account_id: AccountId, + predecessor: AccountId, + ) -> Result<(), PrefetchError> { + let Self { prefetch_api, trie_root, .. } = self; + let trie_root = *trie_root; + let prefetch_api = prefetch_api.clone(); + rayon::spawn(move || { + let mut account_data_key = Vec::with_capacity(4 + 8 + predecessor.len()); + let Ok(()) = 0u8.serialize(&mut account_data_key) else { return }; + let Ok(()) = predecessor.serialize(&mut account_data_key) else { return }; + let trie_key = + TrieKey::ContractData { account_id: account_id.clone(), key: account_data_key }; + // Just read this directly for now since this is temporary anyway + let prefetcher_storage = prefetch_api.make_storage(); + let trie = Trie::new(prefetcher_storage, trie_root, None); + let Ok(Some(account_record)) = trie.get(&trie_key.to_vec()) else { + tracing::debug!( + target: "runtime::prefetch", + message = "could not load AccountRecord", + key = ?trie_key, + ); + return; + }; + #[derive(borsh::BorshDeserialize)] + #[allow(dead_code)] + struct AccountRecord { + accruals: Vec<(u32, u32)>, + is_enabled: bool, + claim_period_refreshed_at: u32, + is_locked: bool, + } + let Ok(account_record) = borsh::from_slice::(&account_record) else { + tracing::debug!( + target: "runtime::prefetch", + message = "could not decode AccountRecord", + ); + return; + }; + + for (dt, idx) in account_record.accruals { + let mut accruals_key = Vec::with_capacity(4 + 8); + // StorageKey::Accruals + let Ok(()) = 1u8.serialize(&mut accruals_key) else { continue }; + let Ok(()) = dt.serialize(&mut accruals_key) else { continue }; + let accruals_key = sha2::Sha256::digest(&accruals_key).to_vec(); + let _ = prefetch_api.prefetch_trie_key( + trie_root, + TrieKey::ContractData { account_id: account_id.clone(), key: accruals_key }, + ); + let mut amount_key = Vec::with_capacity(4 + 8 + 8); + let Ok(()) = 2u8.serialize(&mut amount_key) else { continue }; + let Ok(()) = dt.serialize(&mut amount_key) else { continue }; + amount_key.extend(&idx.to_le_bytes()); // index into Vector + let _ = prefetch_api.prefetch_trie_key( + trie_root, + TrieKey::ContractData { account_id: account_id.clone(), key: amount_key }, + ); + } + }); + Ok(()) + } + /// Prefetcher tuned for kaiching contract calls. /// /// Remove after #10965 reaches mainnet.