diff --git a/chain/client/src/client_actor.rs b/chain/client/src/client_actor.rs index ea909642fac..98ba4e6555e 100644 --- a/chain/client/src/client_actor.rs +++ b/chain/client/src/client_actor.rs @@ -620,6 +620,8 @@ impl Handler for ClientActorInner { shard_id, state_response, &mut self.client.chain, + self.state_parts_future_spawner.as_ref(), + self.client.runtime_adapter.clone(), ); return; } @@ -637,6 +639,8 @@ impl Handler for ClientActorInner { shard_id, state_response, &mut self.client.chain, + self.state_parts_future_spawner.as_ref(), + self.client.runtime_adapter.clone(), ); return; } diff --git a/chain/client/src/sync/state.rs b/chain/client/src/sync/state.rs index 48d74ff5236..bd5fd3297b0 100644 --- a/chain/client/src/sync/state.rs +++ b/chain/client/src/sync/state.rs @@ -27,7 +27,7 @@ use crate::sync::external::{ use borsh::BorshDeserialize; use futures::{future, FutureExt}; use near_async::futures::{FutureSpawner, FutureSpawnerExt}; -use near_async::messaging::{CanSend, SendAsync}; +use near_async::messaging::SendAsync; use near_async::time::{Clock, Duration, Utc}; use near_chain::chain::{ApplyStatePartsRequest, LoadMemtrieRequest}; use near_chain::near_chain_primitives; @@ -40,7 +40,7 @@ use near_client_primitives::types::{ use near_epoch_manager::EpochManagerAdapter; use near_network::types::{ HighestHeightPeerInfo, NetworkRequests, NetworkResponses, PeerManagerAdapter, - PeerManagerMessageRequest, StateSyncEvent, + PeerManagerMessageRequest, }; use near_primitives::hash::CryptoHash; use near_primitives::network::PeerId; @@ -85,6 +85,12 @@ pub enum StateSyncFileDownloadResult { StatePart { part_length: u64 }, } +#[derive(PartialEq, Eq)] +enum PartProvenance { + Peers, + External, +} + /// Signals that a state part was downloaded and saved to RocksDB. /// Or failed to do so. pub struct StateSyncGetFileResult { @@ -92,6 +98,7 @@ pub struct StateSyncGetFileResult { shard_id: ShardId, part_id: Option, result: Result, + provenance: PartProvenance, } struct StateSyncExternal { @@ -348,8 +355,13 @@ impl StateSync { sync_hash: CryptoHash, shard_sync: &mut HashMap, ) { - for StateSyncGetFileResult { sync_hash: msg_sync_hash, shard_id, part_id, result } in - self.state_parts_mpsc_rx.try_iter() + for StateSyncGetFileResult { + sync_hash: msg_sync_hash, + shard_id, + part_id, + result, + provenance, + } in self.state_parts_mpsc_rx.try_iter() { if msg_sync_hash != sync_hash { tracing::debug!(target: "sync", @@ -401,6 +413,7 @@ impl StateSync { download, file_type, download_result, + provenance, ); } } @@ -723,22 +736,24 @@ impl StateSync { pub fn update_download_on_state_response_message( &mut self, shard_sync_download: &mut ShardSyncDownload, - hash: CryptoHash, + sync_hash: CryptoHash, shard_id: ShardId, state_response: ShardStateSyncResponse, chain: &mut Chain, + state_parts_future_spawner: &dyn FutureSpawner, + runtime_adapter: Arc, ) { match shard_sync_download.status { ShardSyncStatus::StateDownloadHeader => { let header_download = shard_sync_download.get_header_download_mut().unwrap(); if let Some(header) = state_response.take_header() { if !header_download.done { - match chain.set_state_header(shard_id, hash, header) { + match chain.set_state_header(shard_id, sync_hash, header) { Ok(()) => { header_download.done = true; } Err(err) => { - tracing::error!(target: "sync", %shard_id, %hash, ?err, "State sync set_state_header error"); + tracing::error!(target: "sync", %shard_id, %sync_hash, ?err, "State sync set_state_header error"); header_download.error = true; } } @@ -747,7 +762,7 @@ impl StateSync { // No header found. // It may happen because requested node couldn't build state response. if !header_download.done { - tracing::info!(target: "sync", %shard_id, %hash, "state_response doesn't have header, should be re-requested"); + tracing::info!(target: "sync", %shard_id, %sync_hash, "state_response doesn't have header, should be re-requested"); header_download.error = true; } } @@ -757,27 +772,43 @@ impl StateSync { let num_parts = shard_sync_download.downloads.len() as u64; let (part_id, data) = part; if part_id >= num_parts { - tracing::error!(target: "sync", %shard_id, %hash, part_id, "State sync received incorrect part_id, potential malicious peer"); + tracing::error!(target: "sync", %shard_id, %sync_hash, part_id, "State sync received incorrect part_id, potential malicious peer"); return; } if !shard_sync_download.downloads[part_id as usize].done { - match chain.set_state_part( - shard_id, - hash, - PartId::new(part_id, num_parts), - &data, - ) { - Ok(()) => { - tracing::debug!(target: "sync", %shard_id, %hash, part_id, "Received correct start part"); - self.network_adapter - .send(StateSyncEvent::StatePartReceived(shard_id, part_id)); - shard_sync_download.downloads[part_id as usize].done = true; - } - Err(err) => { - tracing::error!(target: "sync", %shard_id, %hash, part_id, ?err, "State sync set_state_part error"); - shard_sync_download.downloads[part_id as usize].error = true; + let state_root = chain + .get_state_header(shard_id, sync_hash) + .unwrap() + .chunk_prev_state_root(); + let runtime_adapter = runtime_adapter.clone(); + let part_id = PartId { idx: part_id, total: num_parts }; + let state_parts_mpsc_tx = self.state_parts_mpsc_tx.clone(); + state_parts_future_spawner.spawn( + "update_download_on_state_response_message", + async move { + let result = try_validate_and_store_received_state_part( + part_id, + shard_id, + sync_hash, + state_root, + data, + runtime_adapter + ); + + match state_parts_mpsc_tx.send(StateSyncGetFileResult { + sync_hash, + shard_id, + part_id: Some(part_id), + result, + provenance: PartProvenance::Peers, + }) { + Ok(_) => tracing::debug!(target: "sync", %shard_id, ?part_id, "Download response sent to processing thread."), + Err(err) => { + tracing::error!(target: "sync", ?err, %shard_id, ?part_id, "Unable to send part download response to processing thread."); + }, + } } - } + ); } } } @@ -1114,6 +1145,7 @@ fn request_header_from_external_storage( shard_id, part_id: None, result, + provenance: PartProvenance::External, }) { Ok(_) => tracing::debug!(target: "sync", %shard_id, "Download header response sent to processing thread."), Err(err) => { @@ -1134,26 +1166,19 @@ async fn download_and_store_part_from_external_storage( external: ExternalConnection, runtime_adapter: Arc, ) -> Result { - external - .get_file(shard_id, &location, file_type) - .await - .map_err(|err| err.to_string()) - .and_then(|data| { - info!(target: "sync", ?shard_id, ?part_id, "downloaded state part"); - if runtime_adapter.validate_state_part(&state_root, part_id, &data) { - let mut store_update = runtime_adapter.store().store_update(); - borsh::to_vec(&StatePartKey(sync_hash, shard_id, part_id.idx)) - .and_then(|key| { - store_update.set(DBCol::StateParts, &key, &data); - store_update.commit() - }) - .map_err(|err| format!("Failed to store a state part. err={err:?}, state_root={state_root:?}, part_id={part_id:?}, shard_id={shard_id:?}")) - .map(|_| data.len() as u64) - .map(|part_length| StateSyncFileDownloadResult::StatePart { part_length }) - } else { - Err(format!("validate_state_part failed. state_root={state_root:?}, part_id={part_id:?}, shard_id={shard_id}")) - } - }) + external.get_file(shard_id, &location, file_type).await.map_err(|err| err.to_string()).and_then( + |data| { + info!(target: "sync", ?shard_id, ?part_id, "downloaded state part"); + try_validate_and_store_received_state_part( + part_id, + shard_id, + sync_hash, + state_root, + data, + runtime_adapter, + ) + }, + ) } /// Starts an asynchronous network request to external storage to fetch the given state part. fn request_part_from_external_storage( @@ -1210,6 +1235,7 @@ fn request_part_from_external_storage( shard_id, part_id: Some(part_id), result, + provenance: PartProvenance::External, }) { Ok(_) => tracing::debug!(target: "sync", %shard_id, ?part_id, "Download response sent to processing thread."), Err(err) => { @@ -1266,6 +1292,33 @@ fn request_part_from_peers( ); } +/// Takes a received state part and attempts to validate and store the part. +/// Used both for parts downloaded from external storage and parts received from peers. +/// This process is slow and should only occur on state_parts_future_spawner. +/// Returns a result to be passed back via state_parts_mpsc_tx. +fn try_validate_and_store_received_state_part( + part_id: PartId, + shard_id: ShardId, + sync_hash: CryptoHash, + state_root: StateRoot, + data: Vec, + runtime_adapter: Arc, +) -> Result { + if runtime_adapter.validate_state_part(&state_root, part_id, &data) { + let mut store_update = runtime_adapter.store().store_update(); + borsh::to_vec(&StatePartKey(sync_hash, shard_id, part_id.idx)) + .and_then(|key| { + store_update.set(DBCol::StateParts, &key, &data); + store_update.commit() + }) + .map_err(|err| format!("Failed to store a state part. err={err:?}, state_root={state_root:?}, part_id={part_id:?}, shard_id={shard_id:?}")) + .map(|_| data.len() as u64) + .map(|part_length| StateSyncFileDownloadResult::StatePart { part_length }) + } else { + Err(format!("validate_state_part failed. state_root={state_root:?}, part_id={part_id:?}, shard_id={shard_id}")) + } +} + /// Works around how data requests to external storage are done. /// This function investigates if the response is valid and updates `done` and `error` appropriately. /// If the response is successful, then the downloaded state file was written to the DB. @@ -1275,23 +1328,28 @@ fn process_download_response( download: Option<&mut DownloadStatus>, file_type: String, download_result: Result, + provenance: PartProvenance, ) { match download_result { Ok(data_len) => { // No error, aka Success. - metrics::STATE_SYNC_EXTERNAL_PARTS_DONE - .with_label_values(&[&shard_id.to_string(), &file_type]) - .inc(); - metrics::STATE_SYNC_EXTERNAL_PARTS_SIZE_DOWNLOADED - .with_label_values(&[&shard_id.to_string(), &file_type]) - .inc_by(data_len); + if provenance == PartProvenance::External { + metrics::STATE_SYNC_EXTERNAL_PARTS_DONE + .with_label_values(&[&shard_id.to_string(), &file_type]) + .inc(); + metrics::STATE_SYNC_EXTERNAL_PARTS_SIZE_DOWNLOADED + .with_label_values(&[&shard_id.to_string(), &file_type]) + .inc_by(data_len); + } download.map(|download| download.done = true); } // The request failed without reaching the external storage. Err(err) => { - metrics::STATE_SYNC_EXTERNAL_PARTS_FAILED - .with_label_values(&[&shard_id.to_string(), &file_type]) - .inc(); + if provenance == PartProvenance::External { + metrics::STATE_SYNC_EXTERNAL_PARTS_FAILED + .with_label_values(&[&shard_id.to_string(), &file_type]) + .inc(); + } tracing::debug!(target: "sync", ?err, %shard_id, %sync_hash, ?file_type, "Failed to get a file from external storage, will retry"); download.map(|download| download.done = false); } @@ -1379,6 +1437,9 @@ mod test { }; run_actix(async { + let state_parts_future_spawner = + ActixArbiterHandleFutureSpawner(Arbiter::new().handle()); + state_sync .run( &None, @@ -1390,9 +1451,9 @@ mod test { vec![new_shard_id_tmp(0)], &noop().into_sender(), &noop().into_sender(), - &ActixArbiterHandleFutureSpawner(Arbiter::new().handle()), + &state_parts_future_spawner, false, - runtime, + runtime.clone(), ) .unwrap(); @@ -1439,6 +1500,8 @@ mod test { new_shard_id_tmp(0), state_response, &mut chain, + &state_parts_future_spawner, + runtime, ); let download = new_shard_sync.get(&new_shard_id_tmp(0)).unwrap();