Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(state sync): handle StateResponse on state_parts_future_spawner #12205

Merged
merged 3 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,8 @@ impl Handler<StateResponse> for ClientActorInner {
shard_id,
state_response,
&mut self.client.chain,
self.state_parts_future_spawner.as_ref(),
self.client.runtime_adapter.clone(),
);
return;
}
Expand All @@ -637,6 +639,8 @@ impl Handler<StateResponse> for ClientActorInner {
shard_id,
state_response,
&mut self.client.chain,
self.state_parts_future_spawner.as_ref(),
self.client.runtime_adapter.clone(),
);
return;
}
Expand Down
175 changes: 119 additions & 56 deletions chain/client/src/sync/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::sync::external::{
use borsh::BorshDeserialize;
use futures::{future, FutureExt};
use near_async::futures::{FutureSpawner, FutureSpawnerExt};
use near_async::messaging::{CanSend, SendAsync};
use near_async::messaging::SendAsync;
use near_async::time::{Clock, Duration, Utc};
use near_chain::chain::{ApplyStatePartsRequest, LoadMemtrieRequest};
use near_chain::near_chain_primitives;
Expand All @@ -40,7 +40,7 @@ use near_client_primitives::types::{
use near_epoch_manager::EpochManagerAdapter;
use near_network::types::{
HighestHeightPeerInfo, NetworkRequests, NetworkResponses, PeerManagerAdapter,
PeerManagerMessageRequest, StateSyncEvent,
PeerManagerMessageRequest,
};
use near_primitives::hash::CryptoHash;
use near_primitives::network::PeerId;
Expand Down Expand Up @@ -85,13 +85,20 @@ pub enum StateSyncFileDownloadResult {
StatePart { part_length: u64 },
}

#[derive(PartialEq, Eq)]
enum PartProvenance {
Peers,
External,
}

/// Signals that a state part was downloaded and saved to RocksDB.
/// Or failed to do so.
pub struct StateSyncGetFileResult {
sync_hash: CryptoHash,
shard_id: ShardId,
part_id: Option<PartId>,
result: Result<StateSyncFileDownloadResult, String>,
provenance: PartProvenance,
}

struct StateSyncExternal {
Expand Down Expand Up @@ -348,8 +355,13 @@ impl StateSync {
sync_hash: CryptoHash,
shard_sync: &mut HashMap<ShardId, ShardSyncDownload>,
) {
for StateSyncGetFileResult { sync_hash: msg_sync_hash, shard_id, part_id, result } in
self.state_parts_mpsc_rx.try_iter()
for StateSyncGetFileResult {
sync_hash: msg_sync_hash,
shard_id,
part_id,
result,
provenance,
} in self.state_parts_mpsc_rx.try_iter()
{
if msg_sync_hash != sync_hash {
tracing::debug!(target: "sync",
Expand Down Expand Up @@ -401,6 +413,7 @@ impl StateSync {
download,
file_type,
download_result,
provenance,
);
}
}
Expand Down Expand Up @@ -723,22 +736,24 @@ impl StateSync {
pub fn update_download_on_state_response_message(
&mut self,
shard_sync_download: &mut ShardSyncDownload,
hash: CryptoHash,
sync_hash: CryptoHash,
shard_id: ShardId,
state_response: ShardStateSyncResponse,
chain: &mut Chain,
state_parts_future_spawner: &dyn FutureSpawner,
runtime_adapter: Arc<dyn RuntimeAdapter>,
) {
match shard_sync_download.status {
ShardSyncStatus::StateDownloadHeader => {
let header_download = shard_sync_download.get_header_download_mut().unwrap();
if let Some(header) = state_response.take_header() {
if !header_download.done {
match chain.set_state_header(shard_id, hash, header) {
match chain.set_state_header(shard_id, sync_hash, header) {
Ok(()) => {
header_download.done = true;
}
Err(err) => {
tracing::error!(target: "sync", %shard_id, %hash, ?err, "State sync set_state_header error");
tracing::error!(target: "sync", %shard_id, %sync_hash, ?err, "State sync set_state_header error");
header_download.error = true;
}
}
Expand All @@ -747,7 +762,7 @@ impl StateSync {
// No header found.
// It may happen because requested node couldn't build state response.
if !header_download.done {
tracing::info!(target: "sync", %shard_id, %hash, "state_response doesn't have header, should be re-requested");
marcelo-gonzalez marked this conversation as resolved.
Show resolved Hide resolved
tracing::info!(target: "sync", %shard_id, %sync_hash, "state_response doesn't have header, should be re-requested");
header_download.error = true;
}
}
Expand All @@ -757,27 +772,43 @@ impl StateSync {
let num_parts = shard_sync_download.downloads.len() as u64;
let (part_id, data) = part;
if part_id >= num_parts {
tracing::error!(target: "sync", %shard_id, %hash, part_id, "State sync received incorrect part_id, potential malicious peer");
tracing::error!(target: "sync", %shard_id, %sync_hash, part_id, "State sync received incorrect part_id, potential malicious peer");
return;
}
if !shard_sync_download.downloads[part_id as usize].done {
match chain.set_state_part(
shard_id,
hash,
PartId::new(part_id, num_parts),
&data,
) {
Ok(()) => {
tracing::debug!(target: "sync", %shard_id, %hash, part_id, "Received correct start part");
self.network_adapter
.send(StateSyncEvent::StatePartReceived(shard_id, part_id));
shard_sync_download.downloads[part_id as usize].done = true;
}
Err(err) => {
tracing::error!(target: "sync", %shard_id, %hash, part_id, ?err, "State sync set_state_part error");
shard_sync_download.downloads[part_id as usize].error = true;
let state_root = chain
.get_state_header(shard_id, sync_hash)
.unwrap()
.chunk_prev_state_root();
let runtime_adapter = runtime_adapter.clone();
let part_id = PartId { idx: part_id, total: num_parts };
let state_parts_mpsc_tx = self.state_parts_mpsc_tx.clone();
state_parts_future_spawner.spawn(
"update_download_on_state_response_message",
async move {
let result = try_validate_and_store_received_state_part(
part_id,
shard_id,
sync_hash,
state_root,
data,
runtime_adapter
);

match state_parts_mpsc_tx.send(StateSyncGetFileResult {
saketh-are marked this conversation as resolved.
Show resolved Hide resolved
sync_hash,
shard_id,
part_id: Some(part_id),
result,
provenance: PartProvenance::Peers,
}) {
Ok(_) => tracing::debug!(target: "sync", %shard_id, ?part_id, "Download response sent to processing thread."),
Err(err) => {
tracing::error!(target: "sync", ?err, %shard_id, ?part_id, "Unable to send part download response to processing thread.");
},
}
}
}
);
}
}
}
Expand Down Expand Up @@ -1114,6 +1145,7 @@ fn request_header_from_external_storage(
shard_id,
part_id: None,
result,
provenance: PartProvenance::External,
}) {
Ok(_) => tracing::debug!(target: "sync", %shard_id, "Download header response sent to processing thread."),
Err(err) => {
Expand All @@ -1134,26 +1166,19 @@ async fn download_and_store_part_from_external_storage(
external: ExternalConnection,
runtime_adapter: Arc<dyn RuntimeAdapter>,
) -> Result<StateSyncFileDownloadResult, String> {
external
.get_file(shard_id, &location, file_type)
.await
.map_err(|err| err.to_string())
.and_then(|data| {
info!(target: "sync", ?shard_id, ?part_id, "downloaded state part");
if runtime_adapter.validate_state_part(&state_root, part_id, &data) {
let mut store_update = runtime_adapter.store().store_update();
borsh::to_vec(&StatePartKey(sync_hash, shard_id, part_id.idx))
.and_then(|key| {
store_update.set(DBCol::StateParts, &key, &data);
store_update.commit()
})
.map_err(|err| format!("Failed to store a state part. err={err:?}, state_root={state_root:?}, part_id={part_id:?}, shard_id={shard_id:?}"))
.map(|_| data.len() as u64)
.map(|part_length| StateSyncFileDownloadResult::StatePart { part_length })
} else {
Err(format!("validate_state_part failed. state_root={state_root:?}, part_id={part_id:?}, shard_id={shard_id}"))
}
})
external.get_file(shard_id, &location, file_type).await.map_err(|err| err.to_string()).and_then(
|data| {
info!(target: "sync", ?shard_id, ?part_id, "downloaded state part");
try_validate_and_store_received_state_part(
part_id,
shard_id,
sync_hash,
state_root,
data,
runtime_adapter,
)
},
)
}
/// Starts an asynchronous network request to external storage to fetch the given state part.
fn request_part_from_external_storage(
Expand Down Expand Up @@ -1210,6 +1235,7 @@ fn request_part_from_external_storage(
shard_id,
part_id: Some(part_id),
result,
provenance: PartProvenance::External,
}) {
Ok(_) => tracing::debug!(target: "sync", %shard_id, ?part_id, "Download response sent to processing thread."),
Err(err) => {
Expand Down Expand Up @@ -1266,6 +1292,33 @@ fn request_part_from_peers(
);
}

/// Takes a received state part and attempts to validate and store the part.
/// Used both for parts downloaded from external storage and parts received from peers.
/// This process is slow and should only occur on state_parts_future_spawner.
/// Returns a result to be passed back via state_parts_mpsc_tx.
fn try_validate_and_store_received_state_part(
marcelo-gonzalez marked this conversation as resolved.
Show resolved Hide resolved
part_id: PartId,
shard_id: ShardId,
sync_hash: CryptoHash,
state_root: StateRoot,
data: Vec<u8>,
runtime_adapter: Arc<dyn RuntimeAdapter>,
) -> Result<StateSyncFileDownloadResult, String> {
if runtime_adapter.validate_state_part(&state_root, part_id, &data) {
let mut store_update = runtime_adapter.store().store_update();
borsh::to_vec(&StatePartKey(sync_hash, shard_id, part_id.idx))
.and_then(|key| {
store_update.set(DBCol::StateParts, &key, &data);
store_update.commit()
})
.map_err(|err| format!("Failed to store a state part. err={err:?}, state_root={state_root:?}, part_id={part_id:?}, shard_id={shard_id:?}"))
.map(|_| data.len() as u64)
.map(|part_length| StateSyncFileDownloadResult::StatePart { part_length })
} else {
Err(format!("validate_state_part failed. state_root={state_root:?}, part_id={part_id:?}, shard_id={shard_id}"))
}
}

/// Works around how data requests to external storage are done.
/// This function investigates if the response is valid and updates `done` and `error` appropriately.
/// If the response is successful, then the downloaded state file was written to the DB.
Expand All @@ -1275,23 +1328,28 @@ fn process_download_response(
download: Option<&mut DownloadStatus>,
file_type: String,
download_result: Result<u64, String>,
provenance: PartProvenance,
) {
match download_result {
Ok(data_len) => {
// No error, aka Success.
metrics::STATE_SYNC_EXTERNAL_PARTS_DONE
.with_label_values(&[&shard_id.to_string(), &file_type])
.inc();
metrics::STATE_SYNC_EXTERNAL_PARTS_SIZE_DOWNLOADED
.with_label_values(&[&shard_id.to_string(), &file_type])
.inc_by(data_len);
if provenance == PartProvenance::External {
metrics::STATE_SYNC_EXTERNAL_PARTS_DONE
.with_label_values(&[&shard_id.to_string(), &file_type])
.inc();
metrics::STATE_SYNC_EXTERNAL_PARTS_SIZE_DOWNLOADED
.with_label_values(&[&shard_id.to_string(), &file_type])
.inc_by(data_len);
}
download.map(|download| download.done = true);
}
// The request failed without reaching the external storage.
Err(err) => {
metrics::STATE_SYNC_EXTERNAL_PARTS_FAILED
.with_label_values(&[&shard_id.to_string(), &file_type])
.inc();
if provenance == PartProvenance::External {
metrics::STATE_SYNC_EXTERNAL_PARTS_FAILED
.with_label_values(&[&shard_id.to_string(), &file_type])
.inc();
}
tracing::debug!(target: "sync", ?err, %shard_id, %sync_hash, ?file_type, "Failed to get a file from external storage, will retry");
download.map(|download| download.done = false);
}
Expand Down Expand Up @@ -1379,6 +1437,9 @@ mod test {
};

run_actix(async {
let state_parts_future_spawner =
ActixArbiterHandleFutureSpawner(Arbiter::new().handle());

state_sync
.run(
&None,
Expand All @@ -1390,9 +1451,9 @@ mod test {
vec![new_shard_id_tmp(0)],
&noop().into_sender(),
&noop().into_sender(),
&ActixArbiterHandleFutureSpawner(Arbiter::new().handle()),
&state_parts_future_spawner,
false,
runtime,
runtime.clone(),
)
.unwrap();

Expand Down Expand Up @@ -1439,6 +1500,8 @@ mod test {
new_shard_id_tmp(0),
state_response,
&mut chain,
&state_parts_future_spawner,
runtime,
);

let download = new_shard_sync.get(&new_shard_id_tmp(0)).unwrap();
Expand Down
Loading