diff --git a/chain/chain/src/flat_storage_creator.rs b/chain/chain/src/flat_storage_creator.rs index 1690edf1bcc..d60ae44b71e 100644 --- a/chain/chain/src/flat_storage_creator.rs +++ b/chain/chain/src/flat_storage_creator.rs @@ -56,7 +56,7 @@ struct FlatStorageCreationMetrics { /// If we launched a node with enabled flat storage but it doesn't have flat storage data on disk, we have to create it. /// This struct is responsible for this process for the given shard. -/// See doc comment on [`FlatStorageStateStatus`] for the details of the process. +/// See doc comment on [`FlatStorageCreationStatus`] for the details of the process. pub struct FlatStorageShardCreator { #[allow(unused)] shard_id: ShardId, diff --git a/core/store/src/flat_state.rs b/core/store/src/flat_state.rs index 4b7c5cc5585..2625922ffc6 100644 --- a/core/store/src/flat_state.rs +++ b/core/store/src/flat_state.rs @@ -519,7 +519,7 @@ pub const NUM_PARTS_IN_ONE_STEP: u64 = 20; pub const STATE_PART_MEMORY_LIMIT: bytesize::ByteSize = bytesize::ByteSize(10 * bytesize::MIB); /// Current step of fetching state to fill flat storage. -#[derive(BorshSerialize, BorshDeserialize, Clone, Debug, PartialEq, Eq)] +#[derive(BorshSerialize, BorshDeserialize, Copy, Clone, Debug, PartialEq, Eq)] pub struct FetchingStateStatus { /// Number of the first state part to be fetched in this step. pub part_id: u64, @@ -533,7 +533,7 @@ pub struct FetchingStateStatus { /// Because this is a heavy work requiring ~5h for testnet rpc node and ~10h for testnet archival node, we do it on /// background during regular block processing. /// This struct reveals what is the current status of creating flat storage data on disk. -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Copy, Clone, Debug, PartialEq, Eq)] pub enum FlatStorageCreationStatus { /// Flat storage state does not exist. We are saving `FlatStorageDelta`s to disk. /// During this step, we save current chain head, start saving all deltas for blocks after chain head and wait until @@ -563,10 +563,10 @@ impl Into for &FlatStorageCreationStatus { /// Cast inside enum does not work because it is not fieldless. fn into(self) -> i64 { match self { - FlatStorageCreationStatus::Ready => 0, - FlatStorageCreationStatus::SavingDeltas => 1, - FlatStorageCreationStatus::FetchingState(_) => 2, - FlatStorageCreationStatus::CatchingUp => 3, + FlatStorageCreationStatus::SavingDeltas => 0, + FlatStorageCreationStatus::FetchingState(_) => 1, + FlatStorageCreationStatus::CatchingUp => 2, + FlatStorageCreationStatus::Ready => 3, FlatStorageCreationStatus::DontCreate => 4, } } diff --git a/integration-tests/src/tests/client/flat_storage.rs b/integration-tests/src/tests/client/flat_storage.rs index e85300f37ac..cfd3864c63d 100644 --- a/integration-tests/src/tests/client/flat_storage.rs +++ b/integration-tests/src/tests/client/flat_storage.rs @@ -1,57 +1,125 @@ +/// Tests which check correctness of background flat storage creation. use assert_matches::assert_matches; use near_chain::{ChainGenesis, RuntimeAdapter}; use near_chain_configs::Genesis; use near_client::test_utils::TestEnv; use near_o11y::testonly::init_test_logger; -use near_primitives_core::types::BlockHeight; +use near_primitives::shard_layout::{ShardLayout, ShardUId}; +use near_primitives::types::AccountId; +use near_primitives_core::types::{BlockHeight, NumShards}; use near_store::flat_state::{ store_helper, FetchingStateStatus, FlatStorageCreationStatus, NUM_PARTS_IN_ONE_STEP, }; use near_store::test_utils::create_test_store; +#[cfg(feature = "protocol_feature_flat_state")] +use near_store::DBCol; +use near_store::{Store, TrieTraversalItem}; use nearcore::config::GenesisExt; use std::path::Path; +use std::str::FromStr; use std::sync::Arc; use std::thread; use std::time::Duration; +/// Height on which we start flat storage background creation. +const START_HEIGHT: BlockHeight = 4; + +/// Number of steps which should be enough to create flat storage. +const CREATION_TIMEOUT: BlockHeight = 30; + +/// Setup environment with one Near client for testing. +fn setup_env(genesis: &Genesis, store: Store) -> TestEnv { + let chain_genesis = ChainGenesis::new(genesis); + let runtimes: Vec> = + vec![Arc::new(nearcore::NightshadeRuntime::test(Path::new("../../../.."), store, genesis))]; + TestEnv::builder(chain_genesis.clone()).runtime_adapters(runtimes).build() +} + +/// Waits for flat storage creation on shard 0 for `CREATION_TIMEOUT` blocks. +/// We have a pause after processing each block because state data is being fetched in rayon threads, +/// but we expect it to finish in <30s because state is small and there is only one state part. +/// Returns next block height available to produce. +fn wait_for_flat_storage_creation(env: &mut TestEnv, start_height: BlockHeight) -> BlockHeight { + let store = env.clients[0].runtime_adapter.store().clone(); + let mut next_height = start_height; + let mut prev_status = store_helper::get_flat_storage_creation_status(&store, 0); + while next_height < start_height + CREATION_TIMEOUT { + env.produce_block(0, next_height); + env.clients[0].run_flat_storage_creation_step().unwrap(); + + let status = store_helper::get_flat_storage_creation_status(&store, 0); + // Check validity of state transition for flat storage creation. + match &prev_status { + FlatStorageCreationStatus::SavingDeltas => assert_matches!( + status, + FlatStorageCreationStatus::SavingDeltas + | FlatStorageCreationStatus::FetchingState(_) + ), + FlatStorageCreationStatus::FetchingState(_) => assert_matches!( + status, + FlatStorageCreationStatus::FetchingState(_) | FlatStorageCreationStatus::CatchingUp + ), + FlatStorageCreationStatus::CatchingUp => assert_matches!( + status, + FlatStorageCreationStatus::CatchingUp | FlatStorageCreationStatus::Ready + ), + _ => { + panic!("Invalid status {prev_status:?} observed during flat storage creation for height {next_height}"); + } + } + + prev_status = status; + next_height += 1; + if prev_status == FlatStorageCreationStatus::Ready { + break; + } + + thread::sleep(Duration::from_secs(1)); + } + let status = store_helper::get_flat_storage_creation_status(&store, 0); + assert_eq!( + status, + FlatStorageCreationStatus::Ready, + "Client couldn't create flat storage until block {next_height}, status: {status:?}" + ); + assert!(env.clients[0].runtime_adapter.get_flat_storage_state_for_shard(0).is_some()); + next_height +} + /// Check correctness of flat storage creation. #[test] fn test_flat_storage_creation() { init_test_logger(); let genesis = Genesis::test(vec!["test0".parse().unwrap()], 1); - let chain_genesis = ChainGenesis::new(&genesis); let store = create_test_store(); - // Process some blocks with flat storage. + // Process some blocks with flat storage. Then remove flat storage data from disk. { - let runtimes: Vec> = vec![Arc::new( - nearcore::NightshadeRuntime::test(Path::new("../../../.."), store.clone(), &genesis), - )]; - let mut env = - TestEnv::builder(chain_genesis.clone()).runtime_adapters(runtimes.clone()).build(); - for i in 1..4 { - env.produce_block(0, i); + let mut env = setup_env(&genesis, store.clone()); + for height in 1..START_HEIGHT { + env.produce_block(0, height); } if cfg!(feature = "protocol_feature_flat_state") { // If chain was initialized from scratch, flat storage state should be created. During block processing, flat - // storage head should be moved to block 1. + // storage head should be moved to block `START_HEIGHT - 3`. assert_eq!( store_helper::get_flat_storage_creation_status(&store, 0), FlatStorageCreationStatus::Ready ); let expected_flat_storage_head = - env.clients[0].chain.get_block_hash_by_height(1).unwrap(); + env.clients[0].chain.get_block_hash_by_height(START_HEIGHT - 3).unwrap(); assert_eq!(store_helper::get_flat_head(&store, 0), Some(expected_flat_storage_head)); - // Deltas for blocks 0 and 1 should not exist. - for i in 0..2 { - let block_hash = env.clients[0].chain.get_block_hash_by_height(i).unwrap(); + // Deltas for blocks until `START_HEIGHT - 2` should not exist. + for height in 0..START_HEIGHT - 2 { + let block_hash = env.clients[0].chain.get_block_hash_by_height(height).unwrap(); assert_eq!(store_helper::get_delta(&store, 0, block_hash), Ok(None)); } - // Deltas for blocks 2 and 3 should still exist, because they come after flat storage head. - for i in 2..4 { - let block_hash = env.clients[0].chain.get_block_hash_by_height(i).unwrap(); + // Deltas for blocks until `START_HEIGHT` should still exist, + // because they come after flat storage head. + for height in START_HEIGHT - 2..START_HEIGHT { + let block_hash = env.clients[0].chain.get_block_hash_by_height(height).unwrap(); assert_matches!(store_helper::get_delta(&store, 0, block_hash), Ok(Some(_))); } } else { @@ -61,27 +129,21 @@ fn test_flat_storage_creation() { ); assert_eq!(store_helper::get_flat_head(&store, 0), None); } - } - // Remove flat storage head using low-level disk operation. Flat storage is implemented in such way that its - // existence is determined by existence of flat storage head. - #[cfg(feature = "protocol_feature_flat_state")] - { - let mut store_update = store.store_update(); - store_helper::remove_flat_head(&mut store_update, 0); - store_update.commit().unwrap(); + let block_hash = env.clients[0].chain.get_block_hash_by_height(START_HEIGHT - 1).unwrap(); + let epoch_id = env.clients[0].chain.runtime_adapter.get_epoch_id(&block_hash).unwrap(); + env.clients[0] + .chain + .runtime_adapter + .remove_flat_storage_state_for_shard(0, &epoch_id) + .unwrap(); } // Create new chain and runtime using the same store. It should produce next blocks normally, but now it should // think that flat storage does not exist and background creation should be initiated. - let runtimes: Vec> = vec![Arc::new(nearcore::NightshadeRuntime::test( - Path::new("../../../.."), - store.clone(), - &genesis, - ))]; - let mut env = TestEnv::builder(chain_genesis).runtime_adapters(runtimes.clone()).build(); - for i in 4..6 { - env.produce_block(0, i); + let mut env = setup_env(&genesis, store.clone()); + for height in START_HEIGHT..START_HEIGHT + 2 { + env.produce_block(0, height); } assert!(env.clients[0].runtime_adapter.get_flat_storage_state_for_shard(0).is_none()); @@ -101,18 +163,18 @@ fn test_flat_storage_creation() { store_helper::get_flat_storage_creation_status(&store, 0), FlatStorageCreationStatus::SavingDeltas ); - for i in 4..6 { - let block_hash = env.clients[0].chain.get_block_hash_by_height(i).unwrap(); + for height in START_HEIGHT..START_HEIGHT + 2 { + let block_hash = env.clients[0].chain.get_block_hash_by_height(height).unwrap(); assert_matches!(store_helper::get_delta(&store, 0, block_hash), Ok(Some(_))); } // Produce new block and run flat storage creation step. - // We started the node from height 3, and now final head should move to height 4. + // We started the node from height `START_HEIGHT - 1`, and now final head should move to height `START_HEIGHT`. // Because final head height became greater than height on which node started, // we must start fetching the state. - env.produce_block(0, 6); + env.produce_block(0, START_HEIGHT + 2); assert!(!env.clients[0].run_flat_storage_creation_step().unwrap()); - let final_block_hash = env.clients[0].chain.get_block_hash_by_height(4).unwrap(); + let final_block_hash = env.clients[0].chain.get_block_hash_by_height(START_HEIGHT).unwrap(); assert_eq!(store_helper::get_flat_head(&store, 0), Some(final_block_hash)); assert_eq!( store_helper::get_flat_storage_creation_status(&store, 0), @@ -123,46 +185,173 @@ fn test_flat_storage_creation() { }) ); - // Run chain for a couple of blocks and check that statuses switch to `CatchingUp` and then to `Ready`. - // State is being fetched in rayon threads, but we expect it to finish in <30s because state is small and there is - // only one state part. - const BLOCKS_TIMEOUT: BlockHeight = 30; - let start_height = 8; - let mut next_height = start_height; - let mut was_catching_up = false; - while next_height < start_height + BLOCKS_TIMEOUT { - env.produce_block(0, next_height); - env.clients[0].run_flat_storage_creation_step().unwrap(); - next_height += 1; - match store_helper::get_flat_storage_creation_status(&store, 0) { - FlatStorageCreationStatus::FetchingState(..) => { - assert!(!was_catching_up, "Flat storage state status inconsistency: it was catching up before fetching state"); - } - FlatStorageCreationStatus::CatchingUp => { - was_catching_up = true; - } - FlatStorageCreationStatus::Ready => { - assert!( - was_catching_up, - "Flat storage state is ready but there was no flat storage catchup observed" + wait_for_flat_storage_creation(&mut env, START_HEIGHT + 3); +} + +/// Check that client can create flat storage on some shard while it already exists on another shard. +#[test] +fn test_flat_storage_creation_two_shards() { + init_test_logger(); + let num_shards: NumShards = 2; + let genesis = Genesis::test_sharded_new_version( + vec!["test0".parse().unwrap()], + 1, + vec![1; num_shards as usize], + ); + let store = create_test_store(); + + // Process some blocks with flat storages for two shards. Then remove flat storage data from disk for shard 0. + { + let mut env = setup_env(&genesis, store.clone()); + for height in 1..START_HEIGHT { + env.produce_block(0, height); + } + + for shard_id in 0..num_shards { + if cfg!(feature = "protocol_feature_flat_state") { + assert_eq!( + store_helper::get_flat_storage_creation_status(&store, shard_id), + FlatStorageCreationStatus::Ready ); - break; - } - status @ _ => { - panic!( - "Unexpected flat storage state status for height {next_height}: {:?}", - status + } else { + assert_eq!( + store_helper::get_flat_storage_creation_status(&store, shard_id), + FlatStorageCreationStatus::DontCreate ); } } - thread::sleep(Duration::from_secs(1)); + + let block_hash = env.clients[0].chain.get_block_hash_by_height(START_HEIGHT - 1).unwrap(); + let epoch_id = env.clients[0].chain.runtime_adapter.get_epoch_id(&block_hash).unwrap(); + env.clients[0] + .chain + .runtime_adapter + .remove_flat_storage_state_for_shard(0, &epoch_id) + .unwrap(); } - if next_height == start_height + BLOCKS_TIMEOUT { - let status = store_helper::get_flat_storage_creation_status(&store, 0); - panic!("Apparently, node didn't fetch the whole state in {BLOCKS_TIMEOUT} blocks. Current status: {:?}", status); + + if !cfg!(feature = "protocol_feature_flat_state") { + return; } - // Finally, check that flat storage state was created. - assert!(env.clients[0].run_flat_storage_creation_step().unwrap()); - assert!(env.clients[0].runtime_adapter.get_flat_storage_state_for_shard(0).is_some()); + // Check that flat storage is not ready for shard 0 but ready for shard 1. + let mut env = setup_env(&genesis, store.clone()); + assert!(env.clients[0].runtime_adapter.get_flat_storage_state_for_shard(0).is_none()); + assert_eq!( + store_helper::get_flat_storage_creation_status(&store, 0), + FlatStorageCreationStatus::SavingDeltas + ); + assert!(env.clients[0].runtime_adapter.get_flat_storage_state_for_shard(1).is_some()); + assert_eq!( + store_helper::get_flat_storage_creation_status(&store, 1), + FlatStorageCreationStatus::Ready + ); + + wait_for_flat_storage_creation(&mut env, START_HEIGHT); +} + +/// Check that flat storage creation can be started from intermediate state where one +/// of state parts is already fetched. +#[test] +fn test_flat_storage_creation_start_from_state_part() { + init_test_logger(); + // Create several accounts to ensure that state is non-trivial. + let accounts = + (0..4).map(|i| AccountId::from_str(&format!("test{}", i)).unwrap()).collect::>(); + let genesis = Genesis::test(accounts, 1); + let store = create_test_store(); + let shard_layout = ShardLayout::v0_single_shard(); + + // Process some blocks with flat storage. + // Split state into two parts and return trie keys corresponding to each part. + const NUM_PARTS: u64 = 2; + let trie_keys: Vec<_> = { + let mut env = setup_env(&genesis, store.clone()); + for height in 1..START_HEIGHT { + env.produce_block(0, height); + } + + if cfg!(feature = "protocol_feature_flat_state") { + assert_eq!( + store_helper::get_flat_storage_creation_status(&store, 0), + FlatStorageCreationStatus::Ready + ); + } else { + assert_eq!( + store_helper::get_flat_storage_creation_status(&store, 0), + FlatStorageCreationStatus::DontCreate + ); + return; + } + + let block_hash = env.clients[0].chain.get_block_hash_by_height(START_HEIGHT - 1).unwrap(); + let state_root = *env.clients[0] + .chain + .get_chunk_extra(&block_hash, &ShardUId::from_shard_id_and_layout(0, &shard_layout)) + .unwrap() + .state_root(); + let trie = env.clients[0] + .chain + .runtime_adapter + .get_trie_for_shard(0, &block_hash, state_root, true) + .unwrap(); + (0..NUM_PARTS) + .map(|part_id| { + let path_begin = trie.find_path_for_part_boundary(part_id, NUM_PARTS).unwrap(); + let path_end = trie.find_path_for_part_boundary(part_id + 1, NUM_PARTS).unwrap(); + let mut trie_iter = trie.iter().unwrap(); + let mut keys = vec![]; + for item in trie_iter.visit_nodes_interval(&path_begin, &path_end).unwrap() { + if let TrieTraversalItem { key: Some(trie_key), .. } = item { + keys.push(trie_key); + } + } + keys + }) + .collect() + }; + assert!(!trie_keys[0].is_empty()); + assert!(!trie_keys[1].is_empty()); + + #[cfg(feature = "protocol_feature_flat_state")] + { + // Remove keys of part 1 from the flat state. + // Manually set flat storage creation status to the step when it should start from fetching part 1. + let mut store_update = store.store_update(); + for key in trie_keys[1].iter() { + store_update.delete(DBCol::FlatState, key); + } + store_helper::set_fetching_state_status( + &mut store_update, + 0, + FetchingStateStatus { part_id: 1, num_parts_in_step: 1, num_parts: NUM_PARTS }, + ); + store_update.commit().unwrap(); + + // Re-create runtime, check that flat storage is not created yet. + let mut env = setup_env(&genesis, store.clone()); + assert!(env.clients[0].runtime_adapter.get_flat_storage_state_for_shard(0).is_none()); + + // Run chain for a couple of blocks and check that flat storage for shard 0 is eventually created. + let next_height = wait_for_flat_storage_creation(&mut env, START_HEIGHT); + + // Check that all the keys are present in flat storage. + let block_hash = env.clients[0].chain.get_block_hash_by_height(next_height - 1).unwrap(); + let state_root = *env.clients[0] + .chain + .get_chunk_extra(&block_hash, &ShardUId::from_shard_id_and_layout(0, &shard_layout)) + .unwrap() + .state_root(); + let trie = env.clients[0] + .chain + .runtime_adapter + .get_trie_for_shard(0, &block_hash, state_root, true) + .unwrap(); + let flat_state = trie.flat_state.unwrap(); + for part_trie_keys in trie_keys.iter() { + for trie_key in part_trie_keys.iter() { + assert_matches!(flat_state.get_ref(trie_key), Ok(Some(_))); + } + } + } }