Skip to content

Commit

Permalink
prefetch: claim.sweat::claim (#11094)
Browse files Browse the repository at this point in the history
  • Loading branch information
nagisa authored and VanBarbascu committed Apr 17, 2024
1 parent c5e3782 commit 9ba8874
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 34 deletions.
17 changes: 12 additions & 5 deletions core/store/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,18 @@ impl Default for StoreConfig {
"oracle.sweat".to_owned(),
"sweat_the_oracle.testnet".to_owned(),
],
claim_sweat_prefetch_config: vec![PrefetchConfig {
receiver: "claim.sweat".to_owned(),
sender: "token.sweat".to_owned(),
method_name: "record_batch_for_hold".to_owned(),
}],
claim_sweat_prefetch_config: vec![
PrefetchConfig {
receiver: "claim.sweat".to_owned(),
sender: "token.sweat".to_owned(),
method_name: "record_batch_for_hold".to_owned(),
},
PrefetchConfig {
receiver: "claim.sweat".to_owned(),
sender: String::new(),
method_name: "claim".to_owned(),
},
],
kaiching_prefetch_config: vec![PrefetchConfig {
receiver: "earn.kaiching".to_owned(),
sender: "wallet.kaiching".to_owned(),
Expand Down
56 changes: 37 additions & 19 deletions core/store/src/trie/prefetching_trie_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ pub struct PrefetchApi {
/// multiple times.
pub(crate) prefetching: PrefetchStagingArea,

store: Store,
shard_cache: TrieCache,

pub enable_receipt_prefetching: bool,
/// Configured accounts will be prefetched as SWEAT token account, if predecessor is listed as receiver.
pub sweat_prefetch_receivers: Vec<AccountId>,
Expand Down Expand Up @@ -421,17 +424,12 @@ impl PrefetchApi {
claim_sweat_prefetch_config,
kaiching_prefetch_config,
shard_uid,
store,
shard_cache,
};
let (shutdown_tx, shutdown_rx) = crossbeam::channel::bounded(1);
let handles = (0..NUM_IO_THREADS)
.map(|_| {
this.start_io_thread(
store.clone(),
shard_cache.clone(),
shard_uid,
shutdown_rx.clone(),
)
})
.map(|_| this.start_io_thread(shard_uid, shutdown_rx.clone()))
.collect();
let handle = PrefetchingThreadsHandle { shutdown_channel: Some(shutdown_tx), handles };
(this, handle)
Expand All @@ -448,15 +446,26 @@ impl PrefetchApi {
})
}

pub fn make_storage(&self) -> Rc<dyn TrieStorage> {
Rc::new(TriePrefetchingStorage::new(
self.store.clone(),
self.shard_uid,
self.shard_cache.clone(),
self.prefetching.clone(),
))
}

pub fn start_io_thread(
&self,
store: Store,
shard_cache: TrieCache,
shard_uid: ShardUId,
shutdown_rx: crossbeam::channel::Receiver<()>,
) -> thread::JoinHandle<()> {
let prefetcher_storage =
TriePrefetchingStorage::new(store, shard_uid, shard_cache, self.prefetching.clone());
let prefetcher_storage = TriePrefetchingStorage::new(
self.store.clone(),
self.shard_uid,
self.shard_cache.clone(),
self.prefetching.clone(),
);
let work_queue = self.work_queue_rx.clone();
let metric_prefetch_sent =
metrics::PREFETCH_SENT.with_label_values(&[&shard_uid.shard_id.to_string()]);
Expand All @@ -482,13 +491,22 @@ impl PrefetchApi {
Trie::new(Rc::new(prefetcher_storage.clone()), trie_root, None);
let storage_key = trie_key.to_vec();
metric_prefetch_sent.inc();
if let Ok(_maybe_value) = prefetcher_trie.get(&storage_key) {
near_o11y::io_trace!(count: "prefetch");
} else {
// This may happen in rare occasions and can be ignored safely.
// See comments in `TriePrefetchingStorage::retrieve_raw_bytes`.
near_o11y::io_trace!(count: "prefetch_failure");
metric_prefetch_fail.inc();
match prefetcher_trie.get(&storage_key) {
Ok(_maybe_value) => {
near_o11y::io_trace!(count: "prefetch");
}
Err(e) => {
tracing::debug!(
target: "store::trie::prefetch",
message = "prefetching failure",
error = %e,
key = ?trie_key
);
// This may happen in rare occasions and can be ignored safely.
// See comments in `TriePrefetchingStorage::retrieve_raw_bytes`.
near_o11y::io_trace!(count: "prefetch_failure");
metric_prefetch_fail.inc();
}
}
}
}
Expand Down
111 changes: 101 additions & 10 deletions runtime/runtime/src/prefetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
//! in the prefetcher. Implementation details for most limits are in
//! `core/store/src/trie/prefetching_trie_storage.rs`
use borsh::BorshSerialize as _;
use near_o11y::metrics::prometheus;
use near_o11y::metrics::prometheus::core::GenericCounter;
use near_primitives::receipt::{Receipt, ReceiptEnum};
Expand Down Expand Up @@ -151,12 +152,31 @@ impl TriePrefetcher {
}
}

if self.prefetch_api.claim_sweat_prefetch_config.iter().any(|cfg| {
cfg.sender == receipt.predecessor_id.as_str()
&& cfg.receiver == account_id.as_str()
&& cfg.method_name == fn_call.method_name
}) {
self.prefetch_claim_sweat(account_id.clone(), &fn_call.args)?;
let claim_sweat_cfg = &self.prefetch_api.claim_sweat_prefetch_config;
if fn_call.method_name == "record_batch_for_hold" {
let config = claim_sweat_cfg.iter().find(|cfg| {
cfg.receiver == account_id.as_str()
&& cfg.method_name == fn_call.method_name
&& cfg.sender == receipt.predecessor_id.as_str()
});
if config.is_some() {
self.prefetch_claim_sweat_record_batch_for_hold(
account_id.clone(),
&fn_call.args,
)?
}
}
if fn_call.method_name == "claim" {
let config = claim_sweat_cfg.iter().find(|cfg| {
cfg.receiver == account_id.as_str()
&& cfg.method_name == fn_call.method_name
});
if config.is_some() {
self.prefetch_claim_sweat_claim(
account_id.clone(),
receipt.predecessor_id.clone(),
)?
}
}

if self.prefetch_api.kaiching_prefetch_config.iter().any(|cfg| {
Expand Down Expand Up @@ -223,11 +243,11 @@ impl TriePrefetcher {
match res {
Err(PrefetchError::QueueFull) => {
self.prefetch_queue_full.inc();
debug!(target: "prefetcher", "I/O scheduler input queue is full, dropping prefetch request");
debug!(target: "runtime::prefetch", "I/O scheduler input queue is full, dropping prefetch request");
}
Err(PrefetchError::QueueDisconnected) => {
// This shouldn't have happened, hence logging warning here
warn!(target: "prefetcher", "I/O scheduler input queue is disconnected, dropping prefetch request");
warn!(target: "runtime::prefetch", "I/O scheduler input queue is disconnected, dropping prefetch request");
}
Ok(()) => self.prefetch_enqueued.inc(),
};
Expand Down Expand Up @@ -272,10 +292,14 @@ impl TriePrefetcher {
Ok(())
}

/// Prefetcher tuned for claim.sweat contract calls.
/// Prefetcher tuned for claim.sweat::record_batch_for_hold contract calls.
///
/// Remove after #10965 reaches mainnet.
fn prefetch_claim_sweat(&self, account_id: AccountId, arg: &[u8]) -> Result<(), PrefetchError> {
fn prefetch_claim_sweat_record_batch_for_hold(
&self,
account_id: AccountId,
arg: &[u8],
) -> Result<(), PrefetchError> {
let Ok(json) = serde_json::de::from_slice::<serde_json::Value>(arg) else {
return Ok(());
};
Expand All @@ -302,6 +326,73 @@ impl TriePrefetcher {
Ok(())
}

/// Prefetcher tuned for claim.sweat::claim contract calls.
///
/// Remove after #10965 reaches mainnet.
fn prefetch_claim_sweat_claim(
&self,
account_id: AccountId,
predecessor: AccountId,
) -> Result<(), PrefetchError> {
let Self { prefetch_api, trie_root, .. } = self;
let trie_root = *trie_root;
let prefetch_api = prefetch_api.clone();
rayon::spawn(move || {
let mut account_data_key = Vec::with_capacity(4 + 8 + predecessor.len());
let Ok(()) = 0u8.serialize(&mut account_data_key) else { return };
let Ok(()) = predecessor.serialize(&mut account_data_key) else { return };
let trie_key =
TrieKey::ContractData { account_id: account_id.clone(), key: account_data_key };
// Just read this directly for now since this is temporary anyway
let prefetcher_storage = prefetch_api.make_storage();
let trie = Trie::new(prefetcher_storage, trie_root, None);
let Ok(Some(account_record)) = trie.get(&trie_key.to_vec()) else {
tracing::debug!(
target: "runtime::prefetch",
message = "could not load AccountRecord",
key = ?trie_key,
);
return;
};
#[derive(borsh::BorshDeserialize)]
#[allow(dead_code)]
struct AccountRecord {
accruals: Vec<(u32, u32)>,
is_enabled: bool,
claim_period_refreshed_at: u32,
is_locked: bool,
}
let Ok(account_record) = borsh::from_slice::<AccountRecord>(&account_record) else {
tracing::debug!(
target: "runtime::prefetch",
message = "could not decode AccountRecord",
);
return;
};

for (dt, idx) in account_record.accruals {
let mut accruals_key = Vec::with_capacity(4 + 8);
// StorageKey::Accruals
let Ok(()) = 1u8.serialize(&mut accruals_key) else { continue };
let Ok(()) = dt.serialize(&mut accruals_key) else { continue };
let accruals_key = sha2::Sha256::digest(&accruals_key).to_vec();
let _ = prefetch_api.prefetch_trie_key(
trie_root,
TrieKey::ContractData { account_id: account_id.clone(), key: accruals_key },
);
let mut amount_key = Vec::with_capacity(4 + 8 + 8);
let Ok(()) = 2u8.serialize(&mut amount_key) else { continue };
let Ok(()) = dt.serialize(&mut amount_key) else { continue };
amount_key.extend(&idx.to_le_bytes()); // index into Vector
let _ = prefetch_api.prefetch_trie_key(
trie_root,
TrieKey::ContractData { account_id: account_id.clone(), key: amount_key },
);
}
});
Ok(())
}

/// Prefetcher tuned for kaiching contract calls.
///
/// Remove after #10965 reaches mainnet.
Expand Down

0 comments on commit 9ba8874

Please sign in to comment.