diff --git a/.gitignore b/.gitignore
index ca739ab20..628f21143 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,4 +8,7 @@
 target/
 .fleet/
 perf.data*
-.scratch
\ No newline at end of file
+.scratch
+
+.DS_Store
+storage/tmp/
\ No newline at end of file
diff --git a/Cargo.lock b/Cargo.lock
index 1e55d55df..52f8fe301 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -196,6 +196,12 @@ version = "0.21.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567"
 
+[[package]]
+name = "base64"
+version = "0.22.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9475866fec1451be56a3c2400fd081ff546538961565ccb5b7142cbd22bc7a51"
+
 [[package]]
 name = "base64ct"
 version = "1.6.0"
@@ -665,7 +671,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "ff3c058b07bdb5414da10bc8a2489715e31b0c3f4274a213c1a23831e9d94e91"
 dependencies = [
  "ahash",
- "base64",
+ "base64 0.21.7",
  "bitflags 2.4.2",
  "crc32c",
  "everscale-crypto",
@@ -1123,7 +1129,7 @@ version = "3.0.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "1b8fcc794035347fb64beda2d3b462595dd2753e3f268d89c5aae77e8cf2c310"
 dependencies = [
- "base64",
+ "base64 0.21.7",
  "serde",
 ]
 
@@ -2173,7 +2179,7 @@ dependencies = [
  "ahash",
  "anyhow",
  "argh",
- "base64",
+ "base64 0.21.7",
  "bytes",
  "castaway",
  "dashmap",
@@ -2224,6 +2230,7 @@ version = "0.0.1"
 dependencies = [
  "anyhow",
  "arc-swap",
+ "base64 0.22.0",
  "bumpalo",
  "bytes",
  "bytesize",
@@ -2240,12 +2247,16 @@ dependencies = [
  "quick_cache",
  "rlimit",
  "serde",
+ "serde_json",
  "sha2",
  "smallvec",
  "sysinfo",
  "thiserror",
  "tokio",
  "tracing",
+ "tracing-appender",
+ "tracing-subscriber",
+ "tracing-test",
  "triomphe",
  "tycho-block-util",
  "tycho-util",
diff --git a/storage/Cargo.toml b/storage/Cargo.toml
index aea2cc4e8..b8760f1e6 100644
--- a/storage/Cargo.toml
+++ b/storage/Cargo.toml
@@ -36,5 +36,12 @@ weedb = "0.1.1"
 tycho-block-util = { path = "../block-util" }
 tycho-util = { path = "../util" }
 
+[dev-dependencies]
+base64 = "0.22.0"
+serde_json = "1.0.114"
+tracing-appender = "0.2.3"
+tracing-subscriber = { version = "0.3", features = ["env-filter"] }
+tracing-test = "0.2"
+
 [lints]
 workspace = true
diff --git a/storage/src/db/kv_db/mod.rs b/storage/src/db/kv_db/mod.rs
index 7e9c434a8..6e2faf8ab 100644
--- a/storage/src/db/kv_db/mod.rs
+++ b/storage/src/db/kv_db/mod.rs
@@ -10,6 +10,8 @@ use weedb::{Caches, WeeDb};
 pub use weedb::Stats as RocksdbStats;
 pub use weedb::{rocksdb, BoundedCfHandle, ColumnFamily, Table};
 
+pub use self::config::DbOptions;
+
 pub mod refcount;
 pub mod tables;
 
@@ -247,56 +249,3 @@ impl Drop for Db {
         self.raw().cancel_all_background_work(true);
     }
 }
-
-#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
-#[serde(deny_unknown_fields, default)]
-pub struct DbOptions {
-    pub rocksdb_lru_capacity: ByteSize,
-    pub cells_cache_size: ByteSize,
-}
-
-impl Default for DbOptions {
-    fn default() -> Self {
-        // Fetch the currently available memory in bytes
-        let available = {
-            let mut sys = sysinfo::System::new();
-            sys.refresh_memory();
-            sys.available_memory()
-        };
-
-        // Estimated memory usage of components other than cache:
-        // - 2 GiBs for write buffers(4 if we are out of luck and all memtables are being flushed at the same time)
-        // - 2 GiBs for indexer logic
-        // - 10 bits per cell for bloom filter. Realistic case is 100M cells, so 0.25 GiBs
-        // - 1/3 of all available memory is reserved for kernel buffers
-        const WRITE_BUFFERS: ByteSize = ByteSize::gib(2);
-        const INDEXER_LOGIC: ByteSize = ByteSize::gib(2);
-        const BLOOM_FILTER: ByteSize = ByteSize::mib(256);
-        let estimated_memory_usage = WRITE_BUFFERS + INDEXER_LOGIC + BLOOM_FILTER + available / 3;
-
-        // Reduce the available memory by the fixed offset
-        let available = available
-            .checked_sub(estimated_memory_usage.as_u64())
-            .unwrap_or_else(|| {
-                tracing::error!(
-                    "Not enough memory for cache, using 1/4 of all available memory. \
-                     Tweak `db_options` in config to improve performance."
-                );
-                available / 4
-            });
-
-        // We will use 3/4 of available memory for the cells cache (at most 4 GB).
-        let cells_cache_size = std::cmp::min(ByteSize(available * 4 / 3), ByteSize::gib(4));
-
-        // The reset of the memory is used for LRU cache (at least 128 MB)
-        let rocksdb_lru_capacity = std::cmp::max(
-            ByteSize(available.saturating_sub(cells_cache_size.as_u64())),
-            ByteSize::mib(128),
-        );
-
-        Self {
-            rocksdb_lru_capacity,
-            cells_cache_size,
-        }
-    }
-}
diff --git a/storage/src/lib.rs b/storage/src/lib.rs
index b1b8d4f48..73b31cfdc 100644
--- a/storage/src/lib.rs
+++ b/storage/src/lib.rs
@@ -23,15 +23,69 @@ pub struct Storage {
 }
 
 impl Storage {
+    pub fn new(
+        db: Arc<Db>,
+        file_db_path: PathBuf,
+        max_cell_cache_size_bytes: u64,
+    ) -> anyhow::Result<Arc<Self>> {
+        let block_handle_storage = Arc::new(BlockHandleStorage::new(db.clone()));
+        let runtime_storage = Arc::new(RuntimeStorage::new(block_handle_storage.clone()));
+        let block_storage = Arc::new(BlockStorage::new(db.clone(), block_handle_storage.clone())?);
+        let shard_state_storage = ShardStateStorage::new(
+            db.clone(),
+            block_handle_storage.clone(),
+            block_storage.clone(),
+            file_db_path.clone(),
+            max_cell_cache_size_bytes,
+        )?;
+        let persistent_state_storage = PersistentStateStorage::new(
+            file_db_path.clone(),
+            db.clone(),
+            block_handle_storage.clone(),
+        )?;
+        let node_state_storage = NodeStateStorage::new(db.clone());
+        let block_connection_storage = BlockConnectionStorage::new(db);
+
+        Ok(Arc::new(Self {
+            file_db_path,
+
+            block_handle_storage,
+            block_storage,
+            shard_state_storage,
+            persistent_state_storage,
+            block_connection_storage,
+            node_state_storage,
+            runtime_storage,
+        }))
+    }
+
+    #[inline(always)]
+    pub fn runtime_storage(&self) -> &RuntimeStorage {
+        &self.runtime_storage
+    }
+
+    #[inline(always)]
+    pub fn persistent_state_storage(&self) -> &PersistentStateStorage {
+        &self.persistent_state_storage
+    }
+
+    #[inline(always)]
     pub fn block_handle_storage(&self) -> &BlockHandleStorage {
         &self.block_handle_storage
     }
 
+    #[inline(always)]
     pub fn block_connection_storage(&self) -> &BlockConnectionStorage {
         &self.block_connection_storage
     }
 
+    #[inline(always)]
     pub fn shard_state_storage(&self) -> &ShardStateStorage {
         &self.shard_state_storage
     }
+
+    #[inline(always)]
+    pub fn node_state(&self) -> &NodeStateStorage {
+        &self.node_state_storage
+    }
 }
diff --git a/storage/src/store/block_connection/mod.rs b/storage/src/store/block_connection/mod.rs
index 50ad26ece..3a6dd2f55 100644
--- a/storage/src/store/block_connection/mod.rs
+++ b/storage/src/store/block_connection/mod.rs
@@ -13,8 +13,8 @@ pub struct BlockConnectionStorage {
 }
 
 impl BlockConnectionStorage {
-    pub fn new(db: Arc<Db>) -> Result<Self> {
-        Ok(Self { db })
+    pub fn new(db: Arc<Db>) -> Self {
+        Self { db }
     }
 
     pub fn store_connection(
diff --git a/storage/src/store/block_handle/mod.rs b/storage/src/store/block_handle/mod.rs
index 535a62623..1790047c9 100644
--- a/storage/src/store/block_handle/mod.rs
+++ b/storage/src/store/block_handle/mod.rs
@@ -16,11 +16,11 @@ pub struct BlockHandleStorage {
 }
 
 impl BlockHandleStorage {
-    pub fn new(db: Arc<Db>) -> Result<Self> {
-        Ok(Self {
+    pub fn new(db: Arc<Db>) -> Self {
+        Self {
             db,
             cache: Arc::new(Default::default()),
-        })
+        }
     }
 
     pub fn store_block_applied(&self, handle: &Arc<BlockHandle>) -> Result<bool> {
diff --git a/storage/src/store/node_state/mod.rs b/storage/src/store/node_state/mod.rs
index 02c45eed5..a9ae5926a 100644
--- a/storage/src/store/node_state/mod.rs
+++ b/storage/src/store/node_state/mod.rs
@@ -15,13 +15,13 @@ pub struct NodeStateStorage {
 }
 
 impl NodeStateStorage {
-    pub fn new(db: Arc<Db>) -> Result<Self> {
-        Ok(Self {
+    pub fn new(db: Arc<Db>) -> Self {
+        Self {
             db,
             last_mc_block_id: (Default::default(), LAST_MC_BLOCK_ID),
             init_mc_block_id: (Default::default(), INIT_MC_BLOCK_ID),
             shards_client_mc_block_id: (Default::default(), SHARDS_CLIENT_MC_BLOCK_ID),
-        })
+        }
     }
 
     pub fn store_historical_sync_start(&self, id: &BlockId) -> Result<()> {
diff --git a/storage/src/store/persistent_state/mod.rs b/storage/src/store/persistent_state/mod.rs
index 7c1d43da2..9c0e74477 100644
--- a/storage/src/store/persistent_state/mod.rs
+++ b/storage/src/store/persistent_state/mod.rs
@@ -139,7 +139,6 @@ impl PersistentStateStorage {
         self.storage_path
             .clone()
             .join(mc_block_id.seqno.to_string())
-            .join(block_id.root_hash.to_string())
     }
 
     pub fn cancel(&self) {
diff --git a/storage/src/store/shard_state/cell_storage.rs b/storage/src/store/shard_state/cell_storage.rs
index 16112cc95..99b211685 100644
--- a/storage/src/store/shard_state/cell_storage.rs
+++ b/storage/src/store/shard_state/cell_storage.rs
@@ -20,20 +20,20 @@ pub struct CellStorage {
 }
 
 impl CellStorage {
-    pub fn new(db: Arc<Db>, cache_size_bytes: u64) -> Result<Arc<Self>> {
+    pub fn new(db: Arc<Db>, cache_size_bytes: u64) -> Arc<Self> {
         let cells_cache = Default::default();
         let raw_cells_cache = RawCellsCache::new(cache_size_bytes);
 
-        Ok(Arc::new(Self {
+        Arc::new(Self {
             db,
             cells_cache,
             raw_cells_cache,
-        }))
+        })
     }
 
     pub fn store_cell(
         &self,
-        batch: &mut weedb::rocksdb::WriteBatch,
+        batch: &mut rocksdb::WriteBatch,
         root: Cell,
     ) -> Result {
        struct CellWithRefs<'a> {
diff --git a/storage/src/store/shard_state/mod.rs b/storage/src/store/shard_state/mod.rs
index cde8dedda..5603b4443 100644
--- a/storage/src/store/shard_state/mod.rs
+++ b/storage/src/store/shard_state/mod.rs
@@ -46,7 +46,7 @@ impl ShardStateStorage {
         cache_size_bytes: u64,
     ) -> Result<Self> {
         let downloads_dir = prepare_file_db_dir(file_db_path, "downloads")?;
-        let cell_storage = CellStorage::new(db.clone(), cache_size_bytes)?;
+        let cell_storage = CellStorage::new(db.clone(), cache_size_bytes);
 
         let res = Self {
             db,
diff --git a/storage/tests/everscale_zerostate.boc b/storage/tests/everscale_zerostate.boc
new file mode 100644
index 000000000..6cea5582d
Binary files /dev/null and b/storage/tests/everscale_zerostate.boc differ
diff --git a/storage/tests/global-config.json b/storage/tests/global-config.json
new file mode 100644
index 000000000..bd4533a2f
--- /dev/null
+++ b/storage/tests/global-config.json
@@ -0,0 +1,22 @@
+{
+  "@type": "config.global",
+  "dht": {
+    "@type": "dht.config.global",
+    "k": 6,
+    "a": 3,
+    "static_nodes": {
+      "@type": "dht.nodes",
+      "nodes": []
+    }
+  },
+  "validator": {
+    "@type": "validator.config.global",
+    "zero_state": {
+      "workchain": -1,
+      "shard": -9223372036854775808,
+      "seqno": 0,
+      "root_hash": "WP/KGheNr/cF3lQhblQzyb0ufYUAcNM004mXhHq56EU=",
+      "file_hash": "0nC4eylStbp9qnCq8KjDYb789NjS25L5ZA1UQwcIOOQ="
"0nC4eylStbp9qnCq8KjDYb789NjS25L5ZA1UQwcIOOQ=" + } + } +} \ No newline at end of file diff --git a/storage/tests/mod.rs b/storage/tests/mod.rs new file mode 100644 index 000000000..793f2c307 --- /dev/null +++ b/storage/tests/mod.rs @@ -0,0 +1,270 @@ +use std::path::Path; +use std::str::FromStr; +use std::time::Duration; + +use anyhow::{anyhow, Result}; +use base64::prelude::BASE64_STANDARD; +use base64::Engine; +use bytesize::ByteSize; +use everscale_types::boc::Boc; +use everscale_types::cell::{Cell, HashBytes}; +use everscale_types::models::{BlockId, ShardIdent, ShardState}; +use serde::{Deserialize, Deserializer}; +use tycho_block_util::state::{MinRefMcStateTracker, ShardStateStuff}; +use tycho_storage::{BlockMetaData, Db, DbOptions, Storage}; + +#[derive(Clone)] +struct ShardStateCombined { + cell: Cell, + state: ShardState, +} + +impl ShardStateCombined { + fn from_file(path: impl AsRef) -> Result { + let bytes = std::fs::read(path.as_ref())?; + let cell = Boc::decode(bytes)?; + let state = cell.parse()?; + Ok(Self { cell, state }) + } + + fn short_id(&self) -> ShardShortId { + match &self.state { + ShardState::Unsplit(s) => ShardShortId::Unsplit { + seqno: s.seqno, + shard_ident: s.shard_ident, + }, + ShardState::Split(s) => { + let left = s.left.load().unwrap(); + let right = s.right.load().unwrap(); + ShardShortId::Split { + left_seqno: left.seqno, + left_shard_ident: left.shard_ident, + right_seqno: right.seqno, + right_shard_ident: right.shard_ident, + } + } + } + } + + fn gen_utime(&self) -> Option { + match &self.state { + ShardState::Unsplit(s) => Some(s.gen_utime), + ShardState::Split(_) => None, + } + } + + fn min_ref_mc_seqno(&self) -> Option { + match &self.state { + ShardState::Unsplit(s) => Some(s.min_ref_mc_seqno), + ShardState::Split(s) => None, + } + } +} + +#[derive(Debug)] +enum ShardShortId { + Unsplit { + seqno: u32, + shard_ident: ShardIdent, + }, + Split { + left_seqno: u32, + left_shard_ident: ShardIdent, + right_seqno: u32, + right_shard_ident: ShardIdent, + }, +} + +impl ShardShortId { + pub fn shard_ident(&self) -> ShardIdent { + match self { + ShardShortId::Unsplit { shard_ident, .. } => *shard_ident, + ShardShortId::Split { + left_shard_ident, .. + } => *left_shard_ident, + } + } + + pub fn seqno(&self) -> u32 { + match self { + ShardShortId::Unsplit { seqno, .. } => *seqno, + ShardShortId::Split { left_seqno, .. 
+        }
+    }
+}
+
+#[derive(Deserialize)]
+struct GlobalConfigJson {
+    validator: ValidatorJson,
+}
+
+#[derive(Deserialize)]
+struct ValidatorJson {
+    zero_state: BlockIdJson,
+}
+
+#[derive(Debug, Default)]
+pub struct BlockIdJson {
+    workchain: i32,
+    shard: u64,
+    seqno: u32,
+    root_hash: HashBytes,
+    file_hash: HashBytes,
+}
+
+impl<'de> Deserialize<'de> for BlockIdJson {
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        use serde::de::Error;
+
+        #[derive(Deserialize)]
+        struct BlockIdJsonHelper {
+            workchain: i32,
+            shard: i64,
+            seqno: u32,
+            root_hash: String,
+            file_hash: String,
+        }
+
+        let BlockIdJsonHelper {
+            workchain,
+            shard,
+            seqno,
+            root_hash,
+            file_hash,
+        } = BlockIdJsonHelper::deserialize(deserializer)?;
+
+        let shard = shard as u64;
+
+        let mut result = Self {
+            workchain,
+            shard,
+            seqno,
+            ..Default::default()
+        };
+
+        result.root_hash =
+            HashBytes::from_slice(&BASE64_STANDARD.decode(root_hash).map_err(Error::custom)?);
+
+        result.file_hash =
+            HashBytes::from_slice(&BASE64_STANDARD.decode(file_hash).map_err(Error::custom)?);
+
+        Ok(result)
+    }
+}
+
+impl TryFrom<BlockIdJson> for BlockId {
+    type Error = anyhow::Error;
+
+    fn try_from(value: BlockIdJson) -> Result<Self, Self::Error> {
+        Ok(Self {
+            shard: ShardIdent::new(value.workchain, value.shard)
+                .ok_or(anyhow!("Invalid ShardIdent"))?,
+            seqno: value.seqno,
+            root_hash: value.root_hash,
+            file_hash: value.file_hash,
+        })
+    }
+}
+
+#[derive(Debug)]
+struct GlobalConfig {
+    block_id: BlockId,
+}
+
+impl GlobalConfig {
+    pub fn from_file(path: impl AsRef<Path>) -> Result<Self> {
+        let data = std::fs::read_to_string(path.as_ref())?;
+        Ok(serde_json::from_str::<GlobalConfigJson>(&data)?.try_into()?)
+    }
+}
+
+impl TryFrom<GlobalConfigJson> for GlobalConfig {
+    type Error = anyhow::Error;
+
+    fn try_from(value: GlobalConfigJson) -> Result<Self, Self::Error> {
+        Ok(Self {
+            block_id: value.validator.zero_state.try_into()?,
+        })
+    }
+}
+
+#[tokio::test]
+async fn storage_init() {
+    tracing_subscriber::fmt::try_init().ok();
+    tracing::info!("connect_new_node_to_bootstrap");
+
+    let root_path = Path::new("tmp");
+    let db_options = DbOptions {
+        rocksdb_lru_capacity: ByteSize::kb(1024),
+        cells_cache_size: ByteSize::kb(1024),
+    };
+    let db = Db::open(root_path.join("db_storage"), db_options).unwrap();
+
+    let storage = Storage::new(
+        db,
+        root_path.join("file_storage"),
+        db_options.cells_cache_size.as_u64(),
+    )
+    .unwrap();
+    assert!(storage.node_state().load_init_mc_block_id().is_err());
+
+    // Read zerostate
+    let zero_state = ShardStateCombined::from_file("tests/everscale_zerostate.boc").unwrap();
+
+    // Read global config
+    let global_config = GlobalConfig::from_file("tests/global-config.json").unwrap();
+
+    // Write zerostate to db
+    let (handle, _) = storage
+        .block_handle_storage()
+        .create_or_load_handle(
+            &global_config.block_id,
+            BlockMetaData::zero_state(zero_state.gen_utime().unwrap()),
+        )
+        .unwrap();
+
+    let state = ShardStateStuff::new(
+        global_config.block_id,
+        zero_state.cell.clone(),
+        storage.shard_state_storage().min_ref_mc_state(),
+    )
+    .unwrap();
+
+    storage
+        .shard_state_storage()
+        .store_state(&handle, &state)
+        .await
+        .unwrap();
+
+    let min_ref_mc_state = storage.shard_state_storage().min_ref_mc_state();
+    assert_eq!(min_ref_mc_state.seqno(), zero_state.min_ref_mc_seqno());
+
+    // Write persistent state
+    let persistent_state_keeper = storage.runtime_storage().persistent_state_keeper();
+    assert!(persistent_state_keeper.current().is_none());
+
+    storage
+        .persistent_state_storage()
+        .prepare_persistent_states_dir(&state.block_id())
+        .unwrap();
+
+    storage
+        .persistent_state_storage()
+        .save_state(
+            &state.block_id(),
+            &state.block_id(),
+            zero_state.cell.repr_hash(),
+        )
+        .await
+        .unwrap();
+
+    tokio::time::sleep(Duration::from_secs(10)).await;
+
+    //println!("{:?}", zero_state.state);
+    //println!("{:?}", global_config);
+
+    //std::fs::remove_dir_all(root_path).unwrap()
+}
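
Note: with this change the `DbOptions` defaults move out of `kv_db/mod.rs` into a `config` module (not shown in this diff) and are re-exported, while `Storage::new` now wires all sub-storages itself and the infallible constructors (`BlockHandleStorage`, `NodeStateStorage`, `BlockConnectionStorage`, `CellStorage`) no longer return `Result`. A minimal sketch of how a caller might open storage with the new API, based only on the `storage/tests/mod.rs` test above; `open_storage` and the concrete sizes/paths are illustrative, not part of the PR:

// Hypothetical usage sketch mirroring storage/tests/mod.rs.
use std::path::PathBuf;

use bytesize::ByteSize;
use tycho_storage::{Db, DbOptions, Storage};

fn open_storage(root: PathBuf) -> anyhow::Result<()> {
    // Illustrative sizes; DbOptions::default() derives them from available memory.
    let opts = DbOptions {
        rocksdb_lru_capacity: ByteSize::mib(256),
        cells_cache_size: ByteSize::gib(1),
    };

    // Db::open and Storage::new are used exactly as in the test above.
    let db = Db::open(root.join("db_storage"), opts)?;
    let storage = Storage::new(
        db,
        root.join("file_storage"),
        opts.cells_cache_size.as_u64(),
    )?;

    // Freshly created storage has no init block yet, as the test asserts.
    let _ = storage.node_state().load_init_mc_block_id();
    Ok(())
}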