Skip to content

Commit

Permalink
refactor(core): simplify storage impl
Browse files Browse the repository at this point in the history
  • Loading branch information
Rexagon committed Apr 29, 2024
1 parent 72e8cb1 commit 5e5678f
Show file tree
Hide file tree
Showing 18 changed files with 327 additions and 342 deletions.
2 changes: 1 addition & 1 deletion collator/src/state_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl StateNodeAdapter for StateNodeAdapterStdImpl {
//TODO: make real implementation
//STUB: create dummy blcok handle
let handle = BlockHandle::new(
block.block_id,
&block.block_id,
Default::default(),
Arc::new(Default::default()),
);
Expand Down
11 changes: 4 additions & 7 deletions core/src/block_strider/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use tycho_storage::Storage;

pub trait BlockStriderState: Send + Sync + 'static {
fn load_last_traversed_master_block_id(&self) -> BlockId;

fn is_traversed(&self, block_id: &BlockId) -> bool;

fn commit_traversed(&self, block_id: &BlockId);
}

Expand All @@ -18,17 +20,12 @@ impl BlockStriderState for Arc<Storage> {
}

fn is_traversed(&self, block_id: &BlockId) -> bool {
self.block_handle_storage()
.load_handle(block_id)
.expect("db is dead")
.is_some()
self.block_handle_storage().load_handle(block_id).is_some()
}

fn commit_traversed(&self, block_id: &BlockId) {
if block_id.is_masterchain() {
self.node_state()
.store_last_mc_block_id(block_id)
.expect("db is dead");
self.node_state().store_last_mc_block_id(block_id);
}
// other blocks are stored with state applier: todo rework this?
}
Expand Down
22 changes: 7 additions & 15 deletions core/src/block_strider/state_applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ where
tracing::info!(id = ?cx.block.id(), "applying block");

let state_storage = self.inner.storage.shard_state_storage();
let handle_storage = self.inner.storage.block_handle_storage();

// Load handle
let handle = self
Expand Down Expand Up @@ -130,9 +131,7 @@ where
metrics::histogram!("tycho_subscriber_handle_block_seconds").record(started_at.elapsed());

// Mark block as applied
handle.meta().set_is_applied();
let handle_storage = self.inner.storage.block_handle_storage();
handle_storage.store_handle(&handle)?;
handle_storage.store_block_applied(&handle);

// Done
Ok(())
Expand All @@ -143,7 +142,7 @@ where
mc_block_id: &BlockId,
block: &BlockStuff,
archive_data: &ArchiveData,
) -> Result<Arc<BlockHandle>> {
) -> Result<BlockHandle> {
let block_storage = self.inner.storage.block_storage();

let info = block.load_info()?;
Expand Down Expand Up @@ -259,11 +258,7 @@ pub mod test {
.unwrap();

for block in &blocks {
let handle = storage
.block_handle_storage()
.load_handle(block)
.unwrap()
.unwrap();
let handle = storage.block_handle_storage().load_handle(block).unwrap();
assert!(handle.meta().is_applied());
storage
.shard_state_storage()
Expand Down Expand Up @@ -306,7 +301,7 @@ pub mod test {
let (handle, _) = storage.block_handle_storage().create_or_load_handle(
&block_id,
BlockMetaData::zero_state(master.state().gen_utime),
)?;
);

storage
.shard_state_storage()
Expand All @@ -331,16 +326,13 @@ pub mod test {
let (handle, _) = storage.block_handle_storage().create_or_load_handle(
&shard_id,
BlockMetaData::zero_state(shard.state().gen_utime),
)?;
);
storage
.shard_state_storage()
.store_state(&handle, &shard)
.await?;

storage
.node_state()
.store_last_mc_block_id(&master_id)
.unwrap();
storage.node_state().store_last_mc_block_id(&master_id);
Ok((provider, storage))
}
}
22 changes: 14 additions & 8 deletions core/src/blockchain_rpc/service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use anyhow::Context;
use bytes::Buf;
use serde::{Deserialize, Serialize};
use tycho_network::{Response, Service, ServiceRequest};
Expand Down Expand Up @@ -196,7 +197,7 @@ impl Inner {
.key_blocks_iterator(KeyBlocksDirection::ForwardFrom(req.block_id.seqno))
.take(limit + 1);

if let Some(id) = iterator.next().transpose()? {
if let Some(id) = iterator.next() {
anyhow::ensure!(
id.root_hash == req.block_id.root_hash,
"first block root hash mismatch"
Expand All @@ -208,7 +209,7 @@ impl Inner {
}

let mut ids = Vec::with_capacity(limit);
while let Some(id) = iterator.next().transpose()? {
while let Some(id) = iterator.next() {
ids.push(id);
if ids.len() >= limit {
break;
Expand Down Expand Up @@ -239,7 +240,7 @@ impl Inner {

let get_block_full = async {
let mut is_link = false;
let block = match block_handle_storage.load_handle(&req.block_id)? {
let block = match block_handle_storage.load_handle(&req.block_id) {
Some(handle)
if handle.meta().has_data() && handle.has_proof_or_link(&mut is_link) =>
{
Expand Down Expand Up @@ -277,14 +278,15 @@ impl Inner {
let block_storage = self.storage().block_storage();

let get_next_block_full = async {
let next_block_id = match block_handle_storage.load_handle(&req.prev_block_id)? {
let next_block_id = match block_handle_storage.load_handle(&req.prev_block_id) {
Some(handle) if handle.meta().has_next1() => block_connection_storage
.load_connection(&req.prev_block_id, BlockConnection::Next1)?,
.load_connection(&req.prev_block_id, BlockConnection::Next1)
.context("connection not found")?,
_ => return Ok(BlockFull::Empty),
};

let mut is_link = false;
let block = match block_handle_storage.load_handle(&next_block_id)? {
let block = match block_handle_storage.load_handle(&next_block_id) {
Some(handle)
if handle.meta().has_data() && handle.has_proof_or_link(&mut is_link) =>
{
Expand Down Expand Up @@ -356,8 +358,12 @@ impl Inner {
let node_state = self.storage.node_state();

let get_archive_id = || {
let last_applied_mc_block = node_state.load_last_mc_block_id()?;
let shards_client_mc_block_id = node_state.load_shards_client_mc_block_id()?;
let last_applied_mc_block = node_state
.load_last_mc_block_id()
.context("last mc block not found")?;
let shards_client_mc_block_id = node_state
.load_shards_client_mc_block_id()
.context("shard client mc block not found")?;
Ok::<_, anyhow::Error>((last_applied_mc_block, shards_client_mc_block_id))
};

Expand Down
4 changes: 2 additions & 2 deletions core/tests/common/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub(crate) async fn init_empty_storage() -> Result<(Arc<Storage>, TempDir)> {
root_path.join("file_storage"),
db_options.cells_cache_size.as_u64(),
)?;
assert!(storage.node_state().load_init_mc_block_id().is_err());
assert!(storage.node_state().load_init_mc_block_id().is_none());

Ok((storage, tmp_dir))
}
Expand Down Expand Up @@ -68,7 +68,7 @@ pub(crate) async fn init_storage() -> Result<(Arc<Storage>, TempDir)> {

let handle = storage
.block_handle_storage()
.load_handle(&block_id)?
.load_handle(&block_id)
.unwrap();

assert_eq!(handle.id(), block_stuff.id());
Expand Down
60 changes: 50 additions & 10 deletions storage/src/models/block_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,22 @@ use tokio::sync::RwLock;
use super::BlockMeta;
use tycho_util::FastDashMap;

#[derive(Clone)]
#[repr(transparent)]
pub struct WeakBlockHandle {
inner: Weak<Inner>,
}

impl WeakBlockHandle {
pub fn strong_count(&self) -> usize {
self.inner.strong_count()
}

pub fn upgrade(&self) -> Option<BlockHandle> {
self.inner.upgrade().map(|inner| BlockHandle { inner })
}
}

#[derive(Clone)]
#[repr(transparent)]
pub struct BlockHandle {
Expand All @@ -14,13 +30,13 @@ pub struct BlockHandle {

impl BlockHandle {
pub fn new(
id: BlockId,
id: &BlockId,
meta: BlockMeta,
cache: Arc<FastDashMap<BlockId, Weak<BlockHandle>>>,
cache: Arc<FastDashMap<BlockId, WeakBlockHandle>>,
) -> Self {
Self {
inner: Arc::new(Inner {
id,
id: *id,
meta,
block_data_lock: Default::default(),
proof_data_block: Default::default(),
Expand Down Expand Up @@ -70,20 +86,44 @@ impl BlockHandle {
self.inner.meta.masterchain_ref_seqno()
}
}

pub fn downgrade(&self) -> WeakBlockHandle {
WeakBlockHandle {
inner: Arc::downgrade(&self.inner),
}
}
}

impl Drop for BlockHandle {
fn drop(&mut self) {
self.inner
.cache
.remove_if(&self.inner.id, |_, weak| weak.strong_count() == 0);
unsafe impl arc_swap::RefCnt for BlockHandle {
type Base = Inner;

fn into_ptr(me: Self) -> *mut Self::Base {
arc_swap::RefCnt::into_ptr(me.inner)
}

fn as_ptr(me: &Self) -> *mut Self::Base {
arc_swap::RefCnt::as_ptr(&me.inner)
}

unsafe fn from_ptr(ptr: *const Self::Base) -> Self {
Self {
inner: arc_swap::RefCnt::from_ptr(ptr),
}
}
}

struct Inner {
#[doc(hidden)]
pub struct Inner {
id: BlockId,
meta: BlockMeta,
block_data_lock: RwLock<()>,
proof_data_block: RwLock<()>,
cache: Arc<FastDashMap<BlockId, Weak<BlockHandle>>>,
cache: Arc<FastDashMap<BlockId, WeakBlockHandle>>,
}

impl Drop for Inner {
fn drop(&mut self) {
self.cache
.remove_if(&self.id, |_, weak| weak.strong_count() == 0);
}
}
7 changes: 3 additions & 4 deletions storage/src/models/block_meta.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::sync::atomic::{AtomicU64, Ordering};

use anyhow::Result;
use bytes::Buf;
use everscale_types::models::BlockInfo;

Expand Down Expand Up @@ -205,17 +204,17 @@ impl StoredValue for BlockMeta {
buffer.write_raw_slice(&self.gen_utime.to_le_bytes());
}

fn deserialize(reader: &mut &[u8]) -> Result<Self>
fn deserialize(reader: &mut &[u8]) -> Self
where
Self: Sized,
{
let flags = reader.get_u64_le();
let gen_utime = reader.get_u32_le();

Ok(Self {
Self {
flags: AtomicU64::new(flags),
gen_utime,
})
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion storage/src/models/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pub use block_handle::BlockHandle;
pub use block_handle::{BlockHandle, WeakBlockHandle};
pub use block_meta::{BlockMeta, BlockMetaData, BriefBlockMeta};

mod block_handle;
Expand Down
Loading

0 comments on commit 5e5678f

Please sign in to comment.