Skip to content

Commit

Permalink
refactor(mempool): less clone() stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
Mododo committed May 17, 2024
1 parent 5fb3c86 commit 5424d60
Show file tree
Hide file tree
Showing 17 changed files with 168 additions and 170 deletions.
4 changes: 2 additions & 2 deletions consensus/src/dag/anchor_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ impl AnchorStage {
// 1 is an anchor candidate (surprisingly, nothing special about this point)
0 | 1 => None,
2 => Some(AnchorStage::Proof {
leader: leader.clone(),
leader: *leader,
is_used: AtomicBool::new(false),
}),
3 => Some(AnchorStage::Trigger {
leader: leader.clone(),
leader: *leader,
is_used: AtomicBool::new(false),
}),
_ => unreachable!(),
Expand Down
111 changes: 55 additions & 56 deletions consensus/src/dag/dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ impl Dag {
pub fn init(&self, dag_round: DagRound) {
let mut rounds = self.rounds.lock();
assert!(rounds.is_empty(), "DAG already initialized");
rounds.insert(dag_round.round().clone(), dag_round);
rounds.insert(dag_round.round(), dag_round);
}

pub fn top(&self, round: &Round, peer_schedule: &PeerSchedule) -> DagRound {
pub fn top(&self, round: Round, peer_schedule: &PeerSchedule) -> DagRound {
let mut rounds = self.rounds.lock();
let mut top = match rounds.last_key_value() {
None => unreachable!("DAG cannot be empty if properly initialized?"),
Expand Down Expand Up @@ -85,8 +85,8 @@ impl Dag {
}
}

async fn latest_trigger(next_round: &DagRound) -> Option<ValidPoint> {
let mut next_dag_round = next_round.clone();
async fn latest_trigger(next_dag_round: &DagRound) -> Option<ValidPoint> {
let mut next_dag_round = next_dag_round.clone();
let mut latest_trigger = None;
while let Some(current_dag_round) = next_dag_round.prev().get() {
if let Some(AnchorStage::Trigger {
Expand Down Expand Up @@ -133,64 +133,63 @@ impl Dag {
panic!("anchor proof round not in DAG")
};
loop {
let Some(proof_round) = future_round.scan(&proof.point.body.location.round) else {
let Some(proof_round) = future_round.scan(proof.point.body.location.round) else {
panic!("anchor proof round not in DAG while a point from it was received")
};
if proof_round.round() == &MempoolConfig::GENESIS_ROUND {
if proof_round.round() == MempoolConfig::GENESIS_ROUND {
break;
}
match proof_round.anchor_stage() {
Some(AnchorStage::Proof {
ref leader,
ref is_used,
}) => {
assert_eq!(
proof.point.body.location.round,
*proof_round.round(),
"anchor proof round does not match"
);
assert_eq!(
proof.point.body.location.author, leader,
"anchor proof author does not match prescribed by round"
);
let Some(anchor_round) = proof_round.prev().get() else {
break;
};
if is_used.load(Ordering::Relaxed) {
break;
};
let mut proofs = FuturesUnordered::new();
proof_round.view(leader, |loc| {
for (_, version) in loc.versions() {
proofs.push(version.clone())
}
});
let mut anchor = None;
'v: while let Some((proof, _)) = proofs.next().await {
if let Some(valid) = proof.into_valid() {
let Some(valid) = proof_round.vertex_by_proof(&valid).await else {
panic!("anchor proof is not linked to anchor, validation broken")
};
_ = anchor.insert(valid);
is_used.store(true, Ordering::Relaxed);
break 'v;
}
}
let anchor = anchor
.expect("any anchor proof points to anchor point, validation is broken");
anchor_stack.push((anchor.clone(), anchor_round.clone()));

let Some(next_proof) = proof_round
.valid_point(&anchor.point.anchor_id(LinkField::Proof))
.await
else {
break;
let Some(AnchorStage::Proof {
ref leader,
ref is_used,
}) = proof_round.anchor_stage()
else {
panic!("anchor proof round is not expected, validation is broken")
};
assert_eq!(
proof.point.body.location.round,
proof_round.round(),
"anchor proof round does not match"
);
assert_eq!(
proof.point.body.location.author, leader,
"anchor proof author does not match prescribed by round"
);
let Some(anchor_round) = proof_round.prev().get() else {
break;
};
if is_used.load(Ordering::Relaxed) {
break;
};
let mut proofs = FuturesUnordered::new();
proof_round.view(leader, |loc| {
for (_, version) in loc.versions() {
proofs.push(version.clone())
}
});
let mut anchor = None;
'v: while let Some((proof, _)) = proofs.next().await {
if let Some(valid) = proof.into_valid() {
let Some(valid) = proof_round.vertex_by_proof(&valid).await else {
panic!("anchor proof is not linked to anchor, validation broken")
};
proof = next_proof;
future_round = anchor_round;
_ = anchor.insert(valid);
is_used.store(true, Ordering::Relaxed);
break 'v;
}
_ => panic!("anchor proof round is not expected, validation is broken"),
}
let anchor =
anchor.expect("any anchor proof points to anchor point, validation is broken");
anchor_stack.push((anchor.clone(), anchor_round.clone()));

let Some(next_proof) = proof_round
.valid_point(&anchor.point.anchor_id(LinkField::Proof))
.await
else {
break;
};
proof = next_proof;
future_round = anchor_round;
}
anchor_stack
}
Expand All @@ -210,7 +209,7 @@ impl Dag {
anchor_round: &DagRound, // r+1
) -> Vec<Arc<Point>> {
assert_eq!(
*anchor_round.round(),
anchor_round.round(),
anchor.body.location.round,
"passed anchor round does not match anchor point's round"
);
Expand Down
10 changes: 5 additions & 5 deletions consensus/src/dag/dag_location.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl InclusionState {
None => assert!(false, "Coding error: own point is not trusted"),
Some(valid) => {
_ = signed.set(Ok(Signed {
at: valid.point.body.location.round.clone(),
at: valid.point.body.location.round,
with: valid.point.signature.clone(),
}))
}
Expand All @@ -132,9 +132,9 @@ impl InclusionState {
pub fn signed(&self) -> Option<&'_ Result<Signed, ()>> {
self.0.get()?.signed.get()
}
pub fn signed_point(&self, at: &Round) -> Option<&'_ ValidPoint> {
pub fn signed_point(&self, at: Round) -> Option<&'_ ValidPoint> {
let signable = self.0.get()?;
if &signable.signed.get()?.as_ref().ok()?.at == at {
if signable.signed.get()?.as_ref().ok()?.at == at {
signable.first_completed.valid()
} else {
None
Expand Down Expand Up @@ -168,7 +168,7 @@ pub struct Signed {
impl Signable {
pub fn sign(
&self,
at: &Round,
at: Round,
key_pair: Option<&KeyPair>, // same round for own point and next round for other's
time_range: RangeInclusive<UnixTime>,
) -> bool {
Expand All @@ -178,7 +178,7 @@ impl Signable {
_ = self.signed.get_or_init(|| {
this_call_signed = true;
Ok(Signed {
at: at.clone(),
at,
with: Signature::new(key_pair, &valid.point.digest),
})
});
Expand Down
40 changes: 20 additions & 20 deletions consensus/src/dag/dag_round.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ impl DagRound {
}

pub fn new(round: Round, peer_schedule: &PeerSchedule, prev: WeakDagRound) -> Self {
let peers = peer_schedule.peers_for(&round);
let peers = peer_schedule.peers_for(round);
let locations = FastDashMap::with_capacity_and_hasher(peers.len(), Default::default());
Self(Arc::new(DagRoundInner {
round,
node_count: NodeCount::try_from(peers.len())
.expect(&format!("peer schedule updated for {round:?}")),
key_pair: peer_schedule.local_keys(&round),
key_pair: peer_schedule.local_keys(round),
anchor_stage: AnchorStage::of(round, peer_schedule),
locations,
prev,
Expand All @@ -70,16 +70,16 @@ impl DagRound {

pub fn next(&self, peer_schedule: &PeerSchedule) -> Self {
let next_round = self.round().next();
let peers = peer_schedule.peers_for(&next_round);
let peers = peer_schedule.peers_for(next_round);
let locations = FastDashMap::with_capacity_and_hasher(peers.len(), Default::default());
Self(Arc::new(DagRoundInner {
round: next_round,
node_count: NodeCount::try_from(peers.len())
.expect(&format!("peer schedule updated for {next_round:?}")),
key_pair: peer_schedule.local_keys(&next_round),
key_pair: peer_schedule.local_keys(next_round),
anchor_stage: AnchorStage::of(next_round, peer_schedule),
locations,
prev: self.as_weak(),
prev: self.to_weak(),
}))
}

Expand All @@ -96,12 +96,12 @@ impl DagRound {
}))
}

pub fn round(&self) -> &'_ Round {
&self.0.round
pub fn round(&self) -> Round {
self.0.round
}

pub fn node_count(&self) -> &'_ NodeCount {
&self.0.node_count
pub fn node_count(&self) -> NodeCount {
self.0.node_count
}

pub fn key_pair(&self) -> Option<&'_ KeyPair> {
Expand Down Expand Up @@ -141,14 +141,14 @@ impl DagRound {
&self.0.prev
}

pub fn as_weak(&self) -> WeakDagRound {
pub fn to_weak(&self) -> WeakDagRound {
WeakDagRound(Arc::downgrade(&self.0))
}

pub async fn vertex_by_proof(&self, proof: &ValidPoint) -> Option<ValidPoint> {
match proof.point.body.proof {
Some(ref proven) => {
let dag_round = self.scan(&proof.point.body.location.round.prev())?;
let dag_round = self.scan(proof.point.body.location.round.prev())?;
dag_round
.valid_point_exact(&proof.point.body.location.author, &proven.digest)
.await
Expand All @@ -158,7 +158,7 @@ impl DagRound {
}

pub async fn valid_point(&self, point_id: &PointId) -> Option<ValidPoint> {
match self.scan(&point_id.location.round) {
match self.scan(point_id.location.round) {
Some(linked) => {
linked
.valid_point_exact(&point_id.location.author, &point_id.digest)
Expand All @@ -178,7 +178,7 @@ impl DagRound {
point: &Arc<Point>,
downloader: &Downloader,
) -> Option<BoxFuture<'static, InclusionState>> {
self.scan(&point.body.location.round)
self.scan(point.body.location.round)
.and_then(|linked| linked.add_exact(&point, downloader))
}

Expand All @@ -187,10 +187,10 @@ impl DagRound {
point: &Arc<Point>,
downloader: &Downloader,
) -> Option<BoxFuture<'static, InclusionState>> {
if &point.body.location.round != self.round() {
if point.body.location.round != self.round() {
panic!("Coding error: dag round mismatches point round on add")
}
let dag_round = self.as_weak();
let dag_round = self.to_weak();
let digest = &point.digest;
self.edit(&point.body.location.author, |loc| {
let state = loc.state().clone();
Expand All @@ -211,15 +211,15 @@ impl DagRound {
if !Verifier::verify(point, peer_schedule).is_ok() {
panic!("Coding error: malformed point")
}
let point = Verifier::validate(point.clone(), self.as_weak(), downloader.clone()).await;
let point = Verifier::validate(point.clone(), self.to_weak(), downloader.clone()).await;
if point.trusted().is_none() {
panic!("Coding error: not a trusted point")
}
let state = self.insert_exact(&point)?.await;
if let Some(signable) = state.signable() {
signable.sign(
self.round(),
peer_schedule.local_keys(&self.round().next()).as_deref(),
peer_schedule.local_keys(self.round().next()).as_deref(),
MempoolConfig::sign_time_range(),
);
}
Expand All @@ -236,13 +236,13 @@ impl DagRound {
if dag_point.valid().is_some() {
panic!("Coding error: failed to insert valid point as invalid")
}
self.scan(&dag_point.location().round)
self.scan(dag_point.location().round)
.map(|linked| linked.insert_exact(dag_point))
.flatten()
}

fn insert_exact(&self, dag_point: &DagPoint) -> Option<BoxFuture<'static, InclusionState>> {
if &dag_point.location().round != self.round() {
if dag_point.location().round != self.round() {
panic!("Coding error: dag round mismatches point round on insert")
}
self.edit(&dag_point.location().author, |loc| {
Expand All @@ -254,7 +254,7 @@ impl DagRound {
})
}

pub fn scan(&self, round: &Round) -> Option<Self> {
pub fn scan(&self, round: Round) -> Option<Self> {
assert!(
round <= self.round(),
"Coding error: cannot scan DAG rounds chain for a future round"
Expand Down
Loading

0 comments on commit 5424d60

Please sign in to comment.