diff --git a/.gitignore b/.gitignore index 628f21143..ae9a3bdcb 100644 --- a/.gitignore +++ b/.gitignore @@ -10,5 +10,4 @@ target/ perf.data* .scratch -.DS_Store -storage/tmp/ \ No newline at end of file +.DS_Store \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 52f8fe301..c62c9fb6b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2251,6 +2251,7 @@ dependencies = [ "sha2", "smallvec", "sysinfo", + "tempfile", "thiserror", "tokio", "tracing", diff --git a/storage/Cargo.toml b/storage/Cargo.toml index b8760f1e6..d865f5b6f 100644 --- a/storage/Cargo.toml +++ b/storage/Cargo.toml @@ -42,6 +42,7 @@ serde_json = "1.0.114" tracing-appender = "0.2.3" tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing-test = "0.2" +tempfile = "3.10" [lints] workspace = true diff --git a/storage/src/db/file_db/mapped_file.rs b/storage/src/db/file_db/mapped_file.rs index e671b5bb3..a6c535245 100644 --- a/storage/src/db/file_db/mapped_file.rs +++ b/storage/src/db/file_db/mapped_file.rs @@ -1,3 +1,4 @@ +use std::fs; use std::path::Path; use anyhow::Result; @@ -17,7 +18,14 @@ impl MappedFile { where P: AsRef, { - let file_db = FileDb::open(path)?; + let file_db = FileDb::new( + path, + fs::OpenOptions::new() + .write(true) + .read(true) + .truncate(true) + .create(true), + )?; file_db.file.set_len(length as u64)?; Self::from_existing_file(file_db) diff --git a/storage/src/db/file_db/mod.rs b/storage/src/db/file_db/mod.rs index c4f73b1f0..4be7821b5 100644 --- a/storage/src/db/file_db/mod.rs +++ b/storage/src/db/file_db/mod.rs @@ -2,67 +2,57 @@ use std::fs::File; use std::io::{Read, Seek, SeekFrom, Write}; use std::path::{Path, PathBuf}; -use anyhow::{Context, Result}; -use everscale_types::models::*; - pub use mapped_file::MappedFile; mod mapped_file; pub struct FileDb { file: File, - path: PathBuf, + _path: PathBuf, } impl FileDb { - pub fn open
<P>(path: P) -> Result<Self>
+    pub fn new<P>
(path: P, options: &mut std::fs::OpenOptions) -> std::io::Result where P: AsRef, { - let file = std::fs::OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .read(true) - .open(&path) - .context(format!("Failed to create file {:?}", path.as_ref()))?; + let file = options.open(&path)?; Ok(Self { file, - path: PathBuf::from(path.as_ref()), + _path: PathBuf::from(path.as_ref()), }) } - pub fn write(&mut self, buf: &[u8]) -> Result<()> { - self.file.write(buf)?; - Ok(()) - } - - pub fn write_all(&mut self, buf: &[u8]) -> Result<()> { - self.file.write_all(buf)?; - Ok(()) - } - - pub fn flush(&mut self) -> Result<()> { - self.file.flush()?; - Ok(()) + pub fn file(&self) -> &File { + &self.file } +} - pub fn seek(&mut self, pos: SeekFrom) -> Result<()> { - self.file.seek(pos)?; - Ok(()) +impl Write for FileDb { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.file.write(buf) } - pub fn file(&self) -> &File { - &self.file + #[inline] + fn flush(&mut self) -> std::io::Result<()> { + self.file.flush() } +} - pub fn read(&mut self, buf: &mut [u8]) -> Result { +impl Read for FileDb { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { let bytes = self.file.read(buf)?; Ok(bytes) } } +impl Seek for FileDb { + fn seek(&mut self, pos: SeekFrom) -> std::io::Result { + self.file.seek(pos) + } +} + impl Into for FileDb { fn into(self) -> File { self.file diff --git a/storage/src/db/kv_db/mod.rs b/storage/src/db/kv_db/mod.rs index 6e2faf8ab..aeed753fd 100644 --- a/storage/src/db/kv_db/mod.rs +++ b/storage/src/db/kv_db/mod.rs @@ -4,7 +4,6 @@ use std::thread::available_parallelism; use anyhow::{Context, Result}; use bytesize::ByteSize; -use serde::{Deserialize, Serialize}; use weedb::{Caches, WeeDb}; pub use weedb::Stats as RocksdbStats; diff --git a/storage/src/utils/cell_writer.rs b/storage/src/store/persistent_state/cell_writer.rs similarity index 83% rename from storage/src/utils/cell_writer.rs rename to 
storage/src/store/persistent_state/cell_writer.rs index f51468419..bb997e4db 100644 --- a/storage/src/utils/cell_writer.rs +++ b/storage/src/store/persistent_state/cell_writer.rs @@ -8,6 +8,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use anyhow::{Context, Result}; +use everscale_types::cell::CellDescriptor; use smallvec::SmallVec; use tycho_util::FastHashMap; @@ -29,8 +30,13 @@ impl<'a> CellWriter<'a> { #[allow(unused)] pub fn write(&self, root_hash: &[u8; 32], is_cancelled: Option>) -> Result<()> { // Open target file in advance to get the error immediately (if any) - let file_path = self.base_path.join(hex::encode(root_hash)); - let file_db = FileDb::open(file_path)?; + let file_db = FileDb::new( + self.base_path, + fs::OpenOptions::new() + .write(true) + .create(true) + .truncate(true), + )?; // Load cells from db in reverse order into the temp file tracing::info!("started loading cells"); @@ -99,13 +105,13 @@ impl<'a> CellWriter<'a> { .file .read_exact(&mut cell_buffer[..cell_size as usize])?; - let d1 = cell_buffer[0]; - let d2 = cell_buffer[1]; - let ref_count = (d1 & 7) as usize; - let data_size = ((d2 >> 1) + (d2 & 1 != 0) as u8) as usize; + let descriptor = CellDescriptor { + d1: cell_buffer[0], + d2: cell_buffer[1], + }; - let ref_offset = 2 + data_size; - for r in 0..ref_count { + let ref_offset = 2 + descriptor.byte_len() as usize; + for r in 0..descriptor.reference_count() as usize { let ref_offset = ref_offset + r * REF_SIZE; let slice = &mut cell_buffer[ref_offset..ref_offset + REF_SIZE]; @@ -121,11 +127,10 @@ impl<'a> CellWriter<'a> { Ok(()) } - pub fn remove(&self, root_hash: &[u8; 32]) -> Result<()> { - let file_path = self.base_path.join(hex::encode(root_hash)); - fs::remove_file(&file_path).context(format!( + pub fn remove(&self) -> Result<()> { + fs::remove_file(&self.base_path).context(format!( "Failed to remove persistent state file {:?}", - file_path + self.base_path )) } } @@ -150,18 +155,21 @@ fn 
write_rev_cells>( struct LoadedCell { hash: [u8; 32], - d1: u8, - d2: u8, + descriptor: CellDescriptor, data: SmallVec<[u8; 128]>, indices: SmallVec<[u32; 4]>, } - let file_path = base_path - .as_ref() - .join(hex::encode(root_hash)) - .with_extension("temp"); + let file_path = base_path.as_ref().with_extension("temp"); - let file_db = FileDb::open(&file_path)?; + let file_db = FileDb::new( + &file_path, + fs::OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(true), + )?; let remove_on_drop = RemoveOnDrop(file_path); let raw = db.raw().as_ref(); @@ -197,12 +205,17 @@ fn write_rev_cells>( .get_pinned_cf_opt(&cf, hash, read_options)? .ok_or(CellWriterError::CellNotFound)?; - let value = value.as_ref(); + let value = match crate::refcount::strip_refcount(value.as_ref()) { + Some(bytes) => bytes, + None => { + return Err(CellWriterError::CellNotFound.into()); + } + }; if value.is_empty() { return Err(CellWriterError::InvalidCell.into()); } - let (d1, d2, data) = deserialize_cell(&value[1..], &mut references_buffer) + let (descriptor, data) = deserialize_cell(value, &mut references_buffer) .ok_or(CellWriterError::InvalidCell)?; let mut reference_indices = SmallVec::with_capacity(references_buffer.len()); @@ -242,8 +255,7 @@ fn write_rev_cells>( index, StackItem::Loaded(LoadedCell { hash, - d1, - d2, + descriptor, data: SmallVec::from_slice(data), indices: reference_indices, }), @@ -283,7 +295,7 @@ fn write_rev_cells>( cell_sizes.push(cell_size as u8); total_size += cell_size as u64; - temp_file_buffer.write_all(&[loaded.d1, loaded.d2])?; + temp_file_buffer.write_all(&[loaded.descriptor.d1, loaded.descriptor.d2])?; temp_file_buffer.write_all(&loaded.data)?; for index in loaded.indices { let index = remap.get(&index).with_context(|| { @@ -309,56 +321,30 @@ fn write_rev_cells>( fn deserialize_cell<'a>( value: &'a [u8], references_buffer: &mut SmallVec<[[u8; 32]; 4]>, -) -> Option<(u8, u8, &'a [u8])> { +) -> Option<(CellDescriptor, &'a [u8])> { let 
mut index = Index { value_len: value.len(), offset: 0, }; - index.require(3)?; - let cell_type = value[*index]; - index.advance(1); - let bit_length = u16::from_le_bytes((&value[*index..*index + 2]).try_into().unwrap()); - index.advance(2); + index.require(4)?; + let mut descriptor = CellDescriptor::new([value[*index], value[*index + 1]]); + descriptor.d1 &= !CellDescriptor::STORE_HASHES_MASK; - let d2 = (((bit_length >> 2) as u8) & !0b1) | ((bit_length % 8 != 0) as u8); + index.advance(2); + let bit_length = u16::from_le_bytes([value[*index], value[*index + 1]]); + index.advance(2); - // TODO: Replace with `(big_length + 7) / 8` - let data_len = ((d2 >> 1) + u8::from(d2 & 1 != 0)) as usize; + let data_len = descriptor.byte_len() as usize; index.require(data_len)?; let data = &value[*index..*index + data_len]; + index.advance(data_len); - // NOTE: additional byte is required here due to internal structure - index.advance(((bit_length + 8) / 8) as usize); - - index.require(1)?; - let level_mask = value[*index]; - // skip store_hashes - index.advance(2); - - index.require(2)?; - let has_hashes = value[*index]; - index.advance(1); - if has_hashes != 0 { - let count = value[*index]; - index.advance(1 + (count * 32) as usize); - } - - index.require(2)?; - let has_depths = value[*index]; - index.advance(1); - if has_depths != 0 { - let count = value[*index]; - index.advance(1 + (count * 2) as usize); - } - - index.require(1)?; - let reference_count = value[*index]; - index.advance(1); + assert_eq!((bit_length as usize + 7) / 8, data_len); - let d1 = reference_count | (((cell_type != 0x01) as u8) << 3) | (level_mask << 5); + index.advance((32 + 2) * descriptor.hash_count() as usize); - for _ in 0..reference_count { + for _ in 0..descriptor.reference_count() { index.require(32)?; let mut hash = [0; 32]; hash.copy_from_slice(&value[*index..*index + 32]); @@ -366,7 +352,7 @@ fn deserialize_cell<'a>( index.advance(32); } - Some((d1, d2, data)) + Some((descriptor, data)) } 
#[cfg(not(target_os = "macos"))] @@ -411,7 +397,7 @@ struct Index { impl Index { #[inline(always)] fn require(&self, len: usize) -> Option<()> { - if self.offset + len < self.value_len { + if self.offset + len <= self.value_len { Some(()) } else { None diff --git a/storage/src/store/persistent_state/mod.rs b/storage/src/store/persistent_state/mod.rs index 9c0e74477..e938bc70b 100644 --- a/storage/src/store/persistent_state/mod.rs +++ b/storage/src/store/persistent_state/mod.rs @@ -1,20 +1,21 @@ use std::fs; -use std::io::SeekFrom; -use std::path::{Path, PathBuf}; +use std::io::{BufReader, Read, Seek, SeekFrom}; +use std::path::PathBuf; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use anyhow::Result; -use bytes::BytesMut; +use bytes::{Bytes, BytesMut}; use everscale_types::cell::HashBytes; use everscale_types::models::BlockId; use tokio::time::Instant; use crate::db::Db; use crate::store::BlockHandleStorage; -use crate::utils::CellWriter; use crate::FileDb; +mod cell_writer; + const KEY_BLOCK_UTIME_STEP: u32 = 86400; pub struct PersistentStateStorage { @@ -55,7 +56,7 @@ impl PersistentStateStorage { let base_path = self.get_state_file_path(&mc_block_id, &block_id); tokio::task::spawn_blocking(move || { - let cell_writer = CellWriter::new(&db, &base_path); + let cell_writer = cell_writer::CellWriter::new(&db, &base_path); match cell_writer.write(&root_hash.0, is_cancelled) { Ok(()) => { tracing::info!( @@ -69,7 +70,7 @@ impl PersistentStateStorage { "writing persistent state failed: {e:?}" ); - if let Err(e) = cell_writer.remove(&root_hash.0) { + if let Err(e) = cell_writer.remove() { tracing::error!(%block_id, "{e}") } } @@ -85,39 +86,49 @@ impl PersistentStateStorage { block_id: &BlockId, offset: u64, size: u64, - ) -> Option> { - // TODO: cache file handles - let mut file_db = FileDb::open(self.get_state_file_path(mc_block_id, block_id)).ok()?; + ) -> Option { + let path = self.get_state_file_path(mc_block_id, block_id); - if let Err(e) = 
file_db.seek(SeekFrom::Start(offset)) { - tracing::error!("failed to seek state file offset: {e:?}"); - return None; - } + tokio::task::spawn_blocking(move || { + // TODO: cache file handles + let mut file_db = FileDb::new(path, fs::OpenOptions::new().read(true)).ok()?; - // SAFETY: size must be checked - let mut result = BytesMut::with_capacity(size as usize); - let now = Instant::now(); - loop { - match file_db.read(&mut result) { - Ok(bytes_read) => { - tracing::debug!(bytes_read, "reading state file"); - if bytes_read == 0 || bytes_read == size as usize { - break; + if let Err(e) = file_db.seek(SeekFrom::Start(offset)) { + tracing::error!("failed to seek state file offset: {e:?}"); + return None; + } + + let mut buf_reader = BufReader::new(file_db.file()); + + let mut result = BytesMut::zeroed(size as usize); + let mut result_cursor = 0; + + let now = Instant::now(); + loop { + match buf_reader.read(&mut result[result_cursor..]) { + Ok(bytes_read) => { + tracing::info!("Reading state file. Bytes read: {}", bytes_read); + if bytes_read == 0 || bytes_read == size as usize { + break; + } + result_cursor += bytes_read; + } + Err(e) => { + tracing::error!("Failed to read state file. Err: {e:?}"); + return None; } - } - Err(e) => { - tracing::error!("failed to read state file. 
Err: {e:?}"); - return None; } } - } - tracing::info!( - "Finished reading buffer after: {} ms", - now.elapsed().as_millis() - ); + tracing::info!( + "Finished reading buffer after: {} ms", + now.elapsed().as_millis() + ); - // TODO: use `Bytes` - Some(result.to_vec()) + Some(result.freeze()) + }) + .await + .ok() + .flatten() } pub fn state_exists(&self, mc_block_id: &BlockId, block_id: &BlockId) -> bool { @@ -139,6 +150,7 @@ impl PersistentStateStorage { self.storage_path .clone() .join(mc_block_id.seqno.to_string()) + .join(block_id.root_hash.to_string()) } pub fn cancel(&self) { diff --git a/storage/src/store/runtime/persistent_state_keeper.rs b/storage/src/store/runtime/persistent_state_keeper.rs index 49f012b52..453c8b0ac 100644 --- a/storage/src/store/runtime/persistent_state_keeper.rs +++ b/storage/src/store/runtime/persistent_state_keeper.rs @@ -30,6 +30,8 @@ impl PersistentStateKeeper { } pub fn update(&self, block_handle: &Arc) -> Result<()> { + println!("UPDATE"); + if !self.initialized.load(Ordering::Acquire) { let prev_persistent_key_block = self .block_handle_storage diff --git a/storage/src/store/shard_state/cell_storage.rs b/storage/src/store/shard_state/cell_storage.rs index 99b211685..6d56d4bec 100644 --- a/storage/src/store/shard_state/cell_storage.rs +++ b/storage/src/store/shard_state/cell_storage.rs @@ -427,6 +427,7 @@ impl StorageCell { target.extend_from_slice(&[descriptor.d1, descriptor.d2]); target.extend_from_slice(&cell.bit_len().to_le_bytes()); target.extend_from_slice(cell.data()); + assert_eq!(cell.data().len(), descriptor.byte_len() as usize); for i in 0..descriptor.hash_count() { target.extend_from_slice(cell.hash(i).as_array()); @@ -616,7 +617,6 @@ impl CellImpl for StorageCell { impl Drop for StorageCell { fn drop(&mut self) { - println!("DROPPING"); self.cell_storage.drop_cell(DynCell::repr_hash(self)); for i in 0..4 { let state = self.reference_states[i].load(Ordering::Acquire); diff --git a/storage/src/store/shard_state/mod.rs 
b/storage/src/store/shard_state/mod.rs index 5603b4443..8533eb200 100644 --- a/storage/src/store/shard_state/mod.rs +++ b/storage/src/store/shard_state/mod.rs @@ -1,4 +1,3 @@ -use std::fs::File; use std::path::PathBuf; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; @@ -7,6 +6,8 @@ use std::time::Instant; use anyhow::{Context, Result}; use everscale_types::models::*; use everscale_types::prelude::{Cell, HashBytes}; +use tycho_block_util::block::*; +use tycho_block_util::state::*; use self::cell_storage::*; use self::replace_transaction::ShardStateReplaceTransaction; @@ -15,9 +16,6 @@ use crate::db::*; use crate::utils::*; use crate::{models::BlockHandle, BlockHandleStorage, BlockStorage}; -use tycho_block_util::block::*; -use tycho_block_util::state::*; - mod cell_storage; mod entries_buffer; mod replace_transaction; diff --git a/storage/src/store/shard_state/replace_transaction.rs b/storage/src/store/shard_state/replace_transaction.rs index ac18ad308..35d75ddf9 100644 --- a/storage/src/store/shard_state/replace_transaction.rs +++ b/storage/src/store/shard_state/replace_transaction.rs @@ -1,4 +1,3 @@ -use std::fs::File; use std::io::Write; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -14,7 +13,6 @@ use crate::db::*; use crate::utils::*; use tycho_block_util::state::*; -use tycho_block_util::*; use tycho_util::progress_bar::*; use tycho_util::FastHashMap; @@ -61,8 +59,6 @@ impl<'a> ShardStateReplaceTransaction<'a> { packet: Vec, progress_bar: &mut ProgressBar, ) -> Result { - use std::io::Write; - let cells_file = self.file_ctx.cells_file()?; self.reader.set_next_packet(packet); @@ -120,7 +116,7 @@ impl<'a> ShardStateReplaceTransaction<'a> { Ok(true) } - pub async fn finalize( + pub fn finalize( mut self, block_id: BlockId, progress_bar: &mut ProgressBar, @@ -214,7 +210,6 @@ impl<'a> ShardStateReplaceTransaction<'a> { } progress_bar.set_progress((total_size - file_pos) as u64); - tokio::task::yield_now().await; } if batch_len > 0 { @@ 
-253,7 +248,7 @@ impl<'a> ShardStateReplaceTransaction<'a> { &self, ctx: &mut FinalizationContext<'_>, cell_index: u32, - mut cell: RawCell<'_>, + cell: RawCell<'_>, ) -> Result<()> { use sha2::{Digest, Sha256}; @@ -310,6 +305,7 @@ impl<'a> ShardStateReplaceTransaction<'a> { }; let mut max_depths = [0u16; 4]; + let mut temp_descriptor = cell.descriptor; for i in 0..hash_count { let mut hasher = Sha256::new(); @@ -319,9 +315,9 @@ impl<'a> ShardStateReplaceTransaction<'a> { LevelMask::from_level(i) }; - cell.descriptor.d1 &= !(CellDescriptor::LEVEL_MASK | CellDescriptor::STORE_HASHES_MASK); - cell.descriptor.d1 |= u8::from(level_mask) << 5; - hasher.update([cell.descriptor.d1, cell.descriptor.d2]); + temp_descriptor.d1 &= !(CellDescriptor::LEVEL_MASK | CellDescriptor::STORE_HASHES_MASK); + temp_descriptor.d1 |= u8::from(level_mask) << 5; + hasher.update([temp_descriptor.d1, temp_descriptor.d2]); if i == 0 { hasher.update(cell.data); @@ -503,7 +499,14 @@ impl FilesContext { let cells_path = root_path.as_ref().join(format!("state_cells_{block_id}")); let hashes_path = root_path.as_ref().join(format!("state_hashes_{block_id}")); - let cells_file = Some(FileDb::open(&cells_path)?); + let cells_file = Some(FileDb::new( + &cells_path, + std::fs::OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .read(true), + )?); Ok(Self { cells_file, diff --git a/storage/src/utils/mod.rs b/storage/src/utils/mod.rs index 359217fe1..d28b57e54 100644 --- a/storage/src/utils/mod.rs +++ b/storage/src/utils/mod.rs @@ -1,5 +1,3 @@ -pub use self::cell_writer::*; pub use self::stored_value::*; -mod cell_writer; mod stored_value; diff --git a/storage/src/utils/stored_value.rs b/storage/src/utils/stored_value.rs index 50186c97d..813edb106 100644 --- a/storage/src/utils/stored_value.rs +++ b/storage/src/utils/stored_value.rs @@ -4,7 +4,6 @@ use smallvec::SmallVec; use anyhow::Result; use everscale_types::cell::HashBytes; use everscale_types::models::{BlockId, BlockIdShort, 
ShardIdent}; -use tokio::io::AsyncReadExt; use tycho_util::byte_reader::ByteOrderRead; /// A trait for writing or reading data from a stack-allocated buffer diff --git a/storage/tests/mod.rs b/storage/tests/mod.rs index 793f2c307..46294c55a 100644 --- a/storage/tests/mod.rs +++ b/storage/tests/mod.rs @@ -1,16 +1,12 @@ -use std::path::Path; -use std::str::FromStr; -use std::time::Duration; - use anyhow::{anyhow, Result}; use base64::prelude::BASE64_STANDARD; use base64::Engine; use bytesize::ByteSize; use everscale_types::boc::Boc; -use everscale_types::cell::{Cell, HashBytes}; +use everscale_types::cell::{Cell, DynCell, HashBytes}; use everscale_types::models::{BlockId, ShardIdent, ShardState}; use serde::{Deserialize, Deserializer}; -use tycho_block_util::state::{MinRefMcStateTracker, ShardStateStuff}; +use tycho_block_util::state::ShardStateStuff; use tycho_storage::{BlockMetaData, Db, DbOptions, Storage}; #[derive(Clone)] @@ -22,30 +18,11 @@ struct ShardStateCombined { impl ShardStateCombined { fn from_file(path: impl AsRef) -> Result { let bytes = std::fs::read(path.as_ref())?; - let cell = Boc::decode(bytes)?; + let cell = Boc::decode(&bytes)?; let state = cell.parse()?; Ok(Self { cell, state }) } - fn short_id(&self) -> ShardShortId { - match &self.state { - ShardState::Unsplit(s) => ShardShortId::Unsplit { - seqno: s.seqno, - shard_ident: s.shard_ident, - }, - ShardState::Split(s) => { - let left = s.left.load().unwrap(); - let right = s.right.load().unwrap(); - ShardShortId::Split { - left_seqno: left.seqno, - left_shard_ident: left.shard_ident, - right_seqno: right.seqno, - right_shard_ident: right.shard_ident, - } - } - } - } - fn gen_utime(&self) -> Option { match &self.state { ShardState::Unsplit(s) => Some(s.gen_utime), @@ -56,39 +33,7 @@ impl ShardStateCombined { fn min_ref_mc_seqno(&self) -> Option { match &self.state { ShardState::Unsplit(s) => Some(s.min_ref_mc_seqno), - ShardState::Split(s) => None, - } - } -} - -#[derive(Debug)] -enum 
ShardShortId { - Unsplit { - seqno: u32, - shard_ident: ShardIdent, - }, - Split { - left_seqno: u32, - left_shard_ident: ShardIdent, - right_seqno: u32, - right_shard_ident: ShardIdent, - }, -} - -impl ShardShortId { - pub fn shard_ident(&self) -> ShardIdent { - match self { - ShardShortId::Unsplit { shard_ident, .. } => *shard_ident, - ShardShortId::Split { - left_shard_ident, .. - } => *left_shard_ident, - } - } - - pub fn seqno(&self) -> u32 { - match self { - ShardShortId::Unsplit { seqno, .. } => *seqno, - ShardShortId::Split { left_seqno, .. } => *left_seqno, + ShardState::Split(_) => None, } } } @@ -191,80 +136,130 @@ impl TryFrom for GlobalConfig { } } +fn compare_cells(orig_cell: &DynCell, stored_cell: &DynCell) { + assert_eq!(orig_cell.repr_hash(), stored_cell.repr_hash()); + + let l = orig_cell.descriptor(); + let r = stored_cell.descriptor(); + + assert_eq!(l.d1, r.d1); + assert_eq!(l.d2, r.d2); + assert_eq!(orig_cell.data(), stored_cell.data()); + + for (orig_cell, stored_cell) in std::iter::zip(orig_cell.references(), stored_cell.references()) + { + compare_cells(orig_cell, stored_cell); + } +} + #[tokio::test] -async fn storage_init() { +async fn persistent_storage_everscale() -> Result<()> { tracing_subscriber::fmt::try_init().ok(); - tracing::info!("connect_new_node_to_bootstrap"); - let root_path = Path::new("tmp"); + let tmp_dir = tempfile::tempdir()?; + let root_path = tmp_dir.path(); + + // Init rocksdb let db_options = DbOptions { rocksdb_lru_capacity: ByteSize::kb(1024), cells_cache_size: ByteSize::kb(1024), }; - let db = Db::open(root_path.join("db_storage"), db_options).unwrap(); + let db = Db::open(root_path.join("db_storage"), db_options)?; + // Init storage let storage = Storage::new( db, root_path.join("file_storage"), db_options.cells_cache_size.as_u64(), - ) - .unwrap(); + )?; assert!(storage.node_state().load_init_mc_block_id().is_err()); // Read zerostate - let zero_state = 
ShardStateCombined::from_file("tests/everscale_zerostate.boc").unwrap(); + let zero_state_raw = ShardStateCombined::from_file("tests/everscale_zerostate.boc")?; // Read global config - let global_config = GlobalConfig::from_file("tests/global-config.json").unwrap(); + let global_config = GlobalConfig::from_file("tests/global-config.json")?; // Write zerostate to db - let (handle, _) = storage - .block_handle_storage() - .create_or_load_handle( - &global_config.block_id, - BlockMetaData::zero_state(zero_state.gen_utime().unwrap()), - ) - .unwrap(); + let (handle, _) = storage.block_handle_storage().create_or_load_handle( + &global_config.block_id, + BlockMetaData::zero_state(zero_state_raw.gen_utime().unwrap()), + )?; - let state = ShardStateStuff::new( + let zerostate = ShardStateStuff::new( global_config.block_id, - zero_state.cell.clone(), + zero_state_raw.cell.clone(), storage.shard_state_storage().min_ref_mc_state(), - ) - .unwrap(); + )?; storage .shard_state_storage() - .store_state(&handle, &state) - .await - .unwrap(); + .store_state(&handle, &zerostate) + .await?; + // Check seqno let min_ref_mc_state = storage.shard_state_storage().min_ref_mc_state(); - assert_eq!(min_ref_mc_state.seqno(), zero_state.min_ref_mc_seqno()); + assert_eq!(min_ref_mc_state.seqno(), zero_state_raw.min_ref_mc_seqno()); + + // Load zerostate from db + let loaded_state = storage + .shard_state_storage() + .load_state(zerostate.block_id()) + .await?; + + assert_eq!(zerostate.state(), loaded_state.state()); + assert_eq!(zerostate.block_id(), loaded_state.block_id()); + assert_eq!(zerostate.root_cell(), loaded_state.root_cell()); + + compare_cells( + zerostate.root_cell().as_ref(), + loaded_state.root_cell().as_ref(), + ); - // Write persistent state + // Write persistent state to file let persistent_state_keeper = storage.runtime_storage().persistent_state_keeper(); assert!(persistent_state_keeper.current().is_none()); storage .persistent_state_storage() - 
.prepare_persistent_states_dir(&state.block_id()) - .unwrap(); + .prepare_persistent_states_dir(&zerostate.block_id())?; storage .persistent_state_storage() .save_state( - &state.block_id(), - &state.block_id(), - zero_state.cell.repr_hash(), + &zerostate.block_id(), + &zerostate.block_id(), + zero_state_raw.cell.repr_hash(), + ) + .await?; + + // Check if state exists + let exist = storage + .persistent_state_storage() + .state_exists(&zerostate.block_id(), &zerostate.block_id()); + assert_eq!(exist, true); + + // Read persistent state + let offset = 0u64; + let max_size = 1_000_000u64; + + let persistent_state_storage = storage.persistent_state_storage(); + let persistent_state_data = persistent_state_storage + .read_state_part( + &zerostate.block_id(), + &zerostate.block_id(), + offset, + max_size, ) .await .unwrap(); - tokio::time::sleep(Duration::from_secs(10)).await; + // Check state + let cell = Boc::decode(&persistent_state_data)?; + assert_eq!(&cell, zerostate.root_cell()); - //println!("{:?}", zero_state.state); - //println!("{:?}", global_config); + // Clear files for test + tmp_dir.close()?; - //std::fs::remove_dir_all(root_path).unwrap() + Ok(()) }