Skip to content

Commit

Permalink
Rewrite State Sync, from a giant state machine to proper async code. (#…
Browse files Browse the repository at this point in the history
…12172)

This rewrites state sync. All functionality is expected to continue to
work without any protocol or database changes.

See the top of state/mod.rs for an overview.

State sync status is now available on the debug page; an example:
![Screenshot from 2024-09-24
09-48-37](https://github.com/user-attachments/assets/41c17e27-564d-403b-93ac-852297b70577)
  • Loading branch information
robin-near authored Oct 11, 2024
1 parent 16fd9e8 commit e86c72a
Show file tree
Hide file tree
Showing 38 changed files with 1,795 additions and 2,063 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

110 changes: 0 additions & 110 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2924,46 +2924,6 @@ impl Chain {
Ok(())
}

pub fn schedule_apply_state_parts(
&self,
shard_id: ShardId,
sync_hash: CryptoHash,
num_parts: u64,
state_parts_task_scheduler: &near_async::messaging::Sender<ApplyStatePartsRequest>,
) -> Result<(), Error> {
let epoch_id = *self.get_block_header(&sync_hash)?.epoch_id();
let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, &epoch_id)?;

let shard_state_header = self.get_state_header(shard_id, sync_hash)?;
let state_root = shard_state_header.chunk_prev_state_root();

state_parts_task_scheduler.send(ApplyStatePartsRequest {
runtime_adapter: self.runtime_adapter.clone(),
shard_uid,
state_root,
num_parts,
epoch_id,
sync_hash,
});

Ok(())
}

pub fn schedule_load_memtrie(
&self,
shard_uid: ShardUId,
sync_hash: CryptoHash,
chunk: &ShardChunk,
load_memtrie_scheduler: &near_async::messaging::Sender<LoadMemtrieRequest>,
) {
load_memtrie_scheduler.send(LoadMemtrieRequest {
runtime_adapter: self.runtime_adapter.clone(),
shard_uid,
prev_state_root: chunk.prev_state_root(),
sync_hash,
});
}

pub fn create_flat_storage_for_shard(
&self,
shard_uid: ShardUId,
Expand Down Expand Up @@ -4635,76 +4595,6 @@ pub fn collect_receipts_from_response(
)
}

#[derive(actix::Message)]
#[rtype(result = "()")]
pub struct ApplyStatePartsRequest {
pub runtime_adapter: Arc<dyn RuntimeAdapter>,
pub shard_uid: ShardUId,
pub state_root: StateRoot,
pub num_parts: u64,
pub epoch_id: EpochId,
pub sync_hash: CryptoHash,
}

// Skip `runtime_adapter`, because it's a complex object that has complex logic
// and many fields.
impl Debug for ApplyStatePartsRequest {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ApplyStatePartsRequest")
.field("runtime_adapter", &"<not shown>")
.field("shard_uid", &self.shard_uid)
.field("state_root", &self.state_root)
.field("num_parts", &self.num_parts)
.field("epoch_id", &self.epoch_id)
.field("sync_hash", &self.sync_hash)
.finish()
}
}

#[derive(actix::Message, Debug)]
#[rtype(result = "()")]
pub struct ApplyStatePartsResponse {
pub apply_result: Result<(), near_chain_primitives::error::Error>,
pub shard_id: ShardId,
pub sync_hash: CryptoHash,
}

// This message is handled by `sync_job_actions.rs::handle_load_memtrie_request()`.
// It is a request for `runtime_adapter` to load in-memory trie for `shard_uid`.
#[derive(actix::Message)]
#[rtype(result = "()")]
pub struct LoadMemtrieRequest {
pub runtime_adapter: Arc<dyn RuntimeAdapter>,
pub shard_uid: ShardUId,
// Required to load memtrie.
pub prev_state_root: StateRoot,
// Needs to be included in a response to the caller for identification purposes.
pub sync_hash: CryptoHash,
}

// Skip `runtime_adapter`, because it's a complex object that has complex logic
// and many fields.
impl Debug for LoadMemtrieRequest {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LoadMemtrieRequest")
.field("runtime_adapter", &"<not shown>")
.field("shard_uid", &self.shard_uid)
.field("prev_state_root", &self.prev_state_root)
.field("sync_hash", &self.sync_hash)
.finish()
}
}

