Skip to content

Commit

Permalink
feat(collator): add import externals on hot start
Browse files Browse the repository at this point in the history
  • Loading branch information
serejkaaa512 committed Jul 10, 2024
1 parent fd5f2d0 commit 23ee8c1
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 26 deletions.
132 changes: 109 additions & 23 deletions collator/src/collator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,20 @@ impl CollatorStdImpl {
);
let working_state =
Self::build_and_validate_working_state(mc_state, prev_states, &self.state_tracker)?;
if let Some(processed_upto_anchor_id) = working_state
.prev_shard_data
.processed_upto()
.externals
.as_ref()
.map(|upto| upto.processed_to.0)
{
self.import_anchors_on_init(
processed_upto_anchor_id,
working_state.prev_shard_data.gen_chain_time(),
)
.await?;
}

self.set_working_state(working_state);

self.timer = std::time::Instant::now();
Expand Down Expand Up @@ -480,35 +494,17 @@ impl CollatorStdImpl {
///
/// Returns: (`next_anchor`, `has_externals`)
async fn import_next_anchor(&mut self) -> Result<(Arc<MempoolAnchor>, bool)> {
// TODO: make real implementation

let labels = [("workchain", self.shard_id.workchain().to_string())];

let _histogram =
HistogramGuardWithLabels::begin("tycho_collator_import_next_anchor_time", &labels);

let timer = std::time::Instant::now();

// TODO: use get_next_anchor() only once
let next_anchor = if let Some(prev_anchor_id) = self.last_imported_anchor_id {
self.mpool_adapter.get_next_anchor(prev_anchor_id).await?
} else {
let anchor_id_opt = self
.working_state()
.prev_shard_data
.processed_upto()
.externals
.as_ref()
.map(|upto| upto.processed_to.0);
match anchor_id_opt {
Some(anchor_id) => self
.mpool_adapter
.get_anchor_by_id(anchor_id)
.await?
.unwrap(),
None => self.mpool_adapter.get_next_anchor(0).await?,
}
};
let next_anchor = self
.mpool_adapter
.get_next_anchor(self.last_imported_anchor_id.unwrap_or_default())
.await?;

let externals_count = next_anchor.externals_count_for(&self.shard_id);
let has_externals = externals_count > 0;
Expand Down Expand Up @@ -543,6 +539,96 @@ impl CollatorStdImpl {
Ok((next_anchor, has_externals))
}

/// 1. Get anchor from `externals_processed_upto`
/// 2. Get next anchors until last_chain_time
/// 3. Store anchors in cache
async fn import_anchors_on_init(
&mut self,
processed_upto_anchor_id: u32,
last_block_chain_time: u64,
) -> Result<()> {
let labels = [("workchain", self.shard_id.workchain().to_string())];

let _histogram = HistogramGuardWithLabels::begin(
"tycho_collator_import_next_anchors_hot_start_time",
&labels,
);

let timer = std::time::Instant::now();

let mut next_anchor = self
.mpool_adapter
.get_anchor_by_id(processed_upto_anchor_id)
.await?
.unwrap();

#[derive(Debug)]
struct AnchorInfo {
id: u32,
chain_time: u64,
externals_count: usize,
}

let mut anchors: Vec<AnchorInfo> = vec![];
let mut externals_count = next_anchor.externals_count_for(&self.shard_id);
let has_externals = externals_count > 0;
if has_externals {
self.has_pending_externals = true;
}

let mut next_anchor_chain_time = next_anchor.chain_time();
let mut last_anchor_id = processed_upto_anchor_id;
self.anchors_cache
.push_back((processed_upto_anchor_id, CachedMempoolAnchor {
anchor: next_anchor.clone(),
has_externals,
}));

anchors.push(AnchorInfo {
id: last_anchor_id,
chain_time: next_anchor_chain_time,
externals_count,
});
while last_block_chain_time > next_anchor_chain_time {
next_anchor = self.mpool_adapter.get_next_anchor(last_anchor_id).await?;

externals_count = next_anchor.externals_count_for(&self.shard_id);
let has_externals = externals_count > 0;
if has_externals {
self.has_pending_externals = true;
}

next_anchor_chain_time = next_anchor.chain_time();
last_anchor_id = next_anchor.id();
self.anchors_cache
.push_back((processed_upto_anchor_id, CachedMempoolAnchor {
anchor: next_anchor.clone(),
has_externals,
}));

anchors.push(AnchorInfo {
id: last_anchor_id,
chain_time: next_anchor_chain_time,
externals_count,
});
}

metrics::counter!("tycho_collator_ext_msgs_imported_count", &labels)
.increment(anchors.iter().map(|a| a.externals_count).sum::<usize>() as u64);

self.last_imported_anchor_id = Some(last_anchor_id);
self.last_imported_anchor_chain_time = Some(next_anchor_chain_time);
self.last_imported_anchor_author = Some(next_anchor.author());

tracing::debug!(target: tracing_targets::COLLATOR,
elapsed = timer.elapsed().as_millis(),
"imported anchors on init ({:?})",
anchors
);

Ok(())
}

fn get_last_imported_anchor_chain_time(&self) -> u64 {
self.last_imported_anchor_chain_time.unwrap()
}
Expand Down Expand Up @@ -656,7 +742,7 @@ impl CollatorStdImpl {
tracing::info!(target: tracing_targets::COLLATOR,
"there are unprocessed internals from previous block, will collate next block",
);
let next_chain_time = self.working_state().prev_shard_data.gen_chain_time() as u64;
let next_chain_time = self.working_state().prev_shard_data.gen_chain_time();
self.do_collate(next_chain_time, None).await?;
} else {
// otherwise import next anchor and return it notify to manager
Expand Down
7 changes: 4 additions & 3 deletions collator/src/collator/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ pub(super) struct PrevData {
pure_states: Vec<ShardStateStuff>,
pure_state_root: Cell,

gen_chain_time: u32,
gen_chain_time: u64,
gen_lt: u64,
total_validator_fees: CurrencyCollection,
gas_used_from_last_anchor: u64,
Expand Down Expand Up @@ -221,6 +221,7 @@ impl PrevData {
)?];

let gen_utime = observable_states[0].state().gen_utime;
let gen_utime_ms = observable_states[0].state().gen_utime_ms;
let gen_lt = observable_states[0].state().gen_lt;
let observable_accounts = observable_states[0].state().load_accounts()?;
let total_validator_fees = observable_states[0].state().total_validator_fees.clone();
Expand All @@ -237,7 +238,7 @@ impl PrevData {
pure_states: pure_prev_states,
pure_state_root: pure_prev_state_root.clone(),

gen_chain_time: gen_utime,
gen_chain_time: gen_utime as u64 * 1000 + gen_utime_ms as u64,
gen_lt,
total_validator_fees,
gas_used_from_last_anchor,
Expand Down Expand Up @@ -295,7 +296,7 @@ impl PrevData {
&self.pure_state_root
}

pub fn gen_chain_time(&self) -> u32 {
pub fn gen_chain_time(&self) -> u64 {
self.gen_chain_time
}

Expand Down

0 comments on commit 23ee8c1

Please sign in to comment.