Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/feature/empty-block-collation'
Browse files Browse the repository at this point in the history
  • Loading branch information
Rexagon committed May 1, 2024
2 parents 1a50d92 + c9f1bc8 commit 5839867
Show file tree
Hide file tree
Showing 38 changed files with 1,517 additions and 1,070 deletions.
37 changes: 19 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ tikv-jemallocator = { version = "0.5", features = [
] }
tl-proto = "0.4"
tokio = { version = "1", default-features = false }
tokio-util = { version = "0.7", features = ["codec"] }
tokio-util = { version = "0.7.10", features = ["codec"] }
tracing = "0.1"
tracing-appender = "0.2.3"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
Expand Down
1 change: 1 addition & 0 deletions collator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ sha2 = { workspace = true }
tl-proto = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt", "signal"] }
tokio-util = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }

Expand Down
10 changes: 5 additions & 5 deletions collator/src/collator/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ impl McData {
BlockRef {
end_lt,
seqno: block_id.seqno,
root_hash: block_id.root_hash.clone(),
file_hash: block_id.file_hash.clone(),
root_hash: block_id.root_hash,
file_hash: block_id.file_hash,
}
}

Expand Down Expand Up @@ -265,7 +265,7 @@ impl PrevData {
}

pub fn get_blocks_ref(&self) -> Result<PrevBlockRef> {
if self.pure_states.len() < 1 || self.pure_states.len() > 2 {
if self.pure_states.is_empty() || self.pure_states.len() > 2 {
bail!(
"There should be 1 or 2 prev states. Actual count is {}",
self.pure_states.len()
Expand All @@ -277,8 +277,8 @@ impl PrevData {
block_refs.push(BlockRef {
end_lt: state.state().gen_lt,
seqno: state.block_id().seqno,
root_hash: state.block_id().root_hash.clone(),
file_hash: state.block_id().file_hash.clone(),
root_hash: state.block_id().root_hash,
file_hash: state.block_id().file_hash,
});
}

Expand Down
2 changes: 1 addition & 1 deletion collator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ pub mod types;
mod utils;
pub mod validator;

pub use validator::test_impl as validator_test_impl;
// pub use validator::test_impl as validator_test_impl;
25 changes: 16 additions & 9 deletions collator/src/manager/collation_manager.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use async_trait::async_trait;

use everscale_types::models::{BlockId, ShardIdent};
use tycho_block_util::state::ShardStateStuff;

use tycho_core::internal_queue::iterator::QueueIteratorImpl;

use crate::validator::config::ValidatorConfig;
use crate::{
collator::{
collator_processor::CollatorProcessorStdImpl, Collator, CollatorEventListener,
Expand All @@ -24,10 +28,7 @@ use crate::{
async_queued_dispatcher::{AsyncQueuedDispatcher, STANDARD_DISPATCHER_QUEUE_BUFFER_SIZE},
schedule_async_action,
},
validator::{
validator_processor::{ValidatorProcessor, ValidatorProcessorStdImpl},
Validator, ValidatorEventListener, ValidatorStdImpl,
},
validator::{Validator, ValidatorEventListener, ValidatorStdImpl},
};

use super::collation_processor::CollationProcessor;
Expand Down Expand Up @@ -82,7 +83,7 @@ where
{
CollationManagerGenImpl::<
CollatorStdImpl<CollatorProcessorStdImpl<_, _, _>, _, _, _>,
ValidatorStdImpl<ValidatorProcessorStdImpl<_>, _>,
ValidatorStdImpl<_>,
MessageQueueAdapterStdImpl,
MP,
ST,
Expand All @@ -94,7 +95,7 @@ where
)
}
#[allow(private_bounds)]
pub fn create_std_manager_with_validator<MP, ST, V>(
pub fn create_std_manager_with_validator<MP, ST>(
config: CollationConfig,
mpool_adapter_builder: impl MempoolAdapterBuilder<MP> + Send,
state_adapter_builder: impl StateNodeAdapterBuilder<ST> + Send,
Expand All @@ -103,11 +104,10 @@ pub fn create_std_manager_with_validator<MP, ST, V>(
where
MP: MempoolAdapter,
ST: StateNodeAdapter,
V: ValidatorProcessor<ST>,
{
CollationManagerGenImpl::<
CollatorStdImpl<CollatorProcessorStdImpl<_, _, _>, _, _, _>,
ValidatorStdImpl<V, _>,
ValidatorStdImpl<_>,
MessageQueueAdapterStdImpl,
MP,
ST,
Expand Down Expand Up @@ -150,11 +150,18 @@ where
let state_node_adapter = state_adapter_builder.build(dispatcher.clone());
let state_node_adapter = Arc::new(state_node_adapter);

let validator_config = ValidatorConfig {
base_loop_delay: Duration::from_millis(50),
max_loop_delay: Duration::from_secs(10),
};

// create validator and start its tasks queue
let validator = Validator::create(
dispatcher.clone(),
vec![dispatcher.clone()],
state_node_adapter.clone(),
node_network.into(),
config.key_pair,
validator_config,
);

// create collation processor that will use these adapters
Expand Down
12 changes: 4 additions & 8 deletions collator/src/manager/collation_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ where

// notify validator, it will start overlay initialization
self.validator
.enqueue_add_session(Arc::new(new_session_info.clone().try_into()?))
.add_session(Arc::new(new_session_info.clone().try_into()?))
.await?;
} else {
tracing::info!(
Expand Down Expand Up @@ -678,13 +678,9 @@ where
candidate_id.as_short_id(),
candidate_chain_time,
);
let current_collator_keypair = self.config.key_pair;
self.validator
.enqueue_candidate_validation(
candidate_id,
session_info.seqno(),
current_collator_keypair,
)
let _handle = self
.validator
.validate(candidate_id, session_info.seqno())
.await?;

// chek if master block min interval elapsed and it needs to collate new master block
Expand Down
8 changes: 8 additions & 0 deletions collator/src/state_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ pub trait StateNodeAdapter: BlockProvider + Send + Sync + 'static {
pub struct StateNodeAdapterStdImpl {
listener: Arc<dyn StateNodeEventListener>,
blocks: Arc<Mutex<HashMap<ShardIdent, BTreeMap<u32, BlockStuffForSync>>>>,
blocks_mapping: Arc<Mutex<HashMap<BlockId, BlockId>>>,
storage: Storage,
broadcaster: broadcast::Sender<BlockId>,
}
Expand Down Expand Up @@ -95,6 +96,7 @@ impl StateNodeAdapterStdImpl {
storage,
blocks: Default::default(),
broadcaster,
blocks_mapping: Arc::new(Default::default()),
}
}

Expand Down Expand Up @@ -247,10 +249,16 @@ impl StateNodeAdapter for StateNodeAdapterStdImpl {
.last()
.ok_or(anyhow!("no prev block"))?;

self.blocks_mapping
.lock()
.await
.insert(block.block_id, prev_block_id);

blocks
.entry(block.block_id.shard)
.or_insert_with(BTreeMap::new)
.insert(prev_block_id.seqno, block);

prev_block_id
}
false => {
Expand Down
6 changes: 2 additions & 4 deletions collator/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,7 @@ pub async fn prepare_test_storage() -> anyhow::Result<(DummyArchiveProvider, Sto

// shard state
let shard_bytes = include_bytes!("../src/state_node/tests/data/test_state_2_0:80.boc");

let shard_root = Boc::decode(shard_bytes)?;

let shard_state = shard_root.parse::<ShardStateUnsplit>()?;
let shard_id = BlockId {
shard: shard_info.0,
Expand Down Expand Up @@ -153,11 +151,11 @@ impl BlockProvider for DummyArchiveProvider {
type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;

fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
fn get_next_block<'a>(&'a self, _prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
futures_util::future::ready(None).boxed()
}

fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a> {
fn get_block<'a>(&'a self, _block_id: &'a BlockId) -> Self::GetBlockFut<'a> {
futures_util::future::ready(None).boxed()
}
}
1 change: 1 addition & 0 deletions collator/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ impl ShardStateStuffExt for ShardStateStuff {
}
}

#[derive(Clone)]
pub enum OnValidatedBlockEvent {
ValidByState,
Invalid,
Expand Down
6 changes: 6 additions & 0 deletions collator/src/validator/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
use std::time::Duration;

pub struct ValidatorConfig {
pub base_loop_delay: Duration,
pub max_loop_delay: Duration,
}
3 changes: 1 addition & 2 deletions collator/src/validator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
pub(crate) use validator::*;

pub mod config;
pub mod network;
pub mod state;
pub mod test_impl;
pub mod types;
#[allow(clippy::module_inception)]
pub mod validator;
pub mod validator_processor;
Loading

0 comments on commit 5839867

Please sign in to comment.