diff --git a/collator/src/state_node.rs b/collator/src/state_node.rs index e5d4edbbb..d991d34e5 100644 --- a/collator/src/state_node.rs +++ b/collator/src/state_node.rs @@ -167,7 +167,7 @@ impl StateNodeAdapter for StateNodeAdapterStdImpl { //TODO: make real implementation //STUB: create dummy blcok handle let handle = BlockHandle::new( - block.block_id, + &block.block_id, Default::default(), Arc::new(Default::default()), ); diff --git a/core/src/block_strider/state.rs b/core/src/block_strider/state.rs index dc035fdd7..ebf79ea6b 100644 --- a/core/src/block_strider/state.rs +++ b/core/src/block_strider/state.rs @@ -6,7 +6,9 @@ use tycho_storage::Storage; pub trait BlockStriderState: Send + Sync + 'static { fn load_last_traversed_master_block_id(&self) -> BlockId; + fn is_traversed(&self, block_id: &BlockId) -> bool; + fn commit_traversed(&self, block_id: &BlockId); } @@ -18,17 +20,12 @@ impl BlockStriderState for Arc { } fn is_traversed(&self, block_id: &BlockId) -> bool { - self.block_handle_storage() - .load_handle(block_id) - .expect("db is dead") - .is_some() + self.block_handle_storage().load_handle(block_id).is_some() } fn commit_traversed(&self, block_id: &BlockId) { if block_id.is_masterchain() { - self.node_state() - .store_last_mc_block_id(block_id) - .expect("db is dead"); + self.node_state().store_last_mc_block_id(block_id); } // other blocks are stored with state applier: todo rework this? } diff --git a/core/src/block_strider/state_applier.rs b/core/src/block_strider/state_applier.rs index 3717dc045..bbc237549 100644 --- a/core/src/block_strider/state_applier.rs +++ b/core/src/block_strider/state_applier.rs @@ -49,6 +49,7 @@ where tracing::info!(id = ?cx.block.id(), "applying block"); let state_storage = self.inner.storage.shard_state_storage(); + let handle_storage = self.inner.storage.block_handle_storage(); // Load handle let handle = self @@ -130,9 +131,7 @@ where metrics::histogram!("tycho_subscriber_handle_block_seconds").record(started_at.elapsed()); // Mark block as applied - handle.meta().set_is_applied(); - let handle_storage = self.inner.storage.block_handle_storage(); - handle_storage.store_handle(&handle)?; + handle_storage.store_block_applied(&handle); // Done Ok(()) @@ -143,7 +142,7 @@ where mc_block_id: &BlockId, block: &BlockStuff, archive_data: &ArchiveData, - ) -> Result> { + ) -> Result { let block_storage = self.inner.storage.block_storage(); let info = block.load_info()?; @@ -259,11 +258,7 @@ pub mod test { .unwrap(); for block in &blocks { - let handle = storage - .block_handle_storage() - .load_handle(block) - .unwrap() - .unwrap(); + let handle = storage.block_handle_storage().load_handle(block).unwrap(); assert!(handle.meta().is_applied()); storage .shard_state_storage() @@ -306,7 +301,7 @@ pub mod test { let (handle, _) = storage.block_handle_storage().create_or_load_handle( &block_id, BlockMetaData::zero_state(master.state().gen_utime), - )?; + ); storage .shard_state_storage() @@ -331,16 +326,13 @@ pub mod test { let (handle, _) = storage.block_handle_storage().create_or_load_handle( &shard_id, BlockMetaData::zero_state(shard.state().gen_utime), - )?; + ); storage .shard_state_storage() .store_state(&handle, &shard) .await?; - storage - .node_state() - .store_last_mc_block_id(&master_id) - .unwrap(); + storage.node_state().store_last_mc_block_id(&master_id); Ok((provider, storage)) } } diff --git a/core/src/blockchain_rpc/service.rs b/core/src/blockchain_rpc/service.rs index 8f642536e..a6fb27658 100644 --- a/core/src/blockchain_rpc/service.rs +++ b/core/src/blockchain_rpc/service.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use anyhow::Context; use bytes::Buf; use serde::{Deserialize, Serialize}; use tycho_network::{Response, Service, ServiceRequest}; @@ -196,7 +197,7 @@ impl Inner { .key_blocks_iterator(KeyBlocksDirection::ForwardFrom(req.block_id.seqno)) .take(limit + 1); - if let Some(id) = iterator.next().transpose()? { + if let Some(id) = iterator.next() { anyhow::ensure!( id.root_hash == req.block_id.root_hash, "first block root hash mismatch" @@ -208,7 +209,7 @@ impl Inner { } let mut ids = Vec::with_capacity(limit); - while let Some(id) = iterator.next().transpose()? { + while let Some(id) = iterator.next() { ids.push(id); if ids.len() >= limit { break; @@ -239,7 +240,7 @@ impl Inner { let get_block_full = async { let mut is_link = false; - let block = match block_handle_storage.load_handle(&req.block_id)? { + let block = match block_handle_storage.load_handle(&req.block_id) { Some(handle) if handle.meta().has_data() && handle.has_proof_or_link(&mut is_link) => { @@ -277,14 +278,15 @@ impl Inner { let block_storage = self.storage().block_storage(); let get_next_block_full = async { - let next_block_id = match block_handle_storage.load_handle(&req.prev_block_id)? { + let next_block_id = match block_handle_storage.load_handle(&req.prev_block_id) { Some(handle) if handle.meta().has_next1() => block_connection_storage - .load_connection(&req.prev_block_id, BlockConnection::Next1)?, + .load_connection(&req.prev_block_id, BlockConnection::Next1) + .context("connection not found")?, _ => return Ok(BlockFull::Empty), }; let mut is_link = false; - let block = match block_handle_storage.load_handle(&next_block_id)? { + let block = match block_handle_storage.load_handle(&next_block_id) { Some(handle) if handle.meta().has_data() && handle.has_proof_or_link(&mut is_link) => { @@ -356,8 +358,12 @@ impl Inner { let node_state = self.storage.node_state(); let get_archive_id = || { - let last_applied_mc_block = node_state.load_last_mc_block_id()?; - let shards_client_mc_block_id = node_state.load_shards_client_mc_block_id()?; + let last_applied_mc_block = node_state + .load_last_mc_block_id() + .context("last mc block not found")?; + let shards_client_mc_block_id = node_state + .load_shards_client_mc_block_id() + .context("shard client mc block not found")?; Ok::<_, anyhow::Error>((last_applied_mc_block, shards_client_mc_block_id)) }; diff --git a/core/tests/common/storage.rs b/core/tests/common/storage.rs index 7b2712723..bf7b4d5a4 100644 --- a/core/tests/common/storage.rs +++ b/core/tests/common/storage.rs @@ -26,7 +26,7 @@ pub(crate) async fn init_empty_storage() -> Result<(Arc, TempDir)> { root_path.join("file_storage"), db_options.cells_cache_size.as_u64(), )?; - assert!(storage.node_state().load_init_mc_block_id().is_err()); + assert!(storage.node_state().load_init_mc_block_id().is_none()); Ok((storage, tmp_dir)) } @@ -68,7 +68,7 @@ pub(crate) async fn init_storage() -> Result<(Arc, TempDir)> { let handle = storage .block_handle_storage() - .load_handle(&block_id)? + .load_handle(&block_id) .unwrap(); assert_eq!(handle.id(), block_stuff.id()); diff --git a/storage/src/models/block_handle.rs b/storage/src/models/block_handle.rs index 36950fc63..86c598d39 100644 --- a/storage/src/models/block_handle.rs +++ b/storage/src/models/block_handle.rs @@ -6,6 +6,22 @@ use tokio::sync::RwLock; use super::BlockMeta; use tycho_util::FastDashMap; +#[derive(Clone)] +#[repr(transparent)] +pub struct WeakBlockHandle { + inner: Weak, +} + +impl WeakBlockHandle { + pub fn strong_count(&self) -> usize { + self.inner.strong_count() + } + + pub fn upgrade(&self) -> Option { + self.inner.upgrade().map(|inner| BlockHandle { inner }) + } +} + #[derive(Clone)] #[repr(transparent)] pub struct BlockHandle { @@ -14,13 +30,13 @@ pub struct BlockHandle { impl BlockHandle { pub fn new( - id: BlockId, + id: &BlockId, meta: BlockMeta, - cache: Arc>>, + cache: Arc>, ) -> Self { Self { inner: Arc::new(Inner { - id, + id: *id, meta, block_data_lock: Default::default(), proof_data_block: Default::default(), @@ -70,20 +86,44 @@ impl BlockHandle { self.inner.meta.masterchain_ref_seqno() } } + + pub fn downgrade(&self) -> WeakBlockHandle { + WeakBlockHandle { + inner: Arc::downgrade(&self.inner), + } + } } -impl Drop for BlockHandle { - fn drop(&mut self) { - self.inner - .cache - .remove_if(&self.inner.id, |_, weak| weak.strong_count() == 0); +unsafe impl arc_swap::RefCnt for BlockHandle { + type Base = Inner; + + fn into_ptr(me: Self) -> *mut Self::Base { + arc_swap::RefCnt::into_ptr(me.inner) + } + + fn as_ptr(me: &Self) -> *mut Self::Base { + arc_swap::RefCnt::as_ptr(&me.inner) + } + + unsafe fn from_ptr(ptr: *const Self::Base) -> Self { + Self { + inner: arc_swap::RefCnt::from_ptr(ptr), + } } } -struct Inner { +#[doc(hidden)] +pub struct Inner { id: BlockId, meta: BlockMeta, block_data_lock: RwLock<()>, proof_data_block: RwLock<()>, - cache: Arc>>, + cache: Arc>, +} + +impl Drop for Inner { + fn drop(&mut self) { + self.cache + .remove_if(&self.id, |_, weak| weak.strong_count() == 0); + } } diff --git a/storage/src/models/block_meta.rs b/storage/src/models/block_meta.rs index 6410d9e85..322925ae8 100644 --- a/storage/src/models/block_meta.rs +++ b/storage/src/models/block_meta.rs @@ -1,6 +1,5 @@ use std::sync::atomic::{AtomicU64, Ordering}; -use anyhow::Result; use bytes::Buf; use everscale_types::models::BlockInfo; @@ -205,17 +204,17 @@ impl StoredValue for BlockMeta { buffer.write_raw_slice(&self.gen_utime.to_le_bytes()); } - fn deserialize(reader: &mut &[u8]) -> Result + fn deserialize(reader: &mut &[u8]) -> Self where Self: Sized, { let flags = reader.get_u64_le(); let gen_utime = reader.get_u32_le(); - Ok(Self { + Self { flags: AtomicU64::new(flags), gen_utime, - }) + } } } diff --git a/storage/src/models/mod.rs b/storage/src/models/mod.rs index b106684ee..4c1d4cfd1 100644 --- a/storage/src/models/mod.rs +++ b/storage/src/models/mod.rs @@ -1,4 +1,4 @@ -pub use block_handle::BlockHandle; +pub use block_handle::{BlockHandle, WeakBlockHandle}; pub use block_meta::{BlockMeta, BlockMetaData, BriefBlockMeta}; mod block_handle; diff --git a/storage/src/store/block/mod.rs b/storage/src/store/block/mod.rs index c0ab87b42..92be364bf 100644 --- a/storage/src/store/block/mod.rs +++ b/storage/src/store/block/mod.rs @@ -94,7 +94,7 @@ impl BlockStorage { let block_id = block.id(); let (handle, status) = self .block_handle_storage - .create_or_load_handle(block_id, meta_data)?; + .create_or_load_handle(block_id, meta_data); let archive_id = ArchiveEntryId::Block(block_id); let mut updated = false; @@ -105,7 +105,7 @@ impl BlockStorage { if !handle.meta().has_data() { self.add_data(&archive_id, data)?; if handle.meta().set_has_data() { - self.block_handle_storage.store_handle(&handle)?; + self.block_handle_storage.store_handle(&handle); updated = true; } } @@ -173,7 +173,7 @@ impl BlockStorage { BlockProofHandle::Existing(handle) => (handle, HandleCreationStatus::Fetched), BlockProofHandle::New(meta_data) => self .block_handle_storage - .create_or_load_handle(block_id, meta_data)?, + .create_or_load_handle(block_id, meta_data), }; let mut updated = false; @@ -186,7 +186,7 @@ impl BlockStorage { if !handle.meta().has_proof_link() { self.add_data(&archive_id, data)?; if handle.meta().set_has_proof_link() { - self.block_handle_storage.store_handle(&handle)?; + self.block_handle_storage.store_handle(&handle); updated = true; } } @@ -200,7 +200,7 @@ impl BlockStorage { if !handle.meta().has_proof() { self.add_data(&archive_id, data)?; if handle.meta().set_has_proof() { - self.block_handle_storage.store_handle(&handle)?; + self.block_handle_storage.store_handle(&handle); updated = true; } } @@ -497,10 +497,10 @@ impl BlockStorage { let target_block = match gc_type { BlocksGcKind::BeforePreviousKeyBlock => self .block_handle_storage - .find_prev_key_block(key_block_id.seqno)?, + .find_prev_key_block(key_block_id.seqno), BlocksGcKind::BeforePreviousPersistentState => self .block_handle_storage - .find_prev_persistent_key_block(key_block_id.seqno)?, + .find_prev_persistent_key_block(key_block_id.seqno), }; // Load target block data @@ -609,7 +609,7 @@ impl BlockStorage { let (tx, rx) = tokio::sync::oneshot::channel(); let lock = self.block_subscriptions_lock.lock().await; - match block_handle_storage.load_handle(&block_id)? { + match block_handle_storage.load_handle(&block_id) { Some(handle) if handle.meta().has_data() => { drop(lock); @@ -631,17 +631,17 @@ impl BlockStorage { let (tx, rx) = tokio::sync::oneshot::channel(); let lock = self.block_subscriptions_lock.lock().await; - let next_block_id = match block_handle_storage.load_handle(&prev_block_id)? { - Some(handle) if handle.meta().has_next1() => { - block_connection_storage.load_connection(&prev_block_id, BlockConnection::Next1)? - } + let next_block_id = match block_handle_storage.load_handle(&prev_block_id) { + Some(handle) if handle.meta().has_next1() => block_connection_storage + .load_connection(&prev_block_id, BlockConnection::Next1) + .context("connection no found")?, _ => { self.add_block_subscription(prev_block_id, tx); return Ok(rx); } }; - match block_handle_storage.load_handle(&next_block_id)? { + match block_handle_storage.load_handle(&next_block_id) { Some(handle) if handle.meta().has_data() => { drop(lock); @@ -771,12 +771,12 @@ pub enum BlocksGcKind { #[derive(Clone)] pub enum BlockProofHandle { - Existing(Arc), + Existing(BlockHandle), New(BlockMetaData), } -impl From> for BlockProofHandle { - fn from(handle: Arc) -> Self { +impl From for BlockProofHandle { + fn from(handle: BlockHandle) -> Self { Self::Existing(handle) } } @@ -788,7 +788,7 @@ impl From for BlockProofHandle { } pub struct StoreBlockResult { - pub handle: Arc, + pub handle: BlockHandle, pub updated: bool, pub new: bool, } @@ -823,8 +823,7 @@ fn remove_blocks( }; // Read only prefix with shard ident and seqno - let BlockIdShort { shard, seqno } = - ::deserialize(&mut std::convert::identity(key))?; + let BlockIdShort { shard, seqno } = BlockIdShort::from_slice(key); // Don't gc latest blocks if top_blocks.contains_shard_seqno(&shard, seqno) { diff --git a/storage/src/store/block_connection/mod.rs b/storage/src/store/block_connection/mod.rs index 17d0a139c..d5e199dd4 100644 --- a/storage/src/store/block_connection/mod.rs +++ b/storage/src/store/block_connection/mod.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use anyhow::Result; use everscale_types::models::*; use crate::db::*; @@ -22,72 +21,62 @@ impl BlockConnectionStorage { handle: &BlockHandle, direction: BlockConnection, connected_block_id: &BlockId, - ) -> Result<()> { + ) { // Use strange match because all columns have different types let store = match direction { - BlockConnection::Prev1 => { - if handle.meta().has_prev1() { - return Ok(()); - } - store_block_connection_impl(&self.db.prev1, handle, connected_block_id)?; + BlockConnection::Prev1 if !handle.meta().has_prev1() => { + store_block_connection_impl(&self.db.prev1, handle, connected_block_id); handle.meta().set_has_prev1() } - BlockConnection::Prev2 => { - if handle.meta().has_prev2() { - return Ok(()); - } - store_block_connection_impl(&self.db.prev2, handle, connected_block_id)?; + BlockConnection::Prev2 if !handle.meta().has_prev2() => { + store_block_connection_impl(&self.db.prev2, handle, connected_block_id); handle.meta().set_has_prev2() } - BlockConnection::Next1 => { - if handle.meta().has_next1() { - return Ok(()); - } - store_block_connection_impl(&self.db.next1, handle, connected_block_id)?; + BlockConnection::Next1 if !handle.meta().has_next1() => { + store_block_connection_impl(&self.db.next1, handle, connected_block_id); handle.meta().set_has_next1() } - BlockConnection::Next2 => { - if handle.meta().has_next2() { - return Ok(()); - } - store_block_connection_impl(&self.db.next2, handle, connected_block_id)?; + BlockConnection::Next2 if !handle.meta().has_next2() => { + store_block_connection_impl(&self.db.next2, handle, connected_block_id); handle.meta().set_has_next2() } + _ => return, }; - if store { - let id = handle.id(); + if !store { + return; + } - if handle.is_key_block() { - let mut write_batch = weedb::rocksdb::WriteBatch::default(); + let id = handle.id(); - write_batch.put_cf( - &self.db.block_handles.cf(), - id.root_hash.as_slice(), - handle.meta().to_vec(), - ); - write_batch.put_cf( - &self.db.key_blocks.cf(), - id.seqno.to_be_bytes(), - id.to_vec(), - ); + if handle.is_key_block() { + let mut write_batch = weedb::rocksdb::WriteBatch::default(); - self.db.raw().write(write_batch)?; - } else { - self.db - .block_handles - .insert(id.root_hash.as_slice(), handle.meta().to_vec())?; - } - } + write_batch.put_cf( + &self.db.block_handles.cf(), + id.root_hash.as_slice(), + handle.meta().to_vec(), + ); + write_batch.put_cf( + &self.db.key_blocks.cf(), + id.seqno.to_be_bytes(), + id.to_vec(), + ); - Ok(()) + self.db.raw().write(write_batch).unwrap(); + } else { + self.db + .block_handles + .insert(id.root_hash.as_slice(), handle.meta().to_vec()) + .unwrap(); + } } pub fn load_connection( &self, block_id: &BlockId, direction: BlockConnection, - ) -> Result { + ) -> Option { match direction { BlockConnection::Prev1 => load_block_connection_impl(&self.db.prev1, block_id), BlockConnection::Prev2 => load_block_connection_impl(&self.db.prev2, block_id), @@ -106,11 +95,7 @@ pub enum BlockConnection { } #[inline] -fn store_block_connection_impl( - db: &Table, - handle: &BlockHandle, - block_id: &BlockId, -) -> Result<(), weedb::rocksdb::Error> +fn store_block_connection_impl(db: &Table, handle: &BlockHandle, block_id: &BlockId) where T: ColumnFamily, { @@ -118,24 +103,13 @@ where handle.id().root_hash.as_slice(), write_block_id_le(block_id), ) + .unwrap() } -#[inline] -fn load_block_connection_impl(db: &Table, block_id: &BlockId) -> Result +fn load_block_connection_impl(db: &Table, block_id: &BlockId) -> Option where T: ColumnFamily, { - match db.get(block_id.root_hash.as_slice())? { - Some(value) => read_block_id_le(value.as_ref()) - .ok_or_else(|| BlockConnectionStorageError::InvalidBlockId.into()), - None => Err(BlockConnectionStorageError::NotFound.into()), - } -} - -#[derive(Debug, thiserror::Error)] -enum BlockConnectionStorageError { - #[error("Invalid connection block id")] - InvalidBlockId, - #[error("Block connection not found")] - NotFound, + let data = db.get(block_id.root_hash.as_slice()).unwrap()?; + Some(read_block_id_le(&data)) } diff --git a/storage/src/store/block_handle/mod.rs b/storage/src/store/block_handle/mod.rs index 94bd0eaca..2a34b8135 100644 --- a/storage/src/store/block_handle/mod.rs +++ b/storage/src/store/block_handle/mod.rs @@ -1,6 +1,5 @@ -use std::sync::{Arc, Weak}; +use std::sync::Arc; -use anyhow::Result; use everscale_types::models::BlockId; use tycho_block_util::block::TopBlocks; use tycho_block_util::state::is_persistent_state; @@ -12,7 +11,7 @@ use crate::util::*; pub struct BlockHandleStorage { db: Arc, - cache: Arc>>, + cache: Arc>, } impl BlockHandleStorage { @@ -23,103 +22,129 @@ impl BlockHandleStorage { } } - pub fn store_block_applied(&self, handle: &Arc) -> Result { - if handle.meta().set_is_applied() { - self.store_handle(handle)?; - Ok(true) - } else { - Ok(false) + pub fn store_block_applied(&self, handle: &BlockHandle) -> bool { + let updated = handle.meta().set_is_applied(); + if updated { + self.store_handle(handle); } + updated } pub fn create_or_load_handle( &self, block_id: &BlockId, meta_data: BlockMetaData, - ) -> Result<(Arc, HandleCreationStatus)> { - if let Some(handle) = self.load_handle(block_id)? { - return Ok((handle, HandleCreationStatus::Fetched)); - } + ) -> (BlockHandle, HandleCreationStatus) { + use dashmap::mapref::entry::Entry; - if let Some(handle) = self.create_handle(*block_id, BlockMeta::with_data(meta_data))? { - return Ok((handle, HandleCreationStatus::Created)); - } + let block_handles = &self.db.block_handles; - if let Some(handle) = self.load_handle(block_id)? { - return Ok((handle, HandleCreationStatus::Fetched)); + // Fast path - lookup in cache + if let Some(weak) = self.cache.get(block_id) { + if let Some(handle) = weak.upgrade() { + return (handle, HandleCreationStatus::Fetched); + } } - Err(BlockHandleStorageError::FailedToCreateBlockHandle.into()) - } + match block_handles.get(block_id.root_hash.as_slice()).unwrap() { + // Try to load block handle from an existing data + Some(data) => { + let meta = BlockMeta::from_slice(data.as_ref()); - pub fn load_handle(&self, block_id: &BlockId) -> Result>> { - Ok(loop { - if let Some(weak) = self.cache.get(block_id) { - if let Some(handle) = weak.upgrade() { - break Some(handle); - } + // Fill the cache with a new handle + let handle = self.fill_cache(block_id, meta); + + // Done + (handle, HandleCreationStatus::Fetched) + } + None => { + // Create a new handle + let handle = BlockHandle::new( + block_id, + BlockMeta::with_data(meta_data), + self.cache.clone(), + ); + + // Fill the cache with the new handle + match self.cache.entry(*block_id) { + Entry::Vacant(entry) => { + entry.insert(handle.downgrade()); + } + Entry::Occupied(mut entry) => match entry.get().upgrade() { + // Another thread has created the handle + Some(handle) => return (handle, HandleCreationStatus::Fetched), + None => { + entry.insert(handle.downgrade()); + } + }, + }; + + // Store the handle in the storage + self.store_handle(&handle); + + // Done + (handle, HandleCreationStatus::Created) } + } + } - if let Some(meta) = self.db.block_handles.get(block_id.root_hash.as_slice())? { - let meta = BlockMeta::from_slice(meta.as_ref())?; - if let Some(handle) = self.create_handle(*block_id, meta)? { - break Some(handle); - } - } else { - break None; + pub fn load_handle(&self, block_id: &BlockId) -> Option { + let block_handles = &self.db.block_handles; + + // Fast path - lookup in cache + if let Some(weak) = self.cache.get(block_id) { + if let Some(handle) = weak.upgrade() { + return Some(handle); } - }) + } + + // Load meta from storage + let meta = match block_handles.get(block_id.root_hash.as_slice()).unwrap() { + Some(data) => BlockMeta::from_slice(data.as_ref()), + None => return None, + }; + + // Fill the cache with a new handle + Some(self.fill_cache(block_id, meta)) } - pub fn store_handle(&self, handle: &BlockHandle) -> Result<()> { + pub fn store_handle(&self, handle: &BlockHandle) { let id = handle.id(); self.db .block_handles - .insert(id.root_hash.as_slice(), handle.meta().to_vec())?; + .insert(id.root_hash.as_slice(), handle.meta().to_vec()) + .unwrap(); if handle.is_key_block() { self.db .key_blocks - .insert(id.seqno.to_be_bytes(), id.to_vec())?; + .insert(id.seqno.to_be_bytes(), id.to_vec()) + .unwrap(); } - - Ok(()) } - pub fn load_key_block_handle(&self, seqno: u32) -> Result> { - let key_block_id = self - .db - .key_blocks - .get(seqno.to_be_bytes())? - .map(|value| BlockId::from_slice(value.as_ref())) - .transpose()? - .ok_or(BlockHandleStorageError::KeyBlockNotFound)?; - - self.load_handle(&key_block_id)?.ok_or_else(|| { - BlockHandleStorageError::KeyBlockHandleNotFound(key_block_id.seqno).into() - }) + pub fn load_key_block_handle(&self, seqno: u32) -> Option { + let key_blocks = &self.db.key_blocks; + let key_block_id = match key_blocks.get(seqno.to_be_bytes()).unwrap() { + Some(data) => BlockId::from_slice(data.as_ref()), + None => return None, + }; + self.load_handle(&key_block_id) } - pub fn find_last_key_block(&self) -> Result> { + pub fn find_last_key_block(&self) -> Option { let mut iter = self.db.key_blocks.raw_iterator(); iter.seek_to_last(); - // Load key block from current iterator value - let key_block_id = iter - .value() - .map(BlockId::from_slice) - .transpose()? - .ok_or(BlockHandleStorageError::KeyBlockNotFound)?; - - self.load_handle(&key_block_id)?.ok_or_else(|| { - BlockHandleStorageError::KeyBlockHandleNotFound(key_block_id.seqno).into() - }) + // Load key block from the current iterator value + let key_block_id = BlockId::from_slice(iter.value()?); + self.load_handle(&key_block_id) } - pub fn find_prev_key_block(&self, seqno: u32) -> Result>> { + pub fn find_prev_key_block(&self, seqno: u32) -> Option { if seqno == 0 { - return Ok(None); + return None; } // Create iterator and move it to the previous key block before the specified @@ -127,20 +152,13 @@ impl BlockHandleStorage { iter.seek_for_prev((seqno - 1u32).to_be_bytes()); // Load key block from current iterator value - iter.value() - .map(BlockId::from_slice) - .transpose()? - .map(|key_block_id| { - self.load_handle(&key_block_id)?.ok_or_else(|| { - BlockHandleStorageError::KeyBlockHandleNotFound(key_block_id.seqno).into() - }) - }) - .transpose() + let key_block_id = BlockId::from_slice(iter.value()?); + self.load_handle(&key_block_id) } - pub fn find_prev_persistent_key_block(&self, seqno: u32) -> Result>> { + pub fn find_prev_persistent_key_block(&self, seqno: u32) -> Option { if seqno == 0 { - return Ok(None); + return None; } // Create iterator and move it to the previous key block before the specified @@ -148,51 +166,43 @@ impl BlockHandleStorage { iter.seek_for_prev((seqno - 1u32).to_be_bytes()); // Loads key block from current iterator value and moves it backward - let mut get_key_block = move || -> Result>> { + let mut get_key_block = move || -> Option { // Load key block id - let key_block_id = match iter.value().map(BlockId::from_slice).transpose()? { - Some(prev_key_block) => prev_key_block, - None => return Ok(None), - }; + let key_block_id = BlockId::from_slice(iter.value()?); // Load block handle for this id - let handle = self.load_handle(&key_block_id)?.ok_or( - BlockHandleStorageError::KeyBlockHandleNotFound(key_block_id.seqno), - )?; + let handle = self.load_handle(&key_block_id)?; // Move iterator backward iter.prev(); // Done - Ok(Some(handle)) + Some(handle) }; // Load previous key block - let mut key_block = match get_key_block()? { - Some(id) => id, - None => return Ok(None), - }; + let mut key_block = get_key_block()?; // Load previous key blocks and check if the `key_block` is for persistent state - while let Some(prev_key_block) = get_key_block()? { + while let Some(prev_key_block) = get_key_block() { if is_persistent_state( key_block.meta().gen_utime(), prev_key_block.meta().gen_utime(), ) { // Found - return Ok(Some(key_block)); + return Some(key_block); } key_block = prev_key_block; } // Not found - Ok(None) + None } pub fn key_blocks_iterator( &self, direction: KeyBlocksDirection, - ) -> impl Iterator> + '_ { + ) -> impl Iterator + '_ { let mut raw_iterator = self.db.key_blocks.raw_iterator(); let reverse = match direction { KeyBlocksDirection::ForwardFrom(seqno) => { @@ -240,25 +250,24 @@ impl BlockHandleStorage { total_removed } - fn create_handle( - &self, - block_id: BlockId, - meta: BlockMeta, - ) -> Result>> { + fn fill_cache(&self, block_id: &BlockId, meta: BlockMeta) -> BlockHandle { use dashmap::mapref::entry::Entry; - let handle = match self.cache.entry(block_id) { + match self.cache.entry(*block_id) { Entry::Vacant(entry) => { - let handle = Arc::new(BlockHandle::new(block_id, meta, self.cache.clone())); - entry.insert(Arc::downgrade(&handle)); + let handle = BlockHandle::new(block_id, meta, self.cache.clone()); + entry.insert(handle.downgrade()); handle } - Entry::Occupied(_) => return Ok(None), - }; - - self.store_handle(&handle)?; - - Ok(Some(handle)) + Entry::Occupied(mut entry) => match entry.get().upgrade() { + Some(handle) => handle, + None => { + let handle = BlockHandle::new(block_id, meta, self.cache.clone()); + entry.insert(handle.downgrade()); + handle + } + }, + } } } @@ -280,7 +289,7 @@ struct KeyBlocksIterator<'a> { } impl Iterator for KeyBlocksIterator<'_> { - type Item = Result; + type Item = BlockId; fn next(&mut self) -> Option { let value = self.raw_iterator.value().map(BlockId::from_slice)?; @@ -289,17 +298,6 @@ impl Iterator for KeyBlocksIterator<'_> { } else { self.raw_iterator.next(); } - Some(value) } } - -#[derive(thiserror::Error, Debug)] -enum BlockHandleStorageError { - #[error("Failed to create block handle")] - FailedToCreateBlockHandle, - #[error("Key block not found")] - KeyBlockNotFound, - #[error("Key block handle not found: {}", .0)] - KeyBlockHandleNotFound(u32), -} diff --git a/storage/src/store/node_state/mod.rs b/storage/src/store/node_state/mod.rs index e1a78cdcc..5abab6622 100644 --- a/storage/src/store/node_state/mod.rs +++ b/storage/src/store/node_state/mod.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use anyhow::Result; use everscale_types::models::*; use parking_lot::Mutex; @@ -24,107 +23,94 @@ impl NodeStateStorage { } } - pub fn store_historical_sync_start(&self, id: &BlockId) -> Result<()> { + pub fn store_historical_sync_start(&self, id: &BlockId) { let node_states = &self.db.node_states; - node_states.insert(HISTORICAL_SYNC_LOW, id.to_vec())?; - Ok(()) + node_states + .insert(HISTORICAL_SYNC_LOW, id.to_vec()) + .unwrap() } - pub fn load_historical_sync_start(&self) -> Result> { - Ok(match self.db.node_states.get(HISTORICAL_SYNC_LOW)? { - Some(data) => Some(BlockId::from_slice(data.as_ref())?), + pub fn load_historical_sync_start(&self) -> Option { + match self.db.node_states.get(HISTORICAL_SYNC_LOW).unwrap() { + Some(data) => Some(BlockId::from_slice(data.as_ref())), None => None, - }) + } } - pub fn store_historical_sync_end(&self, id: &BlockId) -> Result<()> { + pub fn store_historical_sync_end(&self, id: &BlockId) { let node_states = &self.db.node_states; - node_states.insert(HISTORICAL_SYNC_HIGH, id.to_vec())?; - Ok(()) + node_states + .insert(HISTORICAL_SYNC_HIGH, id.to_vec()) + .unwrap(); } - pub fn load_historical_sync_end(&self) -> Result { + pub fn load_historical_sync_end(&self) -> Option { let node_states = &self.db.node_states; - let data = node_states - .get(HISTORICAL_SYNC_HIGH)? - .ok_or(NodeStateStorageError::HighBlockNotFound)?; - BlockId::from_slice(data.as_ref()) + let data = node_states.get(HISTORICAL_SYNC_HIGH).unwrap()?; + Some(BlockId::from_slice(data.as_ref())) } - #[allow(unused)] - pub fn store_last_uploaded_archive(&self, archive_id: u32) -> Result<()> { + pub fn store_last_uploaded_archive(&self, archive_id: u32) { let node_states = &self.db.node_states; - node_states.insert(LAST_UPLOADED_ARCHIVE, archive_id.to_le_bytes())?; - Ok(()) + node_states + .insert(LAST_UPLOADED_ARCHIVE, archive_id.to_le_bytes()) + .unwrap(); } - #[allow(unused)] - pub fn load_last_uploaded_archive(&self) -> Result> { - Ok(match self.db.node_states.get(LAST_UPLOADED_ARCHIVE)? { + pub fn load_last_uploaded_archive(&self) -> Option { + match self.db.node_states.get(LAST_UPLOADED_ARCHIVE).unwrap() { Some(data) if data.len() >= 4 => { Some(u32::from_le_bytes(data[..4].try_into().unwrap())) } _ => None, - }) + } } - pub fn store_last_mc_block_id(&self, id: &BlockId) -> Result<()> { + pub fn store_last_mc_block_id(&self, id: &BlockId) { self.store_block_id(&self.last_mc_block_id, id) } - pub fn load_last_mc_block_id(&self) -> Result { + pub fn load_last_mc_block_id(&self) -> Option { self.load_block_id(&self.last_mc_block_id) } - pub fn store_init_mc_block_id(&self, id: &BlockId) -> Result<()> { + pub fn store_init_mc_block_id(&self, id: &BlockId) { self.store_block_id(&self.init_mc_block_id, id) } - pub fn load_init_mc_block_id(&self) -> Result { + pub fn load_init_mc_block_id(&self) -> Option { self.load_block_id(&self.init_mc_block_id) } - pub fn store_shards_client_mc_block_id(&self, id: &BlockId) -> Result<()> { + pub fn store_shards_client_mc_block_id(&self, id: &BlockId) { self.store_block_id(&self.shards_client_mc_block_id, id) } - pub fn load_shards_client_mc_block_id(&self) -> Result { + pub fn load_shards_client_mc_block_id(&self) -> Option { self.load_block_id(&self.shards_client_mc_block_id) } #[inline(always)] - fn store_block_id(&self, (cache, key): &BlockIdCache, block_id: &BlockId) -> Result<()> { + fn store_block_id(&self, (cache, key): &BlockIdCache, block_id: &BlockId) { let node_states = &self.db.node_states; - node_states.insert(key, write_block_id_le(block_id))?; + node_states + .insert(key, write_block_id_le(block_id)) + .unwrap(); *cache.lock() = Some(*block_id); - Ok(()) } #[inline(always)] - fn load_block_id(&self, (cache, key): &BlockIdCache) -> Result { + fn load_block_id(&self, (cache, key): &BlockIdCache) -> Option { if let Some(cached) = &*cache.lock() { - return Ok(*cached); + return Some(*cached); } - let value = match self.db.node_states.get(key)? { - Some(data) => read_block_id_le(&data).ok_or(NodeStateStorageError::InvalidBlockId)?, - None => return Err(NodeStateStorageError::StateNotFound.into()), - }; + let value = read_block_id_le(&self.db.node_states.get(key).unwrap()?); *cache.lock() = Some(value); - Ok(value) + Some(value) } } -#[derive(thiserror::Error, Debug)] -pub enum NodeStateStorageError { - #[error("High block not found")] - HighBlockNotFound, - #[error("State not found")] - StateNotFound, - #[error("Invalid block id")] - InvalidBlockId, -} - type BlockIdCache = (Mutex>, &'static [u8]); const HISTORICAL_SYNC_LOW: &[u8] = b"background_sync_low"; diff --git a/storage/src/store/persistent_state/mod.rs b/storage/src/store/persistent_state/mod.rs index da6858b2c..81f8aa7da 100644 --- a/storage/src/store/persistent_state/mod.rs +++ b/storage/src/store/persistent_state/mod.rs @@ -3,7 +3,7 @@ use std::path::PathBuf; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use anyhow::Result; +use anyhow::{Context, Result}; use bytes::{Bytes, BytesMut}; use everscale_types::cell::HashBytes; use everscale_types::models::BlockId; @@ -166,12 +166,15 @@ impl PersistentStateStorage { // Keep 2 days of states + 1 state before let block = { let now = tycho_util::time::now_sec(); - let mut key_block = self.block_handle_storage.find_last_key_block()?; + let mut key_block = self + .block_handle_storage + .find_last_key_block() + .context("no key blocks found")?; loop { match self .block_handle_storage - .find_prev_persistent_key_block(key_block.id().seqno)? + .find_prev_persistent_key_block(key_block.id().seqno) { Some(prev_key_block) => { if prev_key_block.meta().gen_utime() + 2 * KEY_BLOCK_UTIME_STEP < now { diff --git a/storage/src/store/runtime/persistent_state_keeper.rs b/storage/src/store/runtime/persistent_state_keeper.rs index 453c8b0ac..da820106a 100644 --- a/storage/src/store/runtime/persistent_state_keeper.rs +++ b/storage/src/store/runtime/persistent_state_keeper.rs @@ -2,7 +2,7 @@ use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::sync::Arc; use anyhow::Result; -use arc_swap::ArcSwapOption; +use arc_swap::ArcSwapAny; use tokio::sync::Notify; use tycho_block_util::state::*; @@ -14,7 +14,7 @@ pub struct PersistentStateKeeper { block_handle_storage: Arc, initialized: AtomicBool, persistent_state_changed: Notify, - current_persistent_state: ArcSwapOption, + current_persistent_state: ArcSwapAny>, last_utime: AtomicU32, } @@ -29,13 +29,11 @@ impl PersistentStateKeeper { } } - pub fn update(&self, block_handle: &Arc) -> Result<()> { - println!("UPDATE"); - + pub fn update(&self, block_handle: &BlockHandle) -> Result<()> { if !self.initialized.load(Ordering::Acquire) { let prev_persistent_key_block = self .block_handle_storage - .find_prev_persistent_key_block(block_handle.id().seqno)?; + .find_prev_persistent_key_block(block_handle.id().seqno); if let Some(handle) = &prev_persistent_key_block { self.last_utime @@ -74,7 +72,7 @@ impl PersistentStateKeeper { self.last_utime.load(Ordering::Acquire) } - pub fn current(&self) -> Option> { + pub fn current(&self) -> Option { self.current_persistent_state.load_full() } diff --git a/storage/src/store/shard_state/mod.rs b/storage/src/store/shard_state/mod.rs index b7c03a7cd..f1ce93d23 100644 --- a/storage/src/store/shard_state/mod.rs +++ b/storage/src/store/shard_state/mod.rs @@ -133,7 +133,7 @@ impl ShardStateStorage { self.db.raw().write(batch)?; Ok(if handle.meta().set_has_state() { - self.block_handle_storage.store_handle(handle)?; + self.block_handle_storage.store_handle(handle); true } else { false @@ -204,7 +204,7 @@ impl ShardStateStorage { }, }; - let block_id = BlockIdShort::deserialize(&mut std::convert::identity(key))?; + let block_id = BlockIdShort::from_slice(key); let root_hash = HashBytes::wrap(value.try_into().expect("invalid value")); // Skip blocks from zero state and top blocks @@ -270,7 +270,7 @@ impl ShardStateStorage { }; // Find block handle - let handle = match self.block_handle_storage.load_handle(&mc_block_id)? { + let handle = match self.block_handle_storage.load_handle(&mc_block_id) { Some(handle) if handle.meta().has_data() => handle, // Skip blocks without handle or data _ => return Ok(None), @@ -292,11 +292,7 @@ impl ShardStateStorage { }; // Find block handle - let min_ref_block_handle = match self - .block_handle_storage - .load_handle(&min_ref_block_id) - .context("Failed to find min ref mc block handle")? - { + let min_ref_block_handle = match self.block_handle_storage.load_handle(&min_ref_block_id) { Some(handle) if handle.meta().has_data() => handle, // Skip blocks without handle or data _ => return Ok(None), diff --git a/storage/src/store/shard_state/replace_transaction.rs b/storage/src/store/shard_state/replace_transaction.rs index 8db83a14b..3e6aa290f 100644 --- a/storage/src/store/shard_state/replace_transaction.rs +++ b/storage/src/store/shard_state/replace_transaction.rs @@ -119,7 +119,7 @@ impl<'a> ShardStateReplaceTransaction<'a> { mut self, block_id: BlockId, progress_bar: &mut ProgressBar, - ) -> Result> { + ) -> Result { // 2^7 bits + 1 bytes const MAX_DATA_SIZE: usize = 128; const CELLS_PER_BATCH: u64 = 1_000_000; @@ -231,11 +231,11 @@ impl<'a> ShardStateReplaceTransaction<'a> { let cell_id = HashBytes::from_slice(&root[..32]); let cell = self.cell_storage.load_cell(cell_id)?; - Ok(Arc::new(ShardStateStuff::new( + Ok(ShardStateStuff::new( block_id, Cell::from(cell as Arc<_>), self.min_ref_mc_state, - )?)) + )?) } None => Err(ReplaceTransactionError::NotFound.into()), } diff --git a/storage/src/util/stored_value.rs b/storage/src/util/stored_value.rs index c37374412..a4da922f2 100644 --- a/storage/src/util/stored_value.rs +++ b/storage/src/util/stored_value.rs @@ -1,7 +1,6 @@ use bytes::Buf; use smallvec::SmallVec; -use anyhow::Result; use everscale_types::cell::HashBytes; use everscale_types::models::{BlockId, BlockIdShort, ShardIdent}; @@ -22,7 +21,7 @@ pub trait StoredValue { /// moved to the end of the deserialized data. /// /// NOTE: `reader` should not be used after this call in case of an error - fn deserialize(reader: &mut &[u8]) -> Result + fn deserialize(reader: &mut &[u8]) -> Self where Self: Sized; @@ -30,7 +29,7 @@ pub trait StoredValue { /// /// [`StoredValue::deserialize`] #[inline(always)] - fn from_slice(mut data: &[u8]) -> Result + fn from_slice(mut data: &[u8]) -> Self where Self: Sized, { @@ -95,13 +94,13 @@ impl StoredValue for BlockId { buffer.write_raw_slice(self.file_hash.as_slice()); } - fn deserialize(reader: &mut &[u8]) -> Result + fn deserialize(reader: &mut &[u8]) -> Self where Self: Sized, { debug_assert!(reader.remaining() >= Self::SIZE_HINT); - let shard = ShardIdent::deserialize(reader)?; + let shard = ShardIdent::deserialize(reader); let seqno = reader.get_u32(); let mut root_hash = HashBytes::default(); @@ -109,12 +108,12 @@ impl StoredValue for BlockId { let mut file_hash = HashBytes::default(); file_hash.0.copy_from_slice(&reader[32..]); - Ok(Self { + Self { shard, seqno, root_hash, file_hash, - }) + } } } @@ -131,7 +130,7 @@ impl StoredValue for ShardIdent { buffer.write_raw_slice(&self.prefix().to_be_bytes()); } - fn deserialize(reader: &mut &[u8]) -> Result + fn deserialize(reader: &mut &[u8]) -> Self where Self: Sized, { @@ -139,7 +138,7 @@ impl StoredValue for ShardIdent { let workchain = reader.get_u32() as i32; let prefix = reader.get_u64(); - Ok(unsafe { Self::new_unchecked(workchain, prefix) }) + unsafe { Self::new_unchecked(workchain, prefix) } } } @@ -156,15 +155,15 @@ impl StoredValue for BlockIdShort { buffer.write_raw_slice(&self.seqno.to_be_bytes()); } - fn deserialize(reader: &mut &[u8]) -> Result + fn deserialize(reader: &mut &[u8]) -> Self where Self: Sized, { debug_assert!(reader.remaining() >= BlockIdShort::SIZE_HINT); - let shard = ShardIdent::deserialize(reader)?; + let shard = ShardIdent::deserialize(reader); let seqno = reader.get_u32(); - Ok(Self { shard, seqno }) + Self { shard, seqno } } } @@ -180,10 +179,8 @@ pub fn write_block_id_le(block_id: &BlockId) -> [u8; 80] { } /// Reads `BlockId` in little-endian format -pub fn read_block_id_le(data: &[u8]) -> Option { - if data.len() < 80 { - return None; - } +pub fn read_block_id_le(data: &[u8]) -> BlockId { + assert!(data.len() >= 80); let mut workchain = [0; 4]; workchain.copy_from_slice(&data[0..4]); @@ -205,12 +202,12 @@ pub fn read_block_id_le(data: &[u8]) -> Option { let shard = unsafe { ShardIdent::new_unchecked(workchain, shard) }; - Some(BlockId { + BlockId { shard, seqno, root_hash: root_hash.into(), file_hash: file_hash.into(), - }) + } } #[cfg(test)] @@ -241,6 +238,6 @@ mod tests { let serialized = write_block_id_le(&block_id); assert_eq!(serialized, SERIALIZED); - assert_eq!(read_block_id_le(&serialized).unwrap(), block_id); + assert_eq!(read_block_id_le(&serialized), block_id); } } diff --git a/storage/tests/mod.rs b/storage/tests/mod.rs index 06ccfc2c8..b945e216a 100644 --- a/storage/tests/mod.rs +++ b/storage/tests/mod.rs @@ -73,7 +73,7 @@ async fn persistent_storage_everscale() -> Result<()> { root_path.join("file_storage"), db_options.cells_cache_size.as_u64(), )?; - assert!(storage.node_state().load_init_mc_block_id().is_err()); + assert!(storage.node_state().load_init_mc_block_id().is_none()); // Read zerostate let zero_state_raw = ShardStateCombined::from_file("tests/everscale_zerostate.boc")?; @@ -85,7 +85,7 @@ async fn persistent_storage_everscale() -> Result<()> { let (handle, _) = storage.block_handle_storage().create_or_load_handle( &block_id, BlockMetaData::zero_state(zero_state_raw.gen_utime().unwrap()), - )?; + ); let zerostate = ShardStateStuff::new( block_id,