Skip to content

Commit

Permalink
broadcast dag block in any condition (#4138)
Browse files Browse the repository at this point in the history
  • Loading branch information
jackzhhuang authored Jun 18, 2024
1 parent 715fb1a commit 8f65a0b
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 5 deletions.
19 changes: 18 additions & 1 deletion block-relayer/src/block_relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use starcoin_txpool::TxPoolService;
use starcoin_txpool_api::TxPoolSyncService;
use starcoin_types::block::ExecutedBlock;
use starcoin_types::sync_status::SyncStatus;
use starcoin_types::system_events::{NewBranch, SyncStatusChangeEvent};
use starcoin_types::system_events::{NewBranch, NewDagBlock, SyncStatusChangeEvent};
use starcoin_types::{
block::{Block, BlockBody},
compact_block::{CompactBlock, ShortId},
Expand Down Expand Up @@ -292,6 +292,23 @@ impl EventHandler<Self, NewHeadBlock> for BlockRelayer {
}
}

impl EventHandler<Self, NewDagBlock> for BlockRelayer {
fn handle_event(&mut self, event: NewDagBlock, ctx: &mut ServiceContext<BlockRelayer>) {
debug!(
"[block-relay] Handle new dag block event, block_id: {:?}",
event.executed_block.block().id()
);
let network = match ctx.get_shared::<NetworkServiceRef>() {
Ok(network) => network,
Err(e) => {
error!("Get network service error: {:?}", e);
return;
}
};
self.broadcast_compact_block(network, event.executed_block);
}
}

impl EventHandler<Self, NewBranch> for BlockRelayer {
fn handle_event(&mut self, event: NewBranch, ctx: &mut ServiceContext<BlockRelayer>) {
debug!(
Expand Down
9 changes: 8 additions & 1 deletion sync/src/block_connector/block_connector_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,14 @@ where
debug!("try connect mined block: {}", id);

match self.chain_service.try_connect(new_block.as_ref().clone()) {
std::result::Result::Ok(()) => debug!("Process mined block {} success.", id),
std::result::Result::Ok(()) => {
debug!("Process mined block {} success.", id);

match self.chain_service.broadcast_new_dag_block(id) {
std::result::Result::Ok(_) => (),
Err(e) => warn!("Process mined block {} fail, error: {:?}", id, e),
}
}
Err(e) => {
warn!("Process mined block {} fail, error: {:?}", id, e);
}
Expand Down
48 changes: 46 additions & 2 deletions sync/src/block_connector/write_block_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ use starcoin_service_registry::bus::{Bus, BusService};
use starcoin_service_registry::{ServiceContext, ServiceRef};
use starcoin_storage::Store;
use starcoin_txpool_api::TxPoolSyncService;
use starcoin_types::block::BlockInfo;
use starcoin_types::block::{BlockInfo, DagHeaderType};
use starcoin_types::system_events::NewDagBlock;
#[cfg(test)]
use starcoin_types::{account::Account, block::BlockNumber};
use starcoin_types::{
Expand Down Expand Up @@ -508,7 +509,7 @@ where
Ok(blocks)
}

fn broadcast_new_head(&self, block: ExecutedBlock) {
pub fn broadcast_new_head(&self, block: ExecutedBlock) {
if let Some(metrics) = self.metrics.as_ref() {
metrics
.chain_select_head_total
Expand All @@ -523,6 +524,49 @@ where
}
}

// In dag chain, the new block minted by the node itself may have the less total difficulty than other nodes'.
// Hence, the new block may not be the new head of the chain.
// But it still needs to be broadcasted to other nodes.
pub fn broadcast_new_dag_block(&self, block_id: HashValue) -> Result<()> {
if let Some(metrics) = self.metrics.as_ref() {
metrics
.chain_select_head_total
.with_label_values(&["new_head"])
.inc()
}

let chain = self.get_main();

// the execution process broadcast the block already
if chain.current_header().id() == block_id {
return Ok(());
}

// the single chain no need to broadcast the block, it is only for dag
if chain.check_dag_type(chain.status().head())? == DagHeaderType::Single {
return Ok(());
}

let block = chain.get_block(block_id)?.ok_or_else(|| {
format_err!(
"failed to get the block after executing the block: {:?}",
block_id
)
})?;
let block_info = chain.get_block_info(Some(block_id))?.ok_or_else(|| {
format_err!(
"failed to get the block info after executing the block: {:?}",
block_id
)
})?;

self.bus
.broadcast(NewDagBlock {
executed_block: Arc::new(ExecutedBlock { block, block_info }),
})
.map_err(|e| format_err!("Broadcast NewDagBlock error: {:?}", e))
}

fn broadcast_new_branch(&self, block: ExecutedBlock) {
if let Some(metrics) = self.metrics.as_ref() {
metrics
Expand Down
6 changes: 5 additions & 1 deletion types/src/system_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ use std::sync::Arc;
#[derive(Clone, Debug)]
pub struct NewHeadBlock {
pub executed_block: Arc<ExecutedBlock>,
// pub tips: Option<Vec<HashValue>>,
}

#[derive(Clone, Debug)]
pub struct NewDagBlock {
pub executed_block: Arc<ExecutedBlock>,
}

/// may be uncle block
Expand Down

0 comments on commit 8f65a0b

Please sign in to comment.