diff --git a/chain/client/src/stateless_validation/partial_witness/mod.rs b/chain/client/src/stateless_validation/partial_witness/mod.rs index 9861d111eb8..b627a5c7348 100644 --- a/chain/client/src/stateless_validation/partial_witness/mod.rs +++ b/chain/client/src/stateless_validation/partial_witness/mod.rs @@ -1,6 +1,7 @@ mod encoding; mod partial_deploys_tracker; pub mod partial_witness_actor; +pub mod partial_witness_actor_v2; mod partial_witness_tracker; pub use encoding::witness_part_length; diff --git a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs index 191607df2fc..5f649038ad8 100644 --- a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs +++ b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs @@ -1,15 +1,8 @@ -use std::collections::HashSet; -use std::num::NonZeroUsize; -use std::sync::Arc; - -use itertools::Itertools; -use lru::LruCache; -use near_async::futures::{AsyncComputationSpawner, AsyncComputationSpawnerExt}; -use near_async::messaging::{Actor, CanSend, Handler, Sender}; +use near_async::futures::{AsyncComputationSpawner, TokioRuntimeFutureSpawner}; +use near_async::messaging::{Actor, Handler, Sender}; use near_async::time::Clock; use near_async::{MultiSend, MultiSenderFrom}; use near_chain::types::RuntimeAdapter; -use near_chain::Error; use near_chain_configs::MutableValidatorSigner; use near_epoch_manager::EpochManagerAdapter; use near_network::state_witness::{ @@ -17,66 +10,21 @@ use near_network::state_witness::{ ContractCodeResponseMessage, PartialEncodedContractDeploysMessage, PartialEncodedStateWitnessForwardMessage, PartialEncodedStateWitnessMessage, }; -use near_network::types::{NetworkRequests, PeerManagerAdapter, PeerManagerMessageRequest}; -use near_parameters::RuntimeConfig; +use near_network::types::PeerManagerAdapter; use near_performance_metrics_macros::perf; -use near_primitives::reed_solomon::{ReedSolomonEncoder, ReedSolomonEncoderCache}; -use near_primitives::sharding::ShardChunkHeader; -use near_primitives::stateless_validation::contract_distribution::{ - ChunkContractAccesses, ChunkContractDeploys, CodeBytes, CodeHash, ContractCodeRequest, - ContractCodeResponse, ContractUpdates, MainTransitionKey, PartialEncodedContractDeploys, - PartialEncodedContractDeploysPart, -}; -use near_primitives::stateless_validation::partial_witness::PartialEncodedStateWitness; -use near_primitives::stateless_validation::state_witness::{ - ChunkStateWitness, ChunkStateWitnessAck, EncodedChunkStateWitness, -}; -use near_primitives::stateless_validation::stored_chunk_state_transition_data::StoredChunkStateTransitionData; -use near_primitives::stateless_validation::ChunkProductionKey; -use near_primitives::types::{AccountId, EpochId, ShardId}; -use near_primitives::validator_signer::ValidatorSigner; -use near_store::adapter::trie_store::TrieStoreAdapter; -use near_store::{DBCol, StorageError, TrieDBStorage, TrieStorage}; -use near_vm_runner::{get_contract_cache_key, ContractCode, ContractRuntimeCache}; -use rand::Rng; +use near_primitives::stateless_validation::contract_distribution::ContractUpdates; +use near_primitives::stateless_validation::state_witness::ChunkStateWitness; +use near_primitives::types::ShardId; +use std::sync::Arc; use crate::client_actor::ClientSenderForPartialWitness; -use crate::metrics; -use crate::stateless_validation::state_witness_tracker::ChunkStateWitnessTracker; -use 
crate::stateless_validation::validate::{ - validate_chunk_contract_accesses, validate_contract_code_request, - validate_partial_encoded_contract_deploys, validate_partial_encoded_state_witness, -}; -use super::encoding::{CONTRACT_DEPLOYS_RATIO_DATA_PARTS, WITNESS_RATIO_DATA_PARTS}; -use super::partial_deploys_tracker::PartialEncodedContractDeploysTracker; -use super::partial_witness_tracker::PartialEncodedStateWitnessTracker; -use near_primitives::utils::compression::CompressedData; - -const PROCESSED_CONTRACT_CODE_REQUESTS_CACHE_SIZE: usize = 30; +use super::partial_witness_actor_v2::{ + PartialWitnessMsg, PartialWitnessSender, PartialWitnessService, +}; pub struct PartialWitnessActor { - /// Adapter to send messages to the network. - network_adapter: PeerManagerAdapter, - /// Validator signer to sign the state witness. This field is mutable and optional. Use with caution! - /// Lock the value of mutable validator signer for the duration of a request to ensure consistency. - /// Please note that the locked value should not be stored anywhere or passed through the thread boundary. - my_signer: MutableValidatorSigner, - epoch_manager: Arc, - runtime: Arc, - /// Tracks the parts of the state witness sent from chunk producers to chunk validators. - partial_witness_tracker: PartialEncodedStateWitnessTracker, - partial_deploys_tracker: PartialEncodedContractDeploysTracker, - /// Tracks a collection of state witnesses sent from chunk producers to chunk validators. - state_witness_tracker: ChunkStateWitnessTracker, - /// Reed Solomon encoder for encoding state witness parts. - /// We keep one wrapper for each length of chunk_validators to avoid re-creating the encoder. - witness_encoders: ReedSolomonEncoderCache, - /// Same as above for contract deploys. - contract_deploys_encoders: ReedSolomonEncoderCache, - compile_contracts_spawner: Arc, - /// AccountId in the key corresponds to the requester (chunk validator). 
- processed_contract_code_requests: LruCache<(ChunkProductionKey, AccountId), ()>, + tx: PartialWitnessSender, } impl Actor for PartialWitnessActor {} @@ -97,68 +45,55 @@ pub struct PartialWitnessSenderForClient { impl Handler for PartialWitnessActor { #[perf] fn handle(&mut self, msg: DistributeStateWitnessRequest) { - if let Err(err) = self.handle_distribute_state_witness_request(msg) { - tracing::error!(target: "client", ?err, "Failed to handle distribute chunk state witness request"); - } + self.tx.send(PartialWitnessMsg::DistributeStateWitnessRequest(Box::new(msg))).unwrap(); } } impl Handler for PartialWitnessActor { fn handle(&mut self, msg: ChunkStateWitnessAckMessage) { - self.handle_chunk_state_witness_ack(msg.0); + self.tx.send(PartialWitnessMsg::ChunkStateWitnessAckMessage(msg)).unwrap(); } } impl Handler for PartialWitnessActor { fn handle(&mut self, msg: PartialEncodedStateWitnessMessage) { - if let Err(err) = self.handle_partial_encoded_state_witness(msg.0) { - tracing::error!(target: "client", ?err, "Failed to handle PartialEncodedStateWitnessMessage"); - } + self.tx.send(PartialWitnessMsg::PartialEncodedStateWitnessMessage(msg)).unwrap(); } } impl Handler for PartialWitnessActor { fn handle(&mut self, msg: PartialEncodedStateWitnessForwardMessage) { - if let Err(err) = self.handle_partial_encoded_state_witness_forward(msg.0) { - tracing::error!(target: "client", ?err, "Failed to handle PartialEncodedStateWitnessForwardMessage"); - } + self.tx.send(PartialWitnessMsg::PartialEncodedStateWitnessForwardMessage(msg)).unwrap(); } } impl Handler for PartialWitnessActor { fn handle(&mut self, msg: ChunkContractAccessesMessage) { - if let Err(err) = self.handle_chunk_contract_accesses(msg.0) { - tracing::error!(target: "client", ?err, "Failed to handle ChunkContractAccessesMessage"); - } + self.tx.send(PartialWitnessMsg::ChunkContractAccessesMessage(msg)).unwrap(); } } impl Handler for PartialWitnessActor { fn handle(&mut self, msg: PartialEncodedContractDeploysMessage) { - if let Err(err) = self.handle_partial_encoded_contract_deploys(msg.0) { - tracing::error!(target: "client", ?err, "Failed to handle PartialEncodedContractDeploysMessage"); - } + self.tx.send(PartialWitnessMsg::PartialEncodedContractDeploysMessage(msg)).unwrap(); } } impl Handler for PartialWitnessActor { fn handle(&mut self, msg: ContractCodeRequestMessage) { - if let Err(err) = self.handle_contract_code_request(msg.0) { - tracing::error!(target: "client", ?err, "Failed to handle ContractCodeRequestMessage"); - } + self.tx.send(PartialWitnessMsg::ContractCodeRequestMessage(msg)).unwrap(); } } impl Handler for PartialWitnessActor { fn handle(&mut self, msg: ContractCodeResponseMessage) { - if let Err(err) = self.handle_contract_code_response(msg.0) { - tracing::error!(target: "client", ?err, "Failed to handle ContractCodeResponseMessage"); - } + self.tx.send(PartialWitnessMsg::ContractCodeResponseMessage(msg)).unwrap(); } } impl PartialWitnessActor { pub fn new( + rt: Arc, clock: Clock, network_adapter: PeerManagerAdapter, client_sender: ClientSenderForPartialWitness, @@ -166,634 +101,19 @@ impl PartialWitnessActor { epoch_manager: Arc, runtime: Arc, compile_contracts_spawner: Arc, + partial_witness_spawner: Arc, ) -> Self { - let partial_witness_tracker = - PartialEncodedStateWitnessTracker::new(client_sender, epoch_manager.clone()); - Self { + let tx = PartialWitnessService::new( + rt, + clock, network_adapter, + client_sender, my_signer, epoch_manager, - partial_witness_tracker, - partial_deploys_tracker: 
PartialEncodedContractDeploysTracker::new(), - state_witness_tracker: ChunkStateWitnessTracker::new(clock), runtime, - witness_encoders: ReedSolomonEncoderCache::new(WITNESS_RATIO_DATA_PARTS), - contract_deploys_encoders: ReedSolomonEncoderCache::new( - CONTRACT_DEPLOYS_RATIO_DATA_PARTS, - ), compile_contracts_spawner, - processed_contract_code_requests: LruCache::new( - NonZeroUsize::new(PROCESSED_CONTRACT_CODE_REQUESTS_CACHE_SIZE).unwrap(), - ), - } - } - - fn handle_distribute_state_witness_request( - &mut self, - msg: DistributeStateWitnessRequest, - ) -> Result<(), Error> { - let DistributeStateWitnessRequest { - state_witness, - contract_updates: ContractUpdates { contract_accesses, contract_deploys }, - main_transition_shard_id, - } = msg; - - tracing::debug!( - target: "client", - chunk_hash=?state_witness.chunk_header.chunk_hash(), - "distribute_chunk_state_witness", - ); - - // We send the state-witness and contract-updates in the following order: - // 1. We send the hashes of the contract code accessed (if contract code is excluded from witness and any contracts are called) - // before the state witness in order to allow validators to check and request missing contract code, while waiting for witness parts. - // 2. We send the state witness parts to witness-part owners. - // 3. We send the contract deploys parts to other validators (that do not validate the witness in this turn). This is lower priority - // since the newly-deployed contracts will be needed by other validators in later turns. - - let signer = self.my_validator_signer()?; - let key = state_witness.chunk_production_key(); - let chunk_validators = self - .epoch_manager - .get_chunk_validator_assignments(&key.epoch_id, key.shard_id, key.height_created) - .expect("Chunk validators must be defined") - .ordered_chunk_validators(); - - if !contract_accesses.is_empty() { - self.send_contract_accesses_to_chunk_validators( - key.clone(), - contract_accesses, - MainTransitionKey { - block_hash: state_witness.main_state_transition.block_hash, - shard_id: main_transition_shard_id, - }, - &chunk_validators, - &signer, - ); - } - - let witness_bytes = compress_witness(&state_witness)?; - self.send_state_witness_parts( - key.epoch_id, - &state_witness.chunk_header, - witness_bytes, - &chunk_validators, - &signer, - ); - - if !contract_deploys.is_empty() { - self.send_chunk_contract_deploys_parts(key, contract_deploys)?; - } - Ok(()) - } - - // Function to generate the parts of the state witness and return them as a tuple of chunk_validator and part. - fn generate_state_witness_parts( - &mut self, - epoch_id: EpochId, - chunk_header: &ShardChunkHeader, - witness_bytes: EncodedChunkStateWitness, - chunk_validators: &[AccountId], - signer: &ValidatorSigner, - ) -> Vec<(AccountId, PartialEncodedStateWitness)> { - tracing::debug!( - target: "client", - chunk_hash=?chunk_header.chunk_hash(), - ?chunk_validators, - "generate_state_witness_parts", - ); - - // Break the state witness into parts using Reed Solomon encoding. - let encoder = self.witness_encoders.entry(chunk_validators.len()); - let (parts, encoded_length) = encoder.encode(&witness_bytes); - - chunk_validators - .iter() - .zip_eq(parts) - .enumerate() - .map(|(part_ord, (chunk_validator, part))| { - // It's fine to unwrap part here as we just constructed the parts above and we expect - // all of them to be present. 
- let partial_witness = PartialEncodedStateWitness::new( - epoch_id, - chunk_header.clone(), - part_ord, - part.unwrap().to_vec(), - encoded_length, - signer, - ); - (chunk_validator.clone(), partial_witness) - }) - .collect_vec() - } - - fn generate_contract_deploys_parts( - &mut self, - key: &ChunkProductionKey, - deploys: ChunkContractDeploys, - ) -> Result, Error> { - let validators = self.ordered_contract_deploys_validators(key)?; - // Note that target validators do not include the chunk producers, and thus in some case - // (eg. tests or small networks) there may be no other validators to send the new contracts to. - if validators.is_empty() { - return Ok(vec![]); - } - - let encoder = self.contract_deploys_encoder(validators.len()); - let (parts, encoded_length) = encoder.encode(&deploys); - let signer = self.my_validator_signer()?; - - Ok(validators - .into_iter() - .zip_eq(parts) - .enumerate() - .map(|(part_ord, (validator, part))| { - let partial_deploys = PartialEncodedContractDeploys::new( - key.clone(), - PartialEncodedContractDeploysPart { - part_ord, - data: part.unwrap().to_vec().into_boxed_slice(), - encoded_length, - }, - &signer, - ); - (validator, partial_deploys) - }) - .collect_vec()) - } - - // Break the state witness into parts and send each part to the corresponding chunk validator owner. - // The chunk validator owner will then forward the part to all other chunk validators. - // Each chunk validator would collect the parts and reconstruct the state witness. - fn send_state_witness_parts( - &mut self, - epoch_id: EpochId, - chunk_header: &ShardChunkHeader, - witness_bytes: EncodedChunkStateWitness, - chunk_validators: &[AccountId], - signer: &ValidatorSigner, - ) { - // Capture these values first, as the sources are consumed before calling record_witness_sent. - let chunk_hash = chunk_header.chunk_hash(); - let witness_size_in_bytes = witness_bytes.size_bytes(); - - // Record time taken to encode the state witness parts. - let shard_id_label = chunk_header.shard_id().to_string(); - let encode_timer = metrics::PARTIAL_WITNESS_ENCODE_TIME - .with_label_values(&[shard_id_label.as_str()]) - .start_timer(); - let validator_witness_tuple = self.generate_state_witness_parts( - epoch_id, - chunk_header, - witness_bytes, - chunk_validators, - signer, - ); - encode_timer.observe_duration(); - - // Record the witness in order to match the incoming acks for measuring round-trip times. - // See process_chunk_state_witness_ack for the handling of the ack messages. - self.state_witness_tracker.record_witness_sent( - chunk_hash, - witness_size_in_bytes, - validator_witness_tuple.len(), - ); - - // Send the parts to the corresponding chunk validator owners. - self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( - NetworkRequests::PartialEncodedStateWitness(validator_witness_tuple), - )); - } - - /// Sends the witness part to the chunk validators, except the chunk producer that generated the witness part. - fn forward_state_witness_part( - &self, - partial_witness: PartialEncodedStateWitness, - ) -> Result<(), Error> { - let ChunkProductionKey { shard_id, epoch_id, height_created } = - partial_witness.chunk_production_key(); - let chunk_producer = self - .epoch_manager - .get_chunk_producer_info(&ChunkProductionKey { epoch_id, height_created, shard_id })? - .take_account_id(); - - // Forward witness part to chunk validators except the validator that produced the chunk and witness. 
- let target_chunk_validators = self - .epoch_manager - .get_chunk_validator_assignments(&epoch_id, shard_id, height_created)? - .ordered_chunk_validators() - .into_iter() - .filter(|validator| validator != &chunk_producer) - .collect(); - - self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( - NetworkRequests::PartialEncodedStateWitnessForward( - target_chunk_validators, - partial_witness, - ), - )); - Ok(()) - } - - /// Function to handle receiving partial_encoded_state_witness message from chunk producer. - fn handle_partial_encoded_state_witness( - &mut self, - partial_witness: PartialEncodedStateWitness, - ) -> Result<(), Error> { - tracing::debug!(target: "client", ?partial_witness, "Receive PartialEncodedStateWitnessMessage"); - - let signer = self.my_validator_signer()?; - // Validate the partial encoded state witness and forward the part to all the chunk validators. - if validate_partial_encoded_state_witness( - self.epoch_manager.as_ref(), - &partial_witness, - &signer, - self.runtime.store(), - )? { - self.forward_state_witness_part(partial_witness)?; - } - - Ok(()) - } - - /// Function to handle receiving partial_encoded_state_witness_forward message from chunk producer. - fn handle_partial_encoded_state_witness_forward( - &mut self, - partial_witness: PartialEncodedStateWitness, - ) -> Result<(), Error> { - tracing::debug!(target: "client", ?partial_witness, "Receive PartialEncodedStateWitnessForwardMessage"); - - let signer = self.my_validator_signer()?; - // Validate the partial encoded state witness and store the partial encoded state witness. - if validate_partial_encoded_state_witness( - self.epoch_manager.as_ref(), - &partial_witness, - &signer, - self.runtime.store(), - )? { - self.partial_witness_tracker.store_partial_encoded_state_witness(partial_witness)?; - } - - Ok(()) - } - - /// Handles partial contract deploy message received from a peer. - /// - /// This message may belong to one of two steps of distributing contract code. In the first step the code is compressed - /// and encoded into parts using Reed Solomon encoding and each part is sent to one of the validators (part owner). - /// See `send_chunk_contract_deploys_parts` for the code implementing this. In the second step each validator (part-owner) - /// forwards the part it receives to other validators. - fn handle_partial_encoded_contract_deploys( - &mut self, - partial_deploys: PartialEncodedContractDeploys, - ) -> Result<(), Error> { - tracing::debug!(target: "client", ?partial_deploys, "Receive PartialEncodedContractDeploys"); - if !validate_partial_encoded_contract_deploys( - self.epoch_manager.as_ref(), - &partial_deploys, - self.runtime.store(), - )? { - return Ok(()); - } - if self.partial_deploys_tracker.already_processed(&partial_deploys) { - return Ok(()); - } - let key = partial_deploys.chunk_production_key().clone(); - let validators = self.ordered_contract_deploys_validators(&key)?; - if validators.is_empty() { - // Note that target validators do not include the chunk producers, and thus in some case - // (eg. tests or small networks) there may be no other validators to send the new contracts to. - // In such case, the message we are handling here should not be sent in the first place, - // unless there is a bug or adversarial behavior that sends the message. 
- debug_assert!(false, "No target validators, we must not receive this message"); - return Ok(()); - } - - // Forward to other validators if the part received is my part - let signer = self.my_validator_signer()?; - let my_account_id = signer.validator_id(); - let Some(my_part_ord) = validators.iter().position(|validator| validator == my_account_id) - else { - tracing::warn!( - target: "client", - ?key, - "Validator is not a part of contract deploys distribution" - ); - return Ok(()); - }; - if partial_deploys.part().part_ord == my_part_ord { - let other_validators = validators - .iter() - .filter(|&validator| validator != my_account_id) - .cloned() - .collect_vec(); - if !other_validators.is_empty() { - self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( - NetworkRequests::PartialEncodedContractDeploys( - other_validators, - partial_deploys.clone(), - ), - )); - } - } - - // Store part - let encoder = self.contract_deploys_encoder(validators.len()); - if let Some(deploys) = self - .partial_deploys_tracker - .store_partial_encoded_contract_deploys(partial_deploys, encoder)? - { - let contracts = match deploys.decompress_contracts() { - Ok(contracts) => contracts, - Err(err) => { - tracing::warn!( - target: "client", - ?err, - ?key, - "Failed to decompress deployed contracts." - ); - return Ok(()); - } - }; - let contract_codes = contracts.into_iter().map(|contract| contract.into()).collect(); - let runtime = self.runtime.clone(); - self.compile_contracts_spawner.spawn("precompile_deployed_contracts", move || { - if let Err(err) = runtime.precompile_contracts(&key.epoch_id, contract_codes) { - tracing::error!( - target: "client", - ?err, - ?key, - "Failed to precompile deployed contracts." - ); - } - }); - } - - Ok(()) - } - - /// Handles the state witness ack message from the chunk validator. - /// It computes the round-trip time between sending the state witness and receiving - /// the ack message and updates the corresponding metric with it. - /// Currently we do not raise an error for handling of witness-ack messages, - /// as it is used only for tracking some networking metrics. - fn handle_chunk_state_witness_ack(&mut self, witness_ack: ChunkStateWitnessAck) { - self.state_witness_tracker.on_witness_ack_received(witness_ack); - } - - /// Handles contract code accesses message from chunk producer. - /// This is sent in parallel to a chunk state witness and contains the hashes - /// of the contract code accessed when applying the previous chunk of the witness. - fn handle_chunk_contract_accesses( - &mut self, - accesses: ChunkContractAccesses, - ) -> Result<(), Error> { - let signer = self.my_validator_signer()?; - if !validate_chunk_contract_accesses( - self.epoch_manager.as_ref(), - &accesses, - &signer, - self.runtime.store(), - )? 
{ - return Ok(()); - } - let key = accesses.chunk_production_key(); - let contracts_cache = self.runtime.compiled_contract_cache(); - let runtime_config = self - .runtime - .get_runtime_config(self.epoch_manager.get_epoch_protocol_version(&key.epoch_id)?)?; - let missing_contract_hashes = HashSet::from_iter( - accesses - .contracts() - .iter() - .filter(|&hash| { - !contracts_cache_contains_contract(contracts_cache, hash, &runtime_config) - }) - .cloned(), + partial_witness_spawner, ); - if missing_contract_hashes.is_empty() { - return Ok(()); - } - self.partial_witness_tracker - .store_accessed_contract_hashes(key.clone(), missing_contract_hashes.clone())?; - let random_chunk_producer = { - let mut chunk_producers = self - .epoch_manager - .get_epoch_chunk_producers_for_shard(&key.epoch_id, key.shard_id)?; - chunk_producers.swap_remove(rand::thread_rng().gen_range(0..chunk_producers.len())) - }; - let request = ContractCodeRequest::new( - key.clone(), - missing_contract_hashes, - accesses.main_transition().clone(), - &signer, - ); - self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( - NetworkRequests::ContractCodeRequest(random_chunk_producer, request), - )); - Ok(()) - } - - /// Sends the contract accesses to the same chunk validators - /// (except for the chunk producers that track the same shard), - /// which will receive the state witness for the new chunk. - fn send_contract_accesses_to_chunk_validators( - &self, - key: ChunkProductionKey, - contract_accesses: HashSet, - main_transition: MainTransitionKey, - chunk_validators: &[AccountId], - my_signer: &ValidatorSigner, - ) { - let chunk_producers: HashSet = self - .epoch_manager - .get_epoch_chunk_producers_for_shard(&key.epoch_id, key.shard_id) - .expect("Chunk producers must be defined") - .into_iter() - .collect(); - - // Exclude chunk producers that track the same shard from the target list, since they track the state that contains the respective code. - let target_chunk_validators = chunk_validators - .iter() - .filter(|validator| !chunk_producers.contains(*validator)) - .cloned() - .collect(); - self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( - NetworkRequests::ChunkContractAccesses( - target_chunk_validators, - ChunkContractAccesses::new(key, contract_accesses, main_transition, my_signer), - ), - )); + Self { tx } } - - /// Retrieves the code for the given contract hashes and distributes them to validator in parts. - /// - /// This implements the first step of distributing contract code to validators where the contract codes - /// are compressed and encoded into parts using Reed Solomon encoding, and then each part is sent to - /// one of the validators (part-owner). Second step of the distribution, where each validator (part-owner) - /// forwards the part it receives is implemented in `handle_partial_encoded_contract_deploys`. 
- fn send_chunk_contract_deploys_parts( - &mut self, - key: ChunkProductionKey, - contract_codes: Vec, - ) -> Result<(), Error> { - let contracts = contract_codes.into_iter().map(|contract| contract.into()).collect(); - let compressed_deploys = ChunkContractDeploys::compress_contracts(&contracts)?; - let validator_parts = self.generate_contract_deploys_parts(&key, compressed_deploys)?; - for (part_owner, deploys_part) in validator_parts.into_iter() { - self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( - NetworkRequests::PartialEncodedContractDeploys(vec![part_owner], deploys_part), - )); - } - Ok(()) - } - - /// Handles contract code requests message from chunk validators. - /// As response to this message, sends the contract code requested to - /// the requesting chunk validator for the given hashes of the contract code. - fn handle_contract_code_request(&mut self, request: ContractCodeRequest) -> Result<(), Error> { - if !validate_contract_code_request( - self.epoch_manager.as_ref(), - &request, - self.runtime.store(), - )? { - return Ok(()); - } - - let key = request.chunk_production_key(); - let processed_requests_key = (key.clone(), request.requester().clone()); - if self.processed_contract_code_requests.contains(&processed_requests_key) { - tracing::warn!( - target: "client", - ?processed_requests_key, - "Contract code request from this account was already processed" - ); - return Ok(()); - } - self.processed_contract_code_requests.push(processed_requests_key, ()); - - let _timer = near_chain::stateless_validation::metrics::PROCESS_CONTRACT_CODE_REQUEST_TIME - .with_label_values(&[&key.shard_id.to_string()]) - .start_timer(); - - let main_transition_key = request.main_transition(); - let Some(transition_data) = - self.runtime.store().get_ser::( - DBCol::StateTransitionData, - &near_primitives::utils::get_block_shard_id( - &main_transition_key.block_hash, - main_transition_key.shard_id, - ), - )? - else { - tracing::warn!( - target: "client", - ?key, - ?main_transition_key, - "Missing state transition data" - ); - return Ok(()); - }; - let valid_accesses: HashSet = - transition_data.contract_accesses().iter().cloned().collect(); - - let storage = TrieDBStorage::new( - TrieStoreAdapter::new(self.runtime.store().clone()), - self.epoch_manager.shard_id_to_uid( - main_transition_key.shard_id, - &self.epoch_manager.get_epoch_id(&main_transition_key.block_hash)?, - )?, - ); - let mut contracts = Vec::new(); - for contract_hash in request.contracts() { - if !valid_accesses.contains(contract_hash) { - tracing::warn!( - target: "client", - ?key, - ?contract_hash, - "Requested contract code was not accessed when applying the chunk" - ); - return Ok(()); - } - match storage.retrieve_raw_bytes(&contract_hash.0) { - Ok(bytes) => contracts.push(CodeBytes(bytes)), - Err(StorageError::MissingTrieValue(_, _)) => { - tracing::warn!( - target: "client", - ?contract_hash, - chunk_production_key = ?key, - "Requested contract hash is not present in the storage" - ); - return Ok(()); - } - Err(err) => return Err(err.into()), - } - } - let response = ContractCodeResponse::encode(key.clone(), &contracts)?; - self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( - NetworkRequests::ContractCodeResponse(request.requester().clone(), response), - )); - Ok(()) - } - - /// Handles contract code responses message from chunk producer. 
- fn handle_contract_code_response( - &mut self, - response: ContractCodeResponse, - ) -> Result<(), Error> { - let key = response.chunk_production_key().clone(); - let contracts = response.decompress_contracts()?; - self.partial_witness_tracker.store_accessed_contract_codes(key, contracts) - } - - fn my_validator_signer(&self) -> Result, Error> { - self.my_signer.get().ok_or_else(|| Error::NotAValidator("not a validator".to_owned())) - } - - fn contract_deploys_encoder(&mut self, validators_count: usize) -> Arc { - self.contract_deploys_encoders.entry(validators_count) - } - - fn ordered_contract_deploys_validators( - &mut self, - key: &ChunkProductionKey, - ) -> Result, Error> { - let chunk_producers = HashSet::::from_iter( - self.epoch_manager.get_epoch_chunk_producers_for_shard(&key.epoch_id, key.shard_id)?, - ); - let mut validators = self - .epoch_manager - .get_epoch_all_validators(&key.epoch_id)? - .into_iter() - .filter(|stake| !chunk_producers.contains(stake.account_id())) - .map(|stake| stake.account_id().clone()) - .collect::>(); - validators.sort(); - Ok(validators) - } -} - -fn compress_witness(witness: &ChunkStateWitness) -> Result { - let shard_id_label = witness.chunk_header.shard_id().to_string(); - let encode_timer = near_chain::stateless_validation::metrics::CHUNK_STATE_WITNESS_ENCODE_TIME - .with_label_values(&[shard_id_label.as_str()]) - .start_timer(); - let (witness_bytes, raw_witness_size) = EncodedChunkStateWitness::encode(witness)?; - encode_timer.observe_duration(); - - near_chain::stateless_validation::metrics::record_witness_size_metrics( - raw_witness_size, - witness_bytes.size_bytes(), - witness, - ); - Ok(witness_bytes) -} - -fn contracts_cache_contains_contract( - cache: &dyn ContractRuntimeCache, - contract_hash: &CodeHash, - runtime_config: &RuntimeConfig, -) -> bool { - let cache_key = get_contract_cache_key(contract_hash.0, &runtime_config.wasm_config); - cache.memory_cache().contains(cache_key) || cache.has(&cache_key).is_ok_and(|has| has) } diff --git a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor_v2.rs b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor_v2.rs new file mode 100644 index 00000000000..0e087e319e0 --- /dev/null +++ b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor_v2.rs @@ -0,0 +1,868 @@ +use std::collections::HashSet; +use std::num::NonZeroUsize; +use std::sync::Arc; + +use itertools::Itertools; +use lru::LruCache; +use near_async::futures::{ + AsyncComputationSpawner, AsyncComputationSpawnerExt, FutureSpawnerExt, + TokioRuntimeFutureSpawner, +}; +use near_async::messaging::{CanSend, Sender}; +use near_async::time::Clock; +use near_async::{MultiSend, MultiSenderFrom}; +use near_chain::types::RuntimeAdapter; +use near_chain::Error; +use near_chain_configs::MutableValidatorSigner; +use near_epoch_manager::EpochManagerAdapter; +use near_network::state_witness::{ + ChunkContractAccessesMessage, ChunkStateWitnessAckMessage, ContractCodeRequestMessage, + ContractCodeResponseMessage, PartialEncodedContractDeploysMessage, + PartialEncodedStateWitnessForwardMessage, PartialEncodedStateWitnessMessage, +}; +use near_network::types::{NetworkRequests, PeerManagerAdapter, PeerManagerMessageRequest}; +use near_parameters::RuntimeConfig; +use near_primitives::reed_solomon::{ReedSolomonEncoder, ReedSolomonEncoderCache}; +use near_primitives::sharding::ShardChunkHeader; +use near_primitives::stateless_validation::contract_distribution::{ + ChunkContractAccesses, 
ChunkContractDeploys, CodeBytes, CodeHash, ContractCodeRequest, + ContractCodeResponse, ContractUpdates, MainTransitionKey, PartialEncodedContractDeploys, + PartialEncodedContractDeploysPart, +}; +use near_primitives::stateless_validation::partial_witness::PartialEncodedStateWitness; +use near_primitives::stateless_validation::state_witness::{ + ChunkStateWitness, ChunkStateWitnessAck, EncodedChunkStateWitness, +}; +use near_primitives::stateless_validation::stored_chunk_state_transition_data::StoredChunkStateTransitionData; +use near_primitives::stateless_validation::ChunkProductionKey; +use near_primitives::types::{AccountId, EpochId}; +use near_primitives::validator_signer::ValidatorSigner; +use near_store::adapter::trie_store::TrieStoreAdapter; +use near_store::{DBCol, StorageError, TrieDBStorage, TrieStorage}; +use near_vm_runner::{get_contract_cache_key, ContractCode, ContractRuntimeCache}; +use rand::Rng; + +use crate::client_actor::ClientSenderForPartialWitness; +use crate::stateless_validation::state_witness_tracker::ChunkStateWitnessTracker; +use crate::stateless_validation::validate::{ + validate_chunk_contract_accesses, validate_contract_code_request, + validate_partial_encoded_contract_deploys, validate_partial_encoded_state_witness, +}; +use crate::{metrics, DistributeStateWitnessRequest}; + +use super::encoding::{CONTRACT_DEPLOYS_RATIO_DATA_PARTS, WITNESS_RATIO_DATA_PARTS}; +use super::partial_deploys_tracker::PartialEncodedContractDeploysTracker; +use super::partial_witness_tracker::PartialEncodedStateWitnessTracker; +use near_primitives::utils::compression::CompressedData; +use std::sync::mpsc::{self, Receiver, SendError, Sender as MpscSender}; + +const PROCESSED_CONTRACT_CODE_REQUESTS_CACHE_SIZE: usize = 30; + +#[derive(Debug)] +pub enum PartialWitnessMsg { + DistributeStateWitnessRequest(Box), + ChunkStateWitnessAckMessage(ChunkStateWitnessAckMessage), + PartialEncodedStateWitnessMessage(PartialEncodedStateWitnessMessage), + PartialEncodedStateWitnessForwardMessage(PartialEncodedStateWitnessForwardMessage), + ChunkContractAccessesMessage(ChunkContractAccessesMessage), + PartialEncodedContractDeploysMessage(PartialEncodedContractDeploysMessage), + ContractCodeRequestMessage(ContractCodeRequestMessage), + ContractCodeResponseMessage(ContractCodeResponseMessage), +} + +#[derive(Clone)] +pub struct PartialWitnessSender(MpscSender); + +impl PartialWitnessSender { + /// Send a message to the Partial Witness Service (async). + #[allow(clippy::result_large_err)] + pub fn send(&self, msg: PartialWitnessMsg) -> Result<(), SendError> { + self.0.send(msg) + } +} + +pub struct PartialWitnessService { + rx: Receiver, + /// Adapter to send messages to the network. + network_adapter: PeerManagerAdapter, + /// Validator signer to sign the state witness. This field is mutable and optional. Use with caution! + /// Lock the value of mutable validator signer for the duration of a request to ensure consistency. + /// Please note that the locked value should not be stored anywhere or passed through the thread boundary. + my_signer: MutableValidatorSigner, + epoch_manager: Arc, + runtime: Arc, + /// Tracks the parts of the state witness sent from chunk producers to chunk validators. + partial_witness_tracker: Arc, + partial_deploys_tracker: PartialEncodedContractDeploysTracker, + /// Tracks a collection of state witnesses sent from chunk producers to chunk validators. + state_witness_tracker: ChunkStateWitnessTracker, + /// Reed Solomon encoder for encoding state witness parts. 
+ /// We keep one wrapper for each length of chunk_validators to avoid re-creating the encoder. + witness_encoders: ReedSolomonEncoderCache, + /// Same as above for contract deploys. + contract_deploys_encoders: ReedSolomonEncoderCache, + compile_contracts_spawner: Arc, + partial_witness_spawner: Arc, + /// AccountId in the key corresponds to the requester (chunk validator). + processed_contract_code_requests: LruCache<(ChunkProductionKey, AccountId), ()>, +} + +#[derive(Clone, MultiSend, MultiSenderFrom)] +pub struct PartialWitnessSenderForClient { + pub distribute_chunk_state_witness: Sender, +} + +impl PartialWitnessService { + pub fn new( + rt: Arc, + clock: Clock, + network_adapter: PeerManagerAdapter, + client_sender: ClientSenderForPartialWitness, + my_signer: MutableValidatorSigner, + epoch_manager: Arc, + runtime: Arc, + compile_contracts_spawner: Arc, + partial_witness_spawner: Arc, + ) -> PartialWitnessSender { + let (tx, rx) = mpsc::channel(); + + let partial_witness_tracker = + Arc::new(PartialEncodedStateWitnessTracker::new(client_sender, epoch_manager.clone())); + + let actor = Self { + rx, + network_adapter, + my_signer, + epoch_manager, + runtime, + partial_witness_tracker, + partial_deploys_tracker: PartialEncodedContractDeploysTracker::new(), + state_witness_tracker: ChunkStateWitnessTracker::new(clock), + witness_encoders: ReedSolomonEncoderCache::new(WITNESS_RATIO_DATA_PARTS), + contract_deploys_encoders: ReedSolomonEncoderCache::new( + CONTRACT_DEPLOYS_RATIO_DATA_PARTS, + ), + compile_contracts_spawner, + partial_witness_spawner, + processed_contract_code_requests: LruCache::new( + NonZeroUsize::new(PROCESSED_CONTRACT_CODE_REQUESTS_CACHE_SIZE).unwrap(), + ), + }; + + rt.spawn("PartialWitnessService", async move { + actor.run().await.expect("Failed to run PartialWitnessActor"); + }); + + let sender = PartialWitnessSender(tx); + sender + } + + /// Main async loop processing all incoming PartialWitnessMsg. 
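The constructor above no longer builds the trackers and encoders inside the actor; it hands everything to `PartialWitnessService::new`, which owns the state and is driven over a `std::sync::mpsc` channel by a task spawned on a Tokio runtime. Below is a minimal, self-contained sketch of that handler-to-service pattern under simplified assumptions: a hypothetical `Msg` enum and `FrontActor`/`BackService` types stand in for the real ones, and plain `tokio` replaces the named `TokioRuntimeFutureSpawner` spawn. The actual `run` loop of `PartialWitnessService` follows right after this sketch.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// Hypothetical message enum standing in for PartialWitnessMsg.
#[derive(Debug)]
enum Msg {
    Distribute(String),
    Ack(u64),
}

// The actor half: handlers only enqueue messages and return immediately,
// mirroring the new PartialWitnessActor::handle implementations.
struct FrontActor {
    tx: Sender<Msg>,
}

impl FrontActor {
    fn handle_distribute(&self, payload: String) {
        self.tx.send(Msg::Distribute(payload)).unwrap();
    }
    fn handle_ack(&self, id: u64) {
        self.tx.send(Msg::Ack(id)).unwrap();
    }
}

// The service half: owns the state and drains the channel in its own loop,
// comparable to PartialWitnessService::run.
struct BackService {
    rx: Receiver<Msg>,
    processed: u64,
}

impl BackService {
    async fn run(mut self) {
        // recv() returns Err once every sender has been dropped, ending the loop.
        while let Ok(msg) = self.rx.recv() {
            match msg {
                Msg::Distribute(p) => {
                    self.processed += 1;
                    println!("distributing {p}; total {}", self.processed);
                }
                Msg::Ack(id) => println!("ack {id}"),
            }
        }
    }
}

fn main() {
    let rt = Arc::new(tokio::runtime::Runtime::new().unwrap());
    let (tx, rx) = mpsc::channel();
    // Comparable to rt.spawn("PartialWitnessService", async move { ... }) in the patch.
    rt.spawn(BackService { rx, processed: 0 }.run());

    let actor = FrontActor { tx };
    actor.handle_distribute("witness parts".to_string());
    actor.handle_ack(7);
    drop(actor); // closes the channel so the service loop can exit
    thread::sleep(Duration::from_millis(100));
}
```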
+ pub async fn run(mut self) -> Result<(), Error> { + while let Ok(msg) = self.rx.recv() { + match msg { + PartialWitnessMsg::DistributeStateWitnessRequest(req) => { + if let Err(err) = self.handle_distribute_state_witness_request(req).await { + tracing::error!(target: "client", ?err, "Failed to handle distribute chunk state witness request"); + } + } + + PartialWitnessMsg::ChunkStateWitnessAckMessage(msg) => { + self.handle_chunk_state_witness_ack(msg.0).await; + } + + PartialWitnessMsg::PartialEncodedStateWitnessMessage(msg) => { + if let Err(err) = self.handle_partial_encoded_state_witness(msg.0).await { + tracing::error!(target: "client", ?err, "Failed to handle PartialEncodedStateWitnessMessage"); + } + } + + PartialWitnessMsg::PartialEncodedStateWitnessForwardMessage(msg) => { + if let Err(err) = self.handle_partial_encoded_state_witness_forward(msg.0).await + { + tracing::error!(target: "client", ?err, "Failed to handle PartialEncodedStateWitnessForwardMessage"); + } + } + + PartialWitnessMsg::ChunkContractAccessesMessage(msg) => { + if let Err(err) = self.handle_chunk_contract_accesses(msg.0).await { + tracing::error!(target: "client", ?err, "Failed to handle ChunkContractAccessesMessage"); + } + } + + PartialWitnessMsg::PartialEncodedContractDeploysMessage(msg) => { + if let Err(err) = self.handle_partial_encoded_contract_deploys(msg.0).await { + tracing::error!(target: "client", ?err, "Failed to handle PartialEncodedContractDeploysMessage"); + } + } + + PartialWitnessMsg::ContractCodeRequestMessage(msg) => { + if let Err(err) = self.handle_contract_code_request(msg.0).await { + tracing::error!(target: "client", ?err, "Failed to handle ContractCodeRequestMessage"); + } + } + + PartialWitnessMsg::ContractCodeResponseMessage(msg) => { + if let Err(err) = self.handle_contract_code_response(msg.0).await { + tracing::error!(target: "client", ?err, "Failed to handle ContractCodeResponseMessage"); + } + } + } + } + + Ok(()) + } + + async fn handle_distribute_state_witness_request( + &mut self, + msg: Box, + ) -> Result<(), Error> { + let DistributeStateWitnessRequest { + state_witness, + contract_updates: ContractUpdates { contract_accesses, contract_deploys }, + main_transition_shard_id, + } = *msg; + + tracing::debug!( + target: "client", + chunk_hash=?state_witness.chunk_header.chunk_hash(), + "distribute_chunk_state_witness", + ); + + // We send the state-witness and contract-updates in the following order: + // 1. We send the hashes of the contract code accessed (if contract code is excluded from witness and any contracts are called) + // before the state witness in order to allow validators to check and request missing contract code, while waiting for witness parts. + // 2. We send the state witness parts to witness-part owners. + // 3. We send the contract deploys parts to other validators (that do not validate the witness in this turn). This is lower priority + // since the newly-deployed contracts will be needed by other validators in later turns. 
+ + let signer = self.my_validator_signer()?; + let key = state_witness.chunk_production_key(); + let chunk_validators = self + .epoch_manager + .get_chunk_validator_assignments(&key.epoch_id, key.shard_id, key.height_created) + .expect("Chunk validators must be defined") + .ordered_chunk_validators(); + + if !contract_accesses.is_empty() { + self.send_contract_accesses_to_chunk_validators( + key.clone(), + contract_accesses, + MainTransitionKey { + block_hash: state_witness.main_state_transition.block_hash, + shard_id: main_transition_shard_id, + }, + &chunk_validators, + &signer, + ); + } + + let witness_bytes = compress_witness(&state_witness)?; + self.send_state_witness_parts( + key.epoch_id, + &state_witness.chunk_header, + witness_bytes, + &chunk_validators, + &signer, + ); + + if !contract_deploys.is_empty() { + self.send_chunk_contract_deploys_parts(key, contract_deploys)?; + } + Ok(()) + } + + // Function to generate the parts of the state witness and return them as a tuple of chunk_validator and part. + fn generate_state_witness_parts( + &mut self, + epoch_id: EpochId, + chunk_header: &ShardChunkHeader, + witness_bytes: EncodedChunkStateWitness, + chunk_validators: &[AccountId], + signer: &ValidatorSigner, + ) -> Vec<(AccountId, PartialEncodedStateWitness)> { + tracing::debug!( + target: "client", + chunk_hash=?chunk_header.chunk_hash(), + ?chunk_validators, + "generate_state_witness_parts", + ); + + // Break the state witness into parts using Reed Solomon encoding. + let encoder = self.witness_encoders.entry(chunk_validators.len()); + let (parts, encoded_length) = encoder.encode(&witness_bytes); + + chunk_validators + .iter() + .zip_eq(parts) + .enumerate() + .map(|(part_ord, (chunk_validator, part))| { + // It's fine to unwrap part here as we just constructed the parts above and we expect + // all of them to be present. + let partial_witness = PartialEncodedStateWitness::new( + epoch_id, + chunk_header.clone(), + part_ord, + part.unwrap().to_vec(), + encoded_length, + signer, + ); + (chunk_validator.clone(), partial_witness) + }) + .collect_vec() + } + + fn generate_contract_deploys_parts( + &mut self, + key: &ChunkProductionKey, + deploys: ChunkContractDeploys, + ) -> Result, Error> { + let validators = self.ordered_contract_deploys_validators(key)?; + // Note that target validators do not include the chunk producers, and thus in some case + // (eg. tests or small networks) there may be no other validators to send the new contracts to. + if validators.is_empty() { + return Ok(vec![]); + } + + let encoder = self.contract_deploys_encoder(validators.len()); + let (parts, encoded_length) = encoder.encode(&deploys); + let signer = self.my_validator_signer()?; + + Ok(validators + .into_iter() + .zip_eq(parts) + .enumerate() + .map(|(part_ord, (validator, part))| { + let partial_deploys = PartialEncodedContractDeploys::new( + key.clone(), + PartialEncodedContractDeploysPart { + part_ord, + data: part.unwrap().to_vec().into_boxed_slice(), + encoded_length, + }, + &signer, + ); + (validator, partial_deploys) + }) + .collect_vec()) + } + + // Break the state witness into parts and send each part to the corresponding chunk validator owner. + // The chunk validator owner will then forward the part to all other chunk validators. + // Each chunk validator would collect the parts and reconstruct the state witness. 
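The comment above sums up the distribution scheme: the encoded witness is cut into one Reed Solomon part per chunk validator, each owner forwards its part, and a sufficient subset of parts lets any validator rebuild the witness. The sketch below illustrates that encode, lose-parts, reconstruct cycle using the `reed-solomon-erasure` crate directly rather than nearcore's `ReedSolomonEncoderCache` wrapper; the 3-data/2-parity split and the payload size are illustrative choices, not the actual `WITNESS_RATIO_DATA_PARTS` configuration. The real `send_state_witness_parts` follows.

```rust
use reed_solomon_erasure::galois_8::ReedSolomon;

fn main() {
    // Five chunk validators, one part each: 3 data shards + 2 parity shards.
    // (Illustrative split; nearcore derives it from WITNESS_RATIO_DATA_PARTS.)
    let data_parts = 3;
    let parity_parts = 2;
    let rs = ReedSolomon::new(data_parts, parity_parts).unwrap();

    // Stand-in for the compressed EncodedChunkStateWitness bytes.
    let witness: Vec<u8> = (0u8..150).collect();

    // Pad and cut the payload into equally sized data shards.
    let shard_len = (witness.len() + data_parts - 1) / data_parts;
    let mut shards: Vec<Vec<u8>> = witness
        .chunks(shard_len)
        .map(|c| {
            let mut s = c.to_vec();
            s.resize(shard_len, 0);
            s
        })
        .collect();
    // Add empty parity shards, then fill them in.
    shards.resize(data_parts + parity_parts, vec![0u8; shard_len]);
    rs.encode(&mut shards).unwrap();

    // Each shard would be wrapped in a signed PartialEncodedStateWitness and sent
    // to its owner. Simulate two parts never arriving at some validator.
    let mut received: Vec<Option<Vec<u8>>> = shards.into_iter().map(Some).collect();
    received[1] = None;
    received[4] = None;

    // Any `data_parts` surviving shards are enough to reconstruct the rest.
    rs.reconstruct(&mut received).unwrap();
    let mut recovered: Vec<u8> = received
        .into_iter()
        .take(data_parts)
        .flat_map(|s| s.unwrap())
        .collect();
    recovered.truncate(150);
    assert_eq!(recovered, (0u8..150).collect::<Vec<u8>>());
}
```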
+ fn send_state_witness_parts( + &mut self, + epoch_id: EpochId, + chunk_header: &ShardChunkHeader, + witness_bytes: EncodedChunkStateWitness, + chunk_validators: &[AccountId], + signer: &ValidatorSigner, + ) { + // Capture these values first, as the sources are consumed before calling record_witness_sent. + let chunk_hash = chunk_header.chunk_hash(); + let witness_size_in_bytes = witness_bytes.size_bytes(); + + // Record time taken to encode the state witness parts. + let shard_id_label = chunk_header.shard_id().to_string(); + let encode_timer = metrics::PARTIAL_WITNESS_ENCODE_TIME + .with_label_values(&[shard_id_label.as_str()]) + .start_timer(); + let validator_witness_tuple = self.generate_state_witness_parts( + epoch_id, + chunk_header, + witness_bytes, + chunk_validators, + signer, + ); + encode_timer.observe_duration(); + + // Record the witness in order to match the incoming acks for measuring round-trip times. + // See process_chunk_state_witness_ack for the handling of the ack messages. + self.state_witness_tracker.record_witness_sent( + chunk_hash, + witness_size_in_bytes, + validator_witness_tuple.len(), + ); + + // Send the parts to the corresponding chunk validator owners. + self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( + NetworkRequests::PartialEncodedStateWitness(validator_witness_tuple), + )); + } + + /// Function to handle receiving partial_encoded_state_witness message from chunk producer. + async fn handle_partial_encoded_state_witness( + &mut self, + partial_witness: PartialEncodedStateWitness, + ) -> Result<(), Error> { + tracing::debug!(target: "client", ?partial_witness, "Receive PartialEncodedStateWitnessMessage"); + let signer = self.my_validator_signer()?; + let epoch_manager = self.epoch_manager.clone(); + let runtime_adapter = self.runtime.clone(); + + let ChunkProductionKey { shard_id, epoch_id, height_created } = + partial_witness.chunk_production_key(); + + let chunk_producer = self + .epoch_manager + .get_chunk_producer_info(&ChunkProductionKey { epoch_id, height_created, shard_id })? + .take_account_id(); + + // Forward witness part to chunk validators except the validator that produced the chunk and witness. + let target_chunk_validators = self + .epoch_manager + .get_chunk_validator_assignments(&epoch_id, shard_id, height_created)? + .ordered_chunk_validators() + .into_iter() + .filter(|validator| validator != &chunk_producer) + .collect(); + + let pw_clone = partial_witness.clone(); + let validation = tokio::task::spawn_blocking(move || { + validate_partial_encoded_state_witness( + epoch_manager.as_ref(), + &pw_clone, + &signer, + runtime_adapter.store(), + ) + }) + .await + .expect("Failed to validate partial encoded state witness"); + + // Validate the partial encoded state witness and forward the part to all the chunk validators. + match validation { + Ok(true) => { + self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( + NetworkRequests::PartialEncodedStateWitnessForward( + target_chunk_validators, + partial_witness, + ), + )); + } + Ok(false) => { + tracing::warn!( + target: "client", + "Received invalid partial encoded state witness" + ); + } + Err(err) => { + tracing::warn!( + target: "client", + "Encountered error during validation: {}", + err + ); + } + } + + Ok(()) + } + + /// Function to handle receiving partial_encoded_state_witness_forward message from chunk producer. 
+ async fn handle_partial_encoded_state_witness_forward( + &mut self, + partial_witness: PartialEncodedStateWitness, + ) -> Result<(), Error> { + tracing::debug!(target: "client", ?partial_witness, "Receive PartialEncodedStateWitnessForwardMessage"); + + let signer = self.my_validator_signer()?; + let partial_witness_tracker = self.partial_witness_tracker.clone(); + let epoch_manager = self.epoch_manager.clone(); + let runtime_adapter = self.runtime.clone(); + self.partial_witness_spawner.spawn( + "handle_partial_encoded_state_witness_forward", + move || { + // Validate the partial encoded state witness and store the partial encoded state witness. + match validate_partial_encoded_state_witness( + epoch_manager.as_ref(), + &partial_witness, + &signer, + runtime_adapter.store(), + ) { + Ok(true) => { + if let Err(err) = partial_witness_tracker.store_partial_encoded_state_witness(partial_witness) { + tracing::error!(target: "client", "Failed to store partial encoded state witness: {}", err); + } + } + Ok(false) => { + tracing::warn!( + target: "client", + "Received invalid partial encoded state witness" + ); + } + Err(err) => { + tracing::warn!( + target: "client", + "Encountered error during validation: {}", + err + ); + } + } + }, + ); + + Ok(()) + } + + /// Handles partial contract deploy message received from a peer. + /// + /// This message may belong to one of two steps of distributing contract code. In the first step the code is compressed + /// and encoded into parts using Reed Solomon encoding and each part is sent to one of the validators (part owner). + /// See `send_chunk_contract_deploys_parts` for the code implementing this. In the second step each validator (part-owner) + /// forwards the part it receives to other validators. + async fn handle_partial_encoded_contract_deploys( + &mut self, + partial_deploys: PartialEncodedContractDeploys, + ) -> Result<(), Error> { + tracing::debug!(target: "client", ?partial_deploys, "Receive PartialEncodedContractDeploys"); + if !validate_partial_encoded_contract_deploys( + self.epoch_manager.as_ref(), + &partial_deploys, + self.runtime.store(), + )? { + return Ok(()); + } + if self.partial_deploys_tracker.already_processed(&partial_deploys) { + return Ok(()); + } + let key = partial_deploys.chunk_production_key().clone(); + let validators = self.ordered_contract_deploys_validators(&key)?; + if validators.is_empty() { + // Note that target validators do not include the chunk producers, and thus in some case + // (eg. tests or small networks) there may be no other validators to send the new contracts to. + // In such case, the message we are handling here should not be sent in the first place, + // unless there is a bug or adversarial behavior that sends the message. 
+ debug_assert!(false, "No target validators, we must not receive this message"); + return Ok(()); + } + + // Forward to other validators if the part received is my part + let signer = self.my_validator_signer()?; + let my_account_id = signer.validator_id(); + let Some(my_part_ord) = validators.iter().position(|validator| validator == my_account_id) + else { + tracing::warn!( + target: "client", + ?key, + "Validator is not a part of contract deploys distribution" + ); + return Ok(()); + }; + if partial_deploys.part().part_ord == my_part_ord { + let other_validators = validators + .iter() + .filter(|&validator| validator != my_account_id) + .cloned() + .collect_vec(); + if !other_validators.is_empty() { + self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( + NetworkRequests::PartialEncodedContractDeploys( + other_validators, + partial_deploys.clone(), + ), + )); + } + } + + // Store part + let encoder = self.contract_deploys_encoder(validators.len()); + if let Some(deploys) = self + .partial_deploys_tracker + .store_partial_encoded_contract_deploys(partial_deploys, encoder)? + { + let contracts = match deploys.decompress_contracts() { + Ok(contracts) => contracts, + Err(err) => { + tracing::warn!( + target: "client", + ?err, + ?key, + "Failed to decompress deployed contracts." + ); + return Ok(()); + } + }; + let contract_codes = contracts.into_iter().map(|contract| contract.into()).collect(); + let runtime = self.runtime.clone(); + self.compile_contracts_spawner.spawn("precompile_deployed_contracts", move || { + if let Err(err) = runtime.precompile_contracts(&key.epoch_id, contract_codes) { + tracing::error!( + target: "client", + ?err, + ?key, + "Failed to precompile deployed contracts." + ); + } + }); + } + + Ok(()) + } + + /// Handles the state witness ack message from the chunk validator. + /// It computes the round-trip time between sending the state witness and receiving + /// the ack message and updates the corresponding metric with it. + /// Currently we do not raise an error for handling of witness-ack messages, + /// as it is used only for tracking some networking metrics. + async fn handle_chunk_state_witness_ack(&mut self, witness_ack: ChunkStateWitnessAck) { + self.state_witness_tracker.on_witness_ack_received(witness_ack); + } + + /// Handles contract code accesses message from chunk producer. + /// This is sent in parallel to a chunk state witness and contains the hashes + /// of the contract code accessed when applying the previous chunk of the witness. + async fn handle_chunk_contract_accesses( + &mut self, + accesses: ChunkContractAccesses, + ) -> Result<(), Error> { + let signer = self.my_validator_signer()?; + if !validate_chunk_contract_accesses( + self.epoch_manager.as_ref(), + &accesses, + &signer, + self.runtime.store(), + )? 
{ + return Ok(()); + } + let key = accesses.chunk_production_key(); + let contracts_cache = self.runtime.compiled_contract_cache(); + let runtime_config = self + .runtime + .get_runtime_config(self.epoch_manager.get_epoch_protocol_version(&key.epoch_id)?)?; + let missing_contract_hashes = HashSet::from_iter( + accesses + .contracts() + .iter() + .filter(|&hash| { + !contracts_cache_contains_contract(contracts_cache, hash, &runtime_config) + }) + .cloned(), + ); + if missing_contract_hashes.is_empty() { + return Ok(()); + } + self.partial_witness_tracker + .store_accessed_contract_hashes(key.clone(), missing_contract_hashes.clone())?; + let random_chunk_producer = { + let mut chunk_producers = self + .epoch_manager + .get_epoch_chunk_producers_for_shard(&key.epoch_id, key.shard_id)?; + chunk_producers.swap_remove(rand::thread_rng().gen_range(0..chunk_producers.len())) + }; + let request = ContractCodeRequest::new( + key.clone(), + missing_contract_hashes, + accesses.main_transition().clone(), + &signer, + ); + self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( + NetworkRequests::ContractCodeRequest(random_chunk_producer, request), + )); + Ok(()) + } + + /// Sends the contract accesses to the same chunk validators + /// (except for the chunk producers that track the same shard), + /// which will receive the state witness for the new chunk. + fn send_contract_accesses_to_chunk_validators( + &self, + key: ChunkProductionKey, + contract_accesses: HashSet, + main_transition: MainTransitionKey, + chunk_validators: &[AccountId], + my_signer: &ValidatorSigner, + ) { + let chunk_producers: HashSet = self + .epoch_manager + .get_epoch_chunk_producers_for_shard(&key.epoch_id, key.shard_id) + .expect("Chunk producers must be defined") + .into_iter() + .collect(); + + // Exclude chunk producers that track the same shard from the target list, since they track the state that contains the respective code. + let target_chunk_validators = chunk_validators + .iter() + .filter(|validator| !chunk_producers.contains(*validator)) + .cloned() + .collect(); + self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( + NetworkRequests::ChunkContractAccesses( + target_chunk_validators, + ChunkContractAccesses::new(key, contract_accesses, main_transition, my_signer), + ), + )); + } + + /// Retrieves the code for the given contract hashes and distributes them to validator in parts. + /// + /// This implements the first step of distributing contract code to validators where the contract codes + /// are compressed and encoded into parts using Reed Solomon encoding, and then each part is sent to + /// one of the validators (part-owner). Second step of the distribution, where each validator (part-owner) + /// forwards the part it receives is implemented in `handle_partial_encoded_contract_deploys`. + fn send_chunk_contract_deploys_parts( + &mut self, + key: ChunkProductionKey, + contract_codes: Vec, + ) -> Result<(), Error> { + let contracts = contract_codes.into_iter().map(|contract| contract.into()).collect(); + let compressed_deploys = ChunkContractDeploys::compress_contracts(&contracts)?; + let validator_parts = self.generate_contract_deploys_parts(&key, compressed_deploys)?; + for (part_owner, deploys_part) in validator_parts.into_iter() { + self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( + NetworkRequests::PartialEncodedContractDeploys(vec![part_owner], deploys_part), + )); + } + Ok(()) + } + + /// Handles contract code requests message from chunk validators. 
+ /// As response to this message, sends the contract code requested to + /// the requesting chunk validator for the given hashes of the contract code. + async fn handle_contract_code_request( + &mut self, + request: ContractCodeRequest, + ) -> Result<(), Error> { + if !validate_contract_code_request( + self.epoch_manager.as_ref(), + &request, + self.runtime.store(), + )? { + return Ok(()); + } + + let key = request.chunk_production_key(); + let processed_requests_key = (key.clone(), request.requester().clone()); + if self.processed_contract_code_requests.contains(&processed_requests_key) { + tracing::warn!( + target: "client", + ?processed_requests_key, + "Contract code request from this account was already processed" + ); + return Ok(()); + } + self.processed_contract_code_requests.push(processed_requests_key, ()); + + let _timer = near_chain::stateless_validation::metrics::PROCESS_CONTRACT_CODE_REQUEST_TIME + .with_label_values(&[&key.shard_id.to_string()]) + .start_timer(); + + let main_transition_key = request.main_transition(); + let Some(transition_data) = + self.runtime.store().get_ser::( + DBCol::StateTransitionData, + &near_primitives::utils::get_block_shard_id( + &main_transition_key.block_hash, + main_transition_key.shard_id, + ), + )? + else { + tracing::warn!( + target: "client", + ?key, + ?main_transition_key, + "Missing state transition data" + ); + return Ok(()); + }; + let valid_accesses: HashSet = + transition_data.contract_accesses().iter().cloned().collect(); + + let storage = TrieDBStorage::new( + TrieStoreAdapter::new(self.runtime.store().clone()), + self.epoch_manager.shard_id_to_uid( + main_transition_key.shard_id, + &self.epoch_manager.get_epoch_id(&main_transition_key.block_hash)?, + )?, + ); + let mut contracts = Vec::new(); + for contract_hash in request.contracts() { + if !valid_accesses.contains(contract_hash) { + tracing::warn!( + target: "client", + ?key, + ?contract_hash, + "Requested contract code was not accessed when applying the chunk" + ); + return Ok(()); + } + match storage.retrieve_raw_bytes(&contract_hash.0) { + Ok(bytes) => contracts.push(CodeBytes(bytes)), + Err(StorageError::MissingTrieValue(_, _)) => { + tracing::warn!( + target: "client", + ?contract_hash, + chunk_production_key = ?key, + "Requested contract hash is not present in the storage" + ); + return Ok(()); + } + Err(err) => return Err(err.into()), + } + } + let response = ContractCodeResponse::encode(key.clone(), &contracts)?; + self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( + NetworkRequests::ContractCodeResponse(request.requester().clone(), response), + )); + Ok(()) + } + + /// Handles contract code responses message from chunk producer. 
+
+    /// Handles contract code response messages from the chunk producer.
+    async fn handle_contract_code_response(
+        &mut self,
+        response: ContractCodeResponse,
+    ) -> Result<(), Error> {
+        let key = response.chunk_production_key().clone();
+        let contracts = response.decompress_contracts()?;
+        self.partial_witness_tracker.store_accessed_contract_codes(key, contracts)
+    }
+
+    fn my_validator_signer(&self) -> Result<Arc<ValidatorSigner>, Error> {
+        self.my_signer.get().ok_or_else(|| Error::NotAValidator("not a validator".to_owned()))
+    }
+
+    fn contract_deploys_encoder(&mut self, validators_count: usize) -> Arc<ReedSolomonEncoder> {
+        self.contract_deploys_encoders.entry(validators_count)
+    }
+
+    fn ordered_contract_deploys_validators(
+        &mut self,
+        key: &ChunkProductionKey,
+    ) -> Result<Vec<AccountId>, Error> {
+        let chunk_producers = HashSet::<AccountId>::from_iter(
+            self.epoch_manager.get_epoch_chunk_producers_for_shard(&key.epoch_id, key.shard_id)?,
+        );
+        let mut validators = self
+            .epoch_manager
+            .get_epoch_all_validators(&key.epoch_id)?
+            .into_iter()
+            .filter(|stake| !chunk_producers.contains(stake.account_id()))
+            .map(|stake| stake.account_id().clone())
+            .collect::<Vec<_>>();
+        validators.sort();
+        Ok(validators)
+    }
+}
+
+fn compress_witness(witness: &ChunkStateWitness) -> Result<EncodedChunkStateWitness, Error> {
+    let shard_id_label = witness.chunk_header.shard_id().to_string();
+    let encode_timer = near_chain::stateless_validation::metrics::CHUNK_STATE_WITNESS_ENCODE_TIME
+        .with_label_values(&[shard_id_label.as_str()])
+        .start_timer();
+    let (witness_bytes, raw_witness_size) = EncodedChunkStateWitness::encode(witness)?;
+    encode_timer.observe_duration();
+
+    near_chain::stateless_validation::metrics::record_witness_size_metrics(
+        raw_witness_size,
+        witness_bytes.size_bytes(),
+        witness,
+    );
+    Ok(witness_bytes)
+}
+
+fn contracts_cache_contains_contract(
+    cache: &dyn ContractRuntimeCache,
+    contract_hash: &CodeHash,
+    runtime_config: &RuntimeConfig,
+) -> bool {
+    let cache_key = get_contract_cache_key(contract_hash.0, &runtime_config.wasm_config);
+    cache.memory_cache().contains(cache_key) || cache.has(&cache_key).is_ok_and(|has| has)
+}
diff --git a/chain/client/src/stateless_validation/partial_witness/partial_witness_tracker.rs b/chain/client/src/stateless_validation/partial_witness/partial_witness_tracker.rs
index 4beb1b1ff3d..b8e9de73a8e 100644
--- a/chain/client/src/stateless_validation/partial_witness/partial_witness_tracker.rs
+++ b/chain/client/src/stateless_validation/partial_witness/partial_witness_tracker.rs
@@ -1,10 +1,11 @@
 use std::collections::HashSet;
 use std::num::NonZeroUsize;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 
 use lru::LruCache;
 use near_async::messaging::CanSend;
 use near_async::time::Instant;
+use near_cache::SyncLruCache;
 use near_chain::chain::ChunkStateWitnessMessage;
 use near_chain::Error;
 use near_epoch_manager::EpochManagerAdapter;
@@ -308,13 +309,13 @@ pub struct PartialEncodedStateWitnessTracker {
     /// Epoch manager to get the set of chunk validators
     epoch_manager: Arc<dyn EpochManagerAdapter>,
     /// Keeps track of state witness parts received from chunk producers.
-    parts_cache: LruCache<ChunkProductionKey, CacheEntry>,
+    parts_cache: Mutex<LruCache<ChunkProductionKey, CacheEntry>>,
     /// Keeps track of the already decoded witnesses. This is needed
     /// to protect chunk validator from processing the same witness multiple
     /// times.
-    processed_witnesses: LruCache<ChunkProductionKey, ()>,
+    processed_witnesses: SyncLruCache<ChunkProductionKey, ()>,
     /// Reed Solomon encoder for decoding state witness parts.
-    encoders: ReedSolomonEncoderCache,
+    encoders: Mutex<ReedSolomonEncoderCache>,
 }
 
 impl PartialEncodedStateWitnessTracker {
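
The field changes above are what allow the tracker methods further down to switch from `&mut self` to `&self`: each cache becomes its own lock-protected unit (`Mutex<LruCache<..>>`, `SyncLruCache`, `Mutex<ReedSolomonEncoderCache>`), presumably so the tracker can be shared across concurrent tasks without exclusive access. A minimal sketch of that interior-mutability pattern, using hypothetical types rather than the tracker itself:

    use lru::LruCache;
    use std::num::NonZeroUsize;
    use std::sync::Mutex;

    // Hypothetical tracker: the cache lives behind a Mutex so insertion works through &self.
    struct Tracker {
        parts_cache: Mutex<LruCache<u64, Vec<u8>>>,
    }

    impl Tracker {
        fn new(capacity: usize) -> Self {
            Self { parts_cache: Mutex::new(LruCache::new(NonZeroUsize::new(capacity).unwrap())) }
        }

        // Note &self, not &mut self: the lock provides the mutability.
        fn store_part(&self, key: u64, part: Vec<u8>) -> usize {
            let mut cache = self.parts_cache.lock().unwrap();
            cache.push(key, part);
            // Compute the total cached size while still holding the lock.
            cache.iter().map(|(_, p)| p.len()).sum()
        }
    }

    fn main() {
        let tracker = Tracker::new(8);
        let total = tracker.store_part(1, vec![0u8; 16]);
        println!("total cached bytes: {total}");
    }
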
@@ -325,16 +326,16 @@ impl PartialEncodedStateWitnessTracker {
         Self {
             client_sender,
             epoch_manager,
-            parts_cache: LruCache::new(NonZeroUsize::new(WITNESS_PARTS_CACHE_SIZE).unwrap()),
-            processed_witnesses: LruCache::new(
-                NonZeroUsize::new(PROCESSED_WITNESSES_CACHE_SIZE).unwrap(),
-            ),
-            encoders: ReedSolomonEncoderCache::new(WITNESS_RATIO_DATA_PARTS),
+            parts_cache: Mutex::new(LruCache::new(
+                NonZeroUsize::new(WITNESS_PARTS_CACHE_SIZE).unwrap(),
+            )),
+            processed_witnesses: SyncLruCache::new(PROCESSED_WITNESSES_CACHE_SIZE),
+            encoders: Mutex::new(ReedSolomonEncoderCache::new(WITNESS_RATIO_DATA_PARTS)),
         }
     }
 
     pub fn store_partial_encoded_state_witness(
-        &mut self,
+        &self,
         partial_witness: PartialEncodedStateWitness,
     ) -> Result<(), Error> {
         tracing::debug!(target: "client", ?partial_witness, "store_partial_encoded_state_witness");
@@ -345,7 +346,7 @@ impl PartialEncodedStateWitnessTracker {
     }
 
     pub fn store_accessed_contract_hashes(
-        &mut self,
+        &self,
         key: ChunkProductionKey,
         hashes: HashSet<CodeHash>,
     ) -> Result<(), Error> {
@@ -355,7 +356,7 @@ impl PartialEncodedStateWitnessTracker {
     }
 
     pub fn store_accessed_contract_codes(
-        &mut self,
+        &self,
         key: ChunkProductionKey,
         codes: Vec<CodeBytes>,
     ) -> Result<(), Error> {
@@ -365,7 +366,7 @@ impl PartialEncodedStateWitnessTracker {
     }
 
     fn process_update(
-        &mut self,
+        &self,
         key: ChunkProductionKey,
         create_if_not_exists: bool,
         update: CacheUpdate,
@@ -382,17 +383,23 @@ impl PartialEncodedStateWitnessTracker {
         if create_if_not_exists {
             self.maybe_insert_new_entry_in_parts_cache(&key);
         }
-        let Some(entry) = self.parts_cache.get_mut(&key) else {
+        let mut parts_cache = self.parts_cache.lock().unwrap();
+        let Some(entry) = parts_cache.get_mut(&key) else {
             return Ok(());
         };
-        if let Some((decode_result, accessed_contracts)) = entry.update(update) {
+        let total_size: usize = if let Some((decode_result, accessed_contracts)) =
+            entry.update(update)
+        {
             // Record the time taken from receiving first part to decoding partial witness.
             let time_to_last_part = Instant::now().signed_duration_since(entry.created_at);
             metrics::PARTIAL_WITNESS_TIME_TO_LAST_PART
                 .with_label_values(&[key.shard_id.to_string().as_str()])
                 .observe(time_to_last_part.as_seconds_f64());
 
-            self.parts_cache.pop(&key);
+            parts_cache.pop(&key);
+            let total_size = parts_cache.iter().map(|(_, entry)| entry.total_size()).sum();
+            drop(parts_cache);
+
             self.processed_witnesses.push(key.clone(), ());
 
             let encoded_witness = match decode_result {
@@ -428,26 +435,33 @@ impl PartialEncodedStateWitnessTracker {
             tracing::debug!(target: "client", ?key, "Sending encoded witness to client.");
             self.client_sender.send(ChunkStateWitnessMessage { witness, raw_witness_size });
-        }
-        self.record_total_parts_cache_size_metric();
+
+            total_size
+        } else {
+            parts_cache.iter().map(|(_, entry)| entry.total_size()).sum()
+        };
+        metrics::PARTIAL_WITNESS_CACHE_SIZE.set(total_size as f64);
+
         Ok(())
     }
 
-    fn get_encoder(&mut self, key: &ChunkProductionKey) -> Result<Arc<ReedSolomonEncoder>, Error> {
+    fn get_encoder(&self, key: &ChunkProductionKey) -> Result<Arc<ReedSolomonEncoder>, Error> {
         // The expected number of parts for the Reed Solomon encoding is the number of chunk validators.
         let num_parts = self
            .epoch_manager
            .get_chunk_validator_assignments(&key.epoch_id, key.shard_id, key.height_created)?
            .len();
-        Ok(self.encoders.entry(num_parts))
+        let mut encoders = self.encoders.lock().unwrap();
+        Ok(encoders.entry(num_parts))
     }
 
     // Function to insert a new entry into the cache for the chunk hash if it does not already exist
     // We additionally check if an evicted entry has been fully decoded and processed.
-    fn maybe_insert_new_entry_in_parts_cache(&mut self, key: &ChunkProductionKey) {
-        if !self.parts_cache.contains(key) {
+    fn maybe_insert_new_entry_in_parts_cache(&self, key: &ChunkProductionKey) {
+        let mut parts_cache = self.parts_cache.lock().unwrap();
+        if !parts_cache.contains(key) {
             if let Some((evicted_key, evicted_entry)) =
-                self.parts_cache.push(key.clone(), CacheEntry::new(key.shard_id))
+                parts_cache.push(key.clone(), CacheEntry::new(key.shard_id))
             {
                 tracing::warn!(
                     target: "client",
@@ -460,11 +474,6 @@ impl PartialEncodedStateWitnessTracker {
         }
     }
 
-    fn record_total_parts_cache_size_metric(&self) {
-        let total_size: usize = self.parts_cache.iter().map(|(_, entry)| entry.total_size()).sum();
-        metrics::PARTIAL_WITNESS_CACHE_SIZE.set(total_size as f64);
-    }
-
     fn decode_state_witness(
         &self,
         encoded_witness: &EncodedChunkStateWitness,
diff --git a/chain/client/src/test_utils/setup.rs b/chain/client/src/test_utils/setup.rs
index 22ca09bde99..35c8db5f5de 100644
--- a/chain/client/src/test_utils/setup.rs
+++ b/chain/client/src/test_utils/setup.rs
@@ -16,7 +16,7 @@ use actix::{Actor, Addr, Context};
 use futures::{future, FutureExt};
 use near_async::actix::AddrWithAutoSpanContextExt;
 use near_async::actix_wrapper::{spawn_actix_actor, ActixWrapper};
-use near_async::futures::ActixFutureSpawner;
+use near_async::futures::{ActixFutureSpawner, TokioRuntimeFutureSpawner};
 use near_async::messaging::{
     noop, CanSend, IntoMultiSender, IntoSender, LateBoundSender, SendAsync, Sender,
 };
@@ -156,7 +156,10 @@ pub fn setup(
     );
     let client_adapter_for_partial_witness_actor = LateBoundSender::new();
 
+    let networking_rt = Arc::new(tokio::runtime::Builder::new_current_thread().build().unwrap());
+    let networking_spawner = Arc::new(TokioRuntimeFutureSpawner(networking_rt));
     let (partial_witness_addr, _) = spawn_actix_actor(PartialWitnessActor::new(
+        networking_spawner,
         clock.clone(),
         network_adapter.clone(),
         client_adapter_for_partial_witness_actor.as_multi_sender(),
@@ -164,6 +167,7 @@ pub fn setup(
         epoch_manager.clone(),
         runtime.clone(),
         Arc::new(RayonAsyncComputationSpawner),
+        Arc::new(RayonAsyncComputationSpawner),
     ));
 
     let partial_witness_adapter = partial_witness_addr.with_auto_span_context();
diff --git a/integration-tests/src/test_loop/builder.rs b/integration-tests/src/test_loop/builder.rs
index 144a7677c00..fb6136de4b8 100644
--- a/integration-tests/src/test_loop/builder.rs
+++ b/integration-tests/src/test_loop/builder.rs
@@ -2,7 +2,7 @@ use std::collections::{HashMap, HashSet};
 use std::sync::{Arc, Mutex};
 
 use tempfile::TempDir;
-use near_async::futures::FutureSpawner;
+use near_async::futures::{FutureSpawner, TokioRuntimeFutureSpawner};
 use near_async::messaging::{noop, IntoMultiSender, IntoSender, LateBoundSender};
 use near_async::test_loop::sender::TestLoopSender;
 use near_async::test_loop::TestLoopV2;
@@ -719,7 +719,11 @@ impl TestLoopBuilder {
             )
             .unwrap();
 
+        let networking_rt =
+            Arc::new(tokio::runtime::Builder::new_current_thread().build().unwrap());
+        let networking_spawner = Arc::new(TokioRuntimeFutureSpawner(networking_rt));
         let partial_witness_actor = PartialWitnessActor::new(
+            networking_spawner,
             self.test_loop.clock(),
             network_adapter.as_multi_sender(),
             client_adapter.as_multi_sender(),
@@ -727,6 +731,7 @@ impl TestLoopBuilder {
             epoch_manager.clone(),
             runtime_adapter.clone(),
             Arc::new(self.test_loop.async_computation_spawner(|_| Duration::milliseconds(80))),
+            Arc::new(self.test_loop.async_computation_spawner(|_| Duration::milliseconds(80))),
         );
 
         let gc_actor = GCActor::new(
diff --git a/integration-tests/src/tests/network/runner.rs b/integration-tests/src/tests/network/runner.rs
index 8eecd5a410a..3138a5dc90d 100644
--- a/integration-tests/src/tests/network/runner.rs
+++ b/integration-tests/src/tests/network/runner.rs
@@ -2,7 +2,7 @@ use actix::{Actor, Addr};
 use anyhow::{anyhow, bail, Context};
 use near_async::actix::AddrWithAutoSpanContextExt;
 use near_async::actix_wrapper::{spawn_actix_actor, ActixWrapper};
-use near_async::futures::ActixFutureSpawner;
+use near_async::futures::{ActixFutureSpawner, TokioRuntimeFutureSpawner};
 use near_async::messaging::{noop, IntoMultiSender, IntoSender, LateBoundSender};
 use near_async::time::{self, Clock};
 use near_chain::rayon_spawner::RayonAsyncComputationSpawner;
@@ -140,7 +140,10 @@ fn setup_network_node(
         runtime.store().clone(),
         client_config.chunk_request_retry_period,
     );
+    let networking_rt = Arc::new(tokio::runtime::Builder::new_current_thread().build().unwrap());
+    let networking_spawner = Arc::new(TokioRuntimeFutureSpawner(networking_rt));
     let (partial_witness_actor, _) = spawn_actix_actor(PartialWitnessActor::new(
+        networking_spawner,
         Clock::real(),
         network_adapter.as_multi_sender(),
         client_actor.clone().with_auto_span_context().into_multi_sender(),
@@ -148,6 +151,7 @@ fn setup_network_node(
         epoch_manager,
         runtime,
         Arc::new(RayonAsyncComputationSpawner),
+        Arc::new(RayonAsyncComputationSpawner),
     ));
     shards_manager_adapter.bind(shards_manager_actor.with_auto_span_context());
     let peer_manager = PeerManagerActor::spawn(
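
Each construction site above follows the same recipe: build a dedicated Tokio runtime, wrap it in `TokioRuntimeFutureSpawner`, and pass the spawner to `PartialWitnessActor::new` (the tests use a current-thread runtime, the node below a multi-threaded one). The following is a minimal sketch of the underlying pattern using plain `tokio` APIs only; the spawner wrapper and actor wiring are omitted, and the worker-thread count is an arbitrary example value.

    use std::sync::Arc;

    fn main() {
        // Dedicated runtime for networking-style background work.
        let networking_rt = Arc::new(
            tokio::runtime::Builder::new_multi_thread()
                .worker_threads(2)
                .enable_all()
                .build()
                .unwrap(),
        );

        // Anything holding a clone of the Arc can spawn futures onto the runtime,
        // which is roughly what a future-spawner wrapper would do for the actor.
        let handle = networking_rt.spawn(async {
            // Placeholder for an async networking task.
            42
        });

        let result = networking_rt.block_on(handle).unwrap();
        println!("background task returned {result}");
    }
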
diff --git a/nearcore/src/lib.rs b/nearcore/src/lib.rs
index f132a7a4174..9e28d301cdc 100644
--- a/nearcore/src/lib.rs
+++ b/nearcore/src/lib.rs
@@ -227,6 +227,8 @@ pub struct NearNode {
     pub state_sync_runtime: Arc<tokio::runtime::Runtime>,
     /// Shard tracker, allows querying of which shards are tracked by this node.
     pub shard_tracker: ShardTracker,
+    // The threads that the networking layer runs in.
+    pub networking_rt: Arc<tokio::runtime::Runtime>,
 }
 
 pub fn start_with_config(home_dir: &Path, config: NearConfig) -> anyhow::Result<NearNode> {
@@ -363,8 +365,13 @@ pub fn start_with_config_and_synchronization(
     );
     let snapshot_callbacks = SnapshotCallbacks { make_snapshot_callback, delete_snapshot_callback };
 
+    let networking_rt =
+        Arc::new(tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap());
+    let networking_spawner = Arc::new(TokioRuntimeFutureSpawner(networking_rt.clone()));
+
     let (partial_witness_actor, partial_witness_arbiter) =
         spawn_actix_actor(PartialWitnessActor::new(
+            networking_spawner,
             Clock::real(),
             network_adapter.as_multi_sender(),
             client_adapter_for_partial_witness_actor.as_multi_sender(),
@@ -372,6 +379,7 @@ pub fn start_with_config_and_synchronization(
         epoch_manager.clone(),
         runtime.clone(),
         Arc::new(RayonAsyncComputationSpawner),
+        Arc::new(RayonAsyncComputationSpawner),
     ));
 
     let (_gc_actor, gc_arbiter) = spawn_actix_actor(GCActor::new(
@@ -516,5 +524,6 @@ pub fn start_with_config_and_synchronization(
         resharding_handle,
         state_sync_runtime,
         shard_tracker,
+        networking_rt,
     })
 }
diff --git a/utils/near-cache/src/sync.rs b/utils/near-cache/src/sync.rs
index 4b971c9a655..8cec6a2cc33 100644
--- a/utils/near-cache/src/sync.rs
+++ b/utils/near-cache/src/sync.rs
@@ -30,6 +30,18 @@ where
         self.inner.lock().unwrap().is_empty()
     }
 
+    /// Returns true if the cache contains the key and false otherwise.
+    pub fn contains(&self, key: &K) -> bool {
+        self.inner.lock().unwrap().contains(key)
+    }
+
+    /// Pushes a key-value pair into the cache. If an entry with this key already exists in
+    /// the cache, or another cache entry is evicted (due to the LRU's capacity),
+    /// then it returns the old entry's key-value pair. Otherwise, returns `None`.
+    pub fn push(&self, key: K, value: V) -> Option<(K, V)> {
+        self.inner.lock().unwrap().push(key, value)
+    }
+
     /// Return the value of the key in the cache, otherwise computes the value and inserts it into
     /// the cache. If the key is already in the cache, it gets moved to the head of
     /// the LRU list.