// It is message indicating the result of loading in-memory trie for `shard_id`.
// `sync_hash` is passed around to indicate to which block we were catching up.
#[derive(actix::Message, Debug)]
#[rtype(result = "()")]
pub struct LoadMemtrieResponse {
pub load_result: Result<(), near_chain_primitives::error::Error>,
pub shard_uid: ShardUId,
pub sync_hash: CryptoHash,
}

#[derive(actix::Message)]
#[rtype(result = "()")]
pub struct BlockCatchUpRequest {
Expand Down
60 changes: 30 additions & 30 deletions chain/client-primitives/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ use near_primitives::views::{
BlockView, ChunkView, DownloadStatusView, EpochValidatorInfo, ExecutionOutcomeWithIdView,
GasPriceView, LightClientBlockLiteView, LightClientBlockView, MaintenanceWindowsView,
QueryRequest, QueryResponse, ReceiptView, ShardSyncDownloadView, SplitStorageInfoView,
StateChangesKindsView, StateChangesRequestView, StateChangesView, SyncStatusView, TxStatusView,
StateChangesKindsView, StateChangesRequestView, StateChangesView, StateSyncStatusView,
SyncStatusView, TxStatusView,
};
pub use near_primitives::views::{StatusResponse, StatusSyncInfo};
use std::collections::HashMap;
Expand Down Expand Up @@ -88,7 +89,7 @@ impl Clone for DownloadStatus {
}

/// Various status of syncing a specific shard.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Copy)]
pub enum ShardSyncStatus {
StateDownloadHeader,
StateDownloadParts,
Expand Down Expand Up @@ -245,28 +246,21 @@ pub fn format_shard_sync_phase(
}
}

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct StateSyncStatus {
pub sync_hash: CryptoHash,
pub sync_status: HashMap<ShardId, ShardSyncDownload>,
}

/// If alternate flag was specified, write formatted sync_status per shard.
impl std::fmt::Debug for StateSyncStatus {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
if f.alternate() {
write!(
f,
"StateSyncStatus {{ sync_hash: {:?}, shard_sync: {:?} }}",
self.sync_hash,
format_shard_sync_phase_per_shard(&self.sync_status, false)
)
} else {
write!(
f,
"StateSyncStatus {{ sync_hash: {:?}, sync_status: {:?} }}",
self.sync_hash, self.sync_status
)
pub sync_status: HashMap<ShardId, ShardSyncStatus>,
pub download_tasks: Vec<String>,
pub computation_tasks: Vec<String>,
}

impl StateSyncStatus {
pub fn new(sync_hash: CryptoHash) -> Self {
Self {
sync_hash,
sync_status: HashMap::new(),
download_tasks: Vec::new(),
computation_tasks: Vec::new(),
}
}
}
Expand Down Expand Up @@ -372,14 +366,20 @@ impl From<SyncStatus> for SyncStatusView {
SyncStatus::HeaderSync { start_height, current_height, highest_height } => {
SyncStatusView::HeaderSync { start_height, current_height, highest_height }
}
SyncStatus::StateSync(state_sync_status) => SyncStatusView::StateSync(
state_sync_status.sync_hash,
state_sync_status
.sync_status
.into_iter()
.map(|(shard_id, shard_sync)| (shard_id, shard_sync.into()))
.collect(),
),
SyncStatus::StateSync(state_sync_status) => {
SyncStatusView::StateSync(StateSyncStatusView {
sync_hash: state_sync_status.sync_hash,
shard_sync_status: state_sync_status
.sync_status
.iter()
.map(|(shard_id, shard_sync_status)| {
(*shard_id, shard_sync_status.to_string())
})
.collect(),
download_tasks: state_sync_status.download_tasks,
computation_tasks: state_sync_status.computation_tasks,
})
}
SyncStatus::StateSyncDone => SyncStatusView::StateSyncDone,
SyncStatus::BlockSync { start_height, current_height, highest_height } => {
SyncStatusView::BlockSync { start_height, current_height, highest_height }
Expand Down
2 changes: 2 additions & 0 deletions chain/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ tempfile.workspace = true
thiserror.workspace = true
time.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
tokio-util.workspace = true
tracing.workspace = true
yansi.workspace = true

Expand Down
Loading

0 comments on commit e86c72a

Please sign in to comment.