Skip to content

Commit

Permalink
feat: reduce blocks lock duration
Browse files Browse the repository at this point in the history
  • Loading branch information
Rexagon committed Jun 11, 2024
1 parent a3cf4fb commit 53ca964
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 41 deletions.
65 changes: 28 additions & 37 deletions collator/src/state_node.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use std::collections::{BTreeMap, HashMap};
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Instant;

use anyhow::{Context, Result};
use async_trait::async_trait;
use everscale_types::models::{BlockId, BlockIdShort, ShardIdent};
use tokio::sync::{broadcast, Mutex};
use tokio::sync::broadcast;
use tycho_block_util::block::{BlockStuff, BlockStuffAug};
use tycho_block_util::state::ShardStateStuff;
use tycho_storage::{BlockHandle, Storage};
use tycho_util::metrics::HistogramGuard;
use tycho_util::FastDashMap;

use crate::tracing_targets;
use crate::types::BlockStuffForSync;
Expand Down Expand Up @@ -66,7 +66,7 @@ pub trait StateNodeAdapter: Send + Sync + 'static {

pub struct StateNodeAdapterStdImpl {
listener: Arc<dyn StateNodeEventListener>,
blocks: Arc<Mutex<HashMap<ShardIdent, BTreeMap<u32, BlockStuffForSync>>>>,
blocks: FastDashMap<ShardIdent, BTreeMap<u32, BlockStuffForSync>>,
storage: Storage,
broadcaster: broadcast::Sender<BlockId>,
}
Expand Down Expand Up @@ -124,9 +124,9 @@ impl StateNodeAdapter for StateNodeAdapterStdImpl {

async fn accept_block(&self, block: BlockStuffForSync) -> Result<()> {
tracing::debug!(target: tracing_targets::STATE_NODE_ADAPTER, "Block accepted: {}", block.block_id.as_short_id());
let mut blocks = self.blocks.lock().await;

let block_id = block.block_id;
blocks
self.blocks
.entry(block.block_id.shard)
.or_insert_with(BTreeMap::new)
.insert(block.block_id.seqno, block);
Expand Down Expand Up @@ -160,9 +160,8 @@ impl StateNodeAdapter for StateNodeAdapterStdImpl {
let seqno = block_id.seqno;

{
let blocks_guard = self.blocks.lock().await;
if let Some(shard_blocks) = blocks_guard.get(&shard) {
let block = shard_blocks.get(&seqno);
let has_block = if let Some(shard_blocks) = self.blocks.get(&shard) {
let has_block = shard_blocks.contains_key(&seqno);

if shard.is_masterchain() {
let prev_mc_block = shard_blocks
Expand All @@ -178,38 +177,32 @@ impl StateNodeAdapter for StateNodeAdapterStdImpl {
}
}

match block {
None => {
let _histogram = HistogramGuard::begin(
"tycho_collator_adapter_on_block_accepted_ext_time",
);
has_block
} else {
false
};

tracing::info!(target: tracing_targets::STATE_NODE_ADAPTER, "Block handled external: {:?}", block_id);
self.listener.on_block_accepted_external(state).await?;
}
Some(block) => {
let _histogram =
HistogramGuard::begin("tycho_collator_adapter_on_block_accepted_time");
match has_block {
false => {
let _histogram =
HistogramGuard::begin("tycho_collator_adapter_on_block_accepted_ext_time");

tracing::info!(target: tracing_targets::STATE_NODE_ADAPTER, "Block handled: {:?}", block_id);
self.listener.on_block_accepted(&block.block_id).await?;
}
tracing::info!(target: tracing_targets::STATE_NODE_ADAPTER, "Block handled external: {:?}", block_id);
self.listener.on_block_accepted_external(state).await?;
}
} else {
let _histogram =
HistogramGuard::begin("tycho_collator_adapter_on_block_accepted_alt_ext_time");
true => {
let _histogram =
HistogramGuard::begin("tycho_collator_adapter_on_block_accepted_time");

tracing::info!(target: tracing_targets::STATE_NODE_ADAPTER, "Block handled external. Shard ID not found in blocks buffer: {:?}", block_id);
self.listener.on_block_accepted_external(state).await?;
tracing::info!(target: tracing_targets::STATE_NODE_ADAPTER, "Block handled: {:?}", block_id);
self.listener.on_block_accepted(&block_id).await?;
}
}
}

{
let mut blocks_guard = self.blocks.lock().await;
for (shard, seqno) in &to_split {
if let Some(shard_blocks) = blocks_guard.get_mut(shard) {
*shard_blocks = shard_blocks.split_off(seqno);
}
for (shard, seqno) in &to_split {
if let Some(mut shard_blocks) = self.blocks.get_mut(shard) {
*shard_blocks = shard_blocks.split_off(seqno);
}
}

Expand All @@ -224,13 +217,11 @@ impl StateNodeAdapterStdImpl {
) -> Option<Result<BlockStuffAug>> {
let mut receiver = self.broadcaster.subscribe();
loop {
let blocks = self.blocks.lock().await;
if let Some(shard_blocks) = blocks.get(&block_id.shard()) {
if let Some(shard_blocks) = self.blocks.get(&block_id.shard()) {
if let Some(block) = shard_blocks.get(&block_id.seqno()) {
return Some(Ok(block.block_stuff_aug.clone()));
}
}
drop(blocks);

loop {
match receiver.recv().await {
Expand Down
3 changes: 3 additions & 0 deletions rpc/src/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use tycho_core::block_strider::{
};
use tycho_core::blockchain_rpc::BlockchainRpcClient;
use tycho_storage::{CodeHashesIter, KeyBlocksDirection, Storage, TransactionsIterBuilder};
use tycho_util::metrics::HistogramGuard;
use tycho_util::time::now_sec;
use tycho_util::FastHashMap;

Expand Down Expand Up @@ -350,6 +351,8 @@ impl Inner {
}

async fn update(&self, block: &BlockStuff, state: Option<&ShardStateStuff>) -> Result<()> {
let _histogram = HistogramGuard::begin("tycho_rpc_state_update_time");

let is_masterchain = block.id().is_masterchain();
if is_masterchain {
self.update_mc_block_cache(block)?;
Expand Down
11 changes: 7 additions & 4 deletions scripts/gen-dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,13 @@ def core_block_strider() -> RowPanel:

def jrpc() -> RowPanel:
metrics = [
create_heatmap_panel(
"tycho_rpc_state_update_time", "Time to update RPC state on block"
),
create_heatmap_panel(
"tycho_storage_update_rpc_state_time",
"Time to update RPC storage on block",
),
create_counter_panel(
"tycho_rpc_in_req_total", "Number of incoming JRPC requests over time"
),
Expand Down Expand Up @@ -363,10 +370,6 @@ def collator_do_collate() -> RowPanel:
"tycho_collator_adapter_on_block_accepted_ext_time",
"on_block_accepted_external with blocks guard",
),
create_heatmap_panel(
"tycho_collator_adapter_on_block_accepted_alt_ext_time",
"on_block_accepted_external without blocks guard",
),
create_heatmap_panel(
"tycho_collator_adapter_on_block_accepted_time", "on_block_accepted"
),
Expand Down
3 changes: 3 additions & 0 deletions storage/src/store/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use everscale_types::prelude::*;
use metrics::atomics::AtomicU64;
use tycho_block_util::block::BlockStuff;
use tycho_block_util::state::ShardStateStuff;
use tycho_util::metrics::HistogramGuard;
use tycho_util::FastHashMap;
use weedb::{rocksdb, OwnedSnapshot};

Expand Down Expand Up @@ -530,6 +531,8 @@ impl RpcStorage {
let span = tracing::Span::current();
let db = self.db.clone();
tokio::task::spawn_blocking(move || {
let _histogram = HistogramGuard::begin("tycho_storage_update_rpc_state_time");

let _span = span.enter();

let extra = block.block().load_extra()?;
Expand Down

0 comments on commit 53ca964

Please sign in to comment.