Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
mwtian committed Jan 10, 2025
1 parent 9fdc4c9 commit 47b386c
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 79 deletions.
137 changes: 70 additions & 67 deletions consensus/core/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,8 +699,24 @@ impl Core {
received_quorum_rounds: Vec<QuorumRound>,
accepted_quorum_rounds: Vec<QuorumRound>,
) {
info!("Received quorum round per authority in ancestor state manager set to: {received_quorum_rounds:?}");
info!("Accepted quorum round per authority in ancestor state manager set to: {accepted_quorum_rounds:?}");
info!(
"Received quorum round per authority in ancestor state manager set to: {}",
self.context
.committee
.authorities()
.zip(received_quorum_rounds.iter())
.map(|((i, _), rounds)| format!("{i}: {rounds:?}"))
.join(", ")
);
info!(
"Accepted quorum round per authority in ancestor state manager set to: {}",
self.context
.committee
.authorities()
.zip(accepted_quorum_rounds.iter())
.map(|((i, _), rounds)| format!("{i}: {rounds:?}"))
.join(", ")
);
self.ancestor_state_manager
.set_quorum_rounds_per_authority(received_quorum_rounds, accepted_quorum_rounds);
info!("Propagation round delay set to: {delay}");
Expand Down Expand Up @@ -831,10 +847,8 @@ impl Core {
clock_round: Round,
smart_select: bool,
) -> Vec<VerifiedBlock> {
let _s = self
.context
.metrics
.node_metrics
let node_metrics = &self.context.metrics.node_metrics;
let _s = node_metrics
.scope_processing_time
.with_label_values(&["Core::smart_ancestors_to_propose"])
.start_timer();
Expand Down Expand Up @@ -904,7 +918,7 @@ impl Core {
}

if smart_select && !parent_round_quorum.reached_threshold(&self.context.committee) {
self.context.metrics.node_metrics.smart_selection_wait.inc();
node_metrics.smart_selection_wait.inc();
debug!("Only found {} stake of good ancestors to include for round {clock_round}, will wait for more.", parent_round_quorum.stake());
return vec![];
}
Expand All @@ -924,82 +938,71 @@ impl Core {
debug!("Including temporarily excluded strong link ancestor {ancestor} with score {score} to propose for round {clock_round}");
parent_round_quorum.add(ancestor.author(), &self.context.committee);
ancestors_to_propose.push(ancestor);
self.context
.metrics
.node_metrics
node_metrics
.included_excluded_proposal_ancestors_count_by_authority
.with_label_values(&[block_hostname, "strong"])
.with_label_values(&[block_hostname, "timeout"])
.inc();
} else {
excluded_ancestors.push((score, ancestor));
}
}

// Inclusion of weak links for low score ancestors so there is no long
// list of blocks that we need to include later.
// Include partially propagated blocks from excluded authorities, to help propagate the blocks
// across the network with less latency impact.
// TODO: use a separate mechanism to propagate excluded ancestor blocks and remove this logic.
for (score, ancestor) in excluded_ancestors.iter() {
let excluded_author = ancestor.author();
let block_hostname = &self.context.committee.authority(excluded_author).hostname;
// Use the low quorum accepted round as reported by the network for which
// round to include for this ancestor.
let mut network_low_quorum_round = self
// A quorum of validators reported to have accepted blocks from the excluded_author up to the low quorum round.
let mut accepted_low_quorum_round = self
.ancestor_state_manager
.accepted_quorum_round_per_authority[excluded_author]
.0;

// If the network quourum round for this ancestor is greater than or equal
// If the accepted quorum round of this ancestor is greater than or equal
// to the clock round then we want to make sure to set it to clock_round - 1
// as that is the max round we can include as an ancestor.
network_low_quorum_round = network_low_quorum_round.min(quorum_round);

if let Some(last_block_ref) = self.last_included_ancestors[excluded_author] {
if last_block_ref.round < network_low_quorum_round {
if ancestor.round() == network_low_quorum_round {
// Include the ancestor block as it has been seen & accepted by a strong quorum
self.last_included_ancestors[excluded_author] = Some(ancestor.reference());
ancestors_to_propose.push(ancestor.clone());
debug!("Included low scoring ancestor {ancestor} with score {score} seen at network low quorum accepted round {network_low_quorum_round} to propose for round {clock_round}");
self.context
.metrics
.node_metrics
.included_excluded_proposal_ancestors_count_by_authority
.with_label_values(&[block_hostname, "weak"])
.inc();
continue;
}

// Fetch ancestor block from the store for older round
let blocks = self
.dag_state
.read()
.get_uncommitted_blocks_at_slot(Slot::new(
network_low_quorum_round,
excluded_author,
));

if let Some(block) = blocks.first() {
self.last_included_ancestors[excluded_author] = Some(block.reference());
ancestors_to_propose.push(block.clone());
debug!("Included low scoring ancestor {block} with score {score} seen at network low quorum accepted round {network_low_quorum_round} to propose for round {clock_round}");
self.context
.metrics
.node_metrics
.included_excluded_proposal_ancestors_count_by_authority
.with_label_values(&[block_hostname, "weak"])
.inc();
continue;
} else {
debug!("No earlier uncommitted block found for low scoring ancestor {ancestor} with score {score} seen at network low quorum accepted round {network_low_quorum_round} to propose for round {clock_round}");
}
}
// as that is the max round the new block can include as an ancestor.
accepted_low_quorum_round = accepted_low_quorum_round.min(quorum_round);

let last_included_round = self.last_included_ancestors[excluded_author]
.map(|block_ref| block_ref.round)
.unwrap_or(GENESIS_ROUND);
if last_included_round >= accepted_low_quorum_round {
trace!(
"Excluded low score ancestor {} with score {score} to propose for round {clock_round}: last included round {} >= accepted low quorum round {}",
ancestor.reference(), last_included_round, accepted_low_quorum_round,
);
node_metrics
.excluded_proposal_ancestors_count_by_authority
.with_label_values(&[block_hostname])
.inc();
continue;
}

trace!("Excluded low score ancestor {ancestor} with score {score} to propose for round {clock_round}");
self.context
.metrics
.node_metrics
.excluded_proposal_ancestors_count_by_authority
.with_label_values(&[block_hostname])
// Include the ancestor block as it has been seen & accepted by a strong quorum.
let ancestor = if ancestor.round() == accepted_low_quorum_round {
ancestor.clone()
} else {
// Only cached blocks need to be propagated. Committed and GC'ed blocks do not need to be propagated.
let Some(ancestor) = self.dag_state.read().get_last_cached_block_in_range(
excluded_author,
last_included_round + 1,
accepted_low_quorum_round + 1,
) else {
trace!("Excluded low score ancestor {} with score {score} to propose for round {clock_round}: no suitable block found", ancestor.reference());
node_metrics
.excluded_proposal_ancestors_count_by_authority
.with_label_values(&[block_hostname])
.inc();
continue;
};
ancestor
};
self.last_included_ancestors[excluded_author] = Some(ancestor.reference());
ancestors_to_propose.push(ancestor.clone());
trace!("Included low scoring ancestor {} with score {score} seen at accepted low quorum round {accepted_low_quorum_round} to propose for round {clock_round}", ancestor.reference());
node_metrics
.included_excluded_proposal_ancestors_count_by_authority
.with_label_values(&[block_hostname, "quorum"])
.inc();
}

Expand Down
100 changes: 88 additions & 12 deletions consensus/core/src/dag_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -574,10 +574,40 @@ impl DagState {
blocks
}

/// Returns the last block proposed per authority with `round < end_round`.
// Retrieves the cached block within the range [start_round, end_round) from a given authority.
// NOTE: end_round must be greater than GENESIS_ROUND.
pub(crate) fn get_last_cached_block_in_range(
&self,
authority: AuthorityIndex,
start_round: Round,
end_round: Round,
) -> Option<VerifiedBlock> {
if end_round == GENESIS_ROUND {
panic!(
"Attempted to retrieve blocks earlier than the genesis round which is impossible"
);
}

let block_ref = self.recent_refs_by_authority[authority]
.range((
Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
Excluded(BlockRef::new(
end_round,
AuthorityIndex::MIN,
BlockDigest::MIN,
)),
))
.last()?;

self.recent_blocks
.get(block_ref)
.map(|block_info| block_info.block.clone())
}

/// Returns the last block proposed per authority with `evicted round < round < end_round`.
/// The method is guaranteed to return results only when the `end_round` is not earlier of the
/// available cached data for each authority, otherwise the method will panic - it's the caller's
/// responsibility to ensure that is not requesting filtering for earlier rounds .
/// available cached data for each authority (evicted round + 1), otherwise the method will panic.
/// It's the caller's responsibility to ensure that is not requesting for earlier rounds.
/// In case of equivocation for an authority's last slot only one block will be returned (the last in order).
pub(crate) fn get_last_cached_block_per_authority(
&self,
Expand Down Expand Up @@ -2108,7 +2138,7 @@ mod test {

#[rstest]
#[tokio::test]
async fn test_get_cached_last_block_per_authority(#[values(0, 1)] gc_depth: u32) {
async fn test_get_last_cached_block(#[values(0, 1)] gc_depth: u32) {
// GIVEN
const CACHED_ROUNDS: Round = 2;
let (mut context, _) = Context::new_for_test(4);
Expand Down Expand Up @@ -2161,14 +2191,46 @@ mod test {

// WHEN search for the latest blocks
let end_round = 4;
let expected_rounds = vec![0, 1, 2, 3];

// THEN
let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
assert_eq!(
last_blocks.iter().map(|b| b.round()).collect::<Vec<_>>(),
expected_rounds
);

// THEN
assert_eq!(last_blocks[0].round(), 0);
assert_eq!(last_blocks[1].round(), 1);
assert_eq!(last_blocks[2].round(), 2);
assert_eq!(last_blocks[3].round(), 3);
for (i, expected_round) in expected_rounds.iter().enumerate() {
let round = dag_state
.get_last_cached_block_in_range(
context.committee.to_authority_index(i).unwrap(),
0,
end_round,
)
.map(|b| b.round())
.unwrap_or_default();
assert_eq!(round, *expected_round, "Authority {i}");
}

// WHEN starting from round 2
let start_round = 2;
let expected_rounds = [0, 0, 2, 3];

// THEN
for (i, expected_round) in expected_rounds.iter().enumerate() {
let round = dag_state
.get_last_cached_block_in_range(
context.committee.to_authority_index(i).unwrap(),
start_round,
end_round,
)
.map(|b| b.round())
.unwrap_or_default();
assert_eq!(round, *expected_round, "Authority {i}");
}

// WHEN we flush the DagState - after adding a commit with all the blocks, we expect this to trigger
// WHEN we flush the DagState - after adding a commit with all the blocks, we expect this to trigger
// a clean up in the internal cache. That will keep the all the blocks with rounds >= authority_commit_round - CACHED_ROUND.
//
Expand All @@ -2178,13 +2240,27 @@ mod test {

// AND we request before round 3
let end_round = 3;
let expected_rounds = vec![0, 1, 2, 2];

// THEN
let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
assert_eq!(
last_blocks.iter().map(|b| b.round()).collect::<Vec<_>>(),
expected_rounds
);

// THEN
assert_eq!(last_blocks[0].round(), 0);
assert_eq!(last_blocks[1].round(), 1);
assert_eq!(last_blocks[2].round(), 2);
assert_eq!(last_blocks[3].round(), 2);
for (i, expected_round) in expected_rounds.iter().enumerate() {
let round = dag_state
.get_last_cached_block_in_range(
context.committee.to_authority_index(i).unwrap(),
0,
end_round,
)
.map(|b| b.round())
.unwrap_or_default();
assert_eq!(round, *expected_round, "Authority {i}");
}
}

#[tokio::test]
Expand Down

0 comments on commit 47b386c

Please sign in to comment.