Skip to content

Commit

Permalink
chore(code/blocksync): Tentative fix for proposal-and-parts mode (#595
Browse files Browse the repository at this point in the history
)

* Tentative fix for blocksync with proposal-and-parts mode

* Update code/crates/consensus/src/handle/received_proposed_value.rs

Co-authored-by: Romain Ruetschi <[email protected]>

* Rename and format

* Enable BlockSync by default in tests

* Add BlockSync tests with all three consensus modes

---------

Co-authored-by: Romain Ruetschi <[email protected]>
  • Loading branch information
ancazamfir and romac authored Nov 22, 2024
1 parent 56a3aa0 commit 70a1a90
Show file tree
Hide file tree
Showing 11 changed files with 82 additions and 30 deletions.
13 changes: 8 additions & 5 deletions code/crates/actors/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use tracing::{debug, error, info, warn};
use malachite_blocksync as blocksync;
use malachite_common::{
CommitCertificate, Context, Round, SignedExtension, Timeout, TimeoutStep, ValidatorSet,
ValueOrigin,
};
use malachite_config::TimeoutConfig;
use malachite_consensus::{Effect, Resume, ValueToPropose};
Expand Down Expand Up @@ -62,7 +63,7 @@ pub enum Msg<Ctx: Context> {
ProposeValue(Ctx::Height, Round, Ctx::Value, Option<SignedExtension<Ctx>>),

/// Received and assembled the full value proposed by a validator
ReceivedProposedValue(ProposedValue<Ctx>),
ReceivedProposedValue(ProposedValue<Ctx>, ValueOrigin),

/// Get the status of the consensus state machine
GetStatus(RpcReplyPort<Status<Ctx>>),
Expand Down Expand Up @@ -311,7 +312,9 @@ where
reply_to,
},
&myself,
|proposed| Msg::<Ctx>::ReceivedProposedValue(proposed),
|proposed| {
Msg::<Ctx>::ReceivedProposedValue(proposed, ValueOrigin::BlockSync)
},
None,
)?;

Expand Down Expand Up @@ -379,7 +382,7 @@ where
reply_to,
},
&myself,
|value| Msg::ReceivedProposedValue(value),
|value| Msg::ReceivedProposedValue(value, ValueOrigin::Consensus),
None,
)
.map_err(|e| {
Expand Down Expand Up @@ -418,9 +421,9 @@ where
Ok(())
}

Msg::ReceivedProposedValue(value) => {
Msg::ReceivedProposedValue(value, origin) => {
let result = self
.process_input(&myself, state, ConsensusInput::ProposedValue(value))
.process_input(&myself, state, ConsensusInput::ProposedValue(value, origin))
.await;

if let Err(e) = result {
Expand Down
2 changes: 1 addition & 1 deletion code/crates/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,5 @@ pub use signing::SigningScheme;
pub use threshold::{Threshold, ThresholdParam, ThresholdParams};
pub use timeout::{Timeout, TimeoutStep};
pub use validator_set::{Address, Validator, ValidatorSet, VotingPower};
pub use value::{NilOrVal, Value};
pub use value::{NilOrVal, Value, ValueOrigin};
pub use vote::{Extension, Vote, VoteType};
9 changes: 9 additions & 0 deletions code/crates/common/src/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,12 @@ where
/// The ID of the value.
fn id(&self) -> Self::Id;
}

/// Protocols that diseminate `Value`
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum ValueOrigin {
/// Block Synchronization protocol
BlockSync,
/// Consensus protocol
Consensus,
}
4 changes: 3 additions & 1 deletion code/crates/consensus/src/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ where
Input::Proposal(proposal) => on_proposal(co, state, metrics, proposal).await,
Input::Propose(value) => on_propose(co, state, metrics, value).await,
Input::TimeoutElapsed(timeout) => on_timeout_elapsed(co, state, metrics, timeout).await,
Input::ProposedValue(value) => on_proposed_value(co, state, metrics, value).await,
Input::ProposedValue(value, origin) => {
on_proposed_value(co, state, metrics, value, origin).await
}
Input::CommitCertificate(certificate) => {
on_commit_certificate(co, state, metrics, certificate).await
}
Expand Down
1 change: 0 additions & 1 deletion code/crates/consensus/src/handle/proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ where
"Received proposal from a non-proposer"
);

// TODO - why when we replay proposals the proposer is wrong
return Ok(false);
};

Expand Down
9 changes: 6 additions & 3 deletions code/crates/consensus/src/handle/proposed_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub async fn on_proposed_value<Ctx>(
state: &mut State<Ctx>,
metrics: &Metrics,
proposed_value: ProposedValue<Ctx>,
origin: ValueOrigin,
) -> Result<(), Error<Ctx>>
where
Ctx: Context,
Expand All @@ -31,14 +32,16 @@ where
debug!("Received value for next height, queuing for later");
state
.input_queue
.push_back(Input::ProposedValue(proposed_value));
.push_back(Input::ProposedValue(proposed_value, origin));
}
return Ok(());
}

state.store_value(&proposed_value);

if state.params.value_payload.parts_only() {
// There are two cases where we need to generate an internal Proposal message for consensus to process the full proposal:
// a) In parts-only mode, where we do not get a Proposal message but only the proposal parts
// b) In any mode if the proposed value was provided by BlockSync, where we do net get a Proposal message but only the full value and the certificate
if state.params.value_payload.parts_only() || origin == ValueOrigin::BlockSync {
let proposal = Ctx::new_proposal(
proposed_value.height,
proposed_value.round,
Expand Down
9 changes: 6 additions & 3 deletions code/crates/consensus/src/input.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use derive_where::derive_where;

use malachite_common::{CommitCertificate, Context, SignedProposal, SignedVote, Timeout};
use malachite_common::{
CommitCertificate, Context, SignedProposal, SignedVote, Timeout, ValueOrigin,
};

use crate::types::ProposedValue;
use crate::ValueToPropose;
Expand All @@ -26,8 +28,9 @@ where
/// A timeout has elapsed
TimeoutElapsed(Timeout),

/// Received the full proposed value corresponding to a proposal
ProposedValue(ProposedValue<Ctx>),
/// Received the full proposed value corresponding to a proposal.
/// The origin denotes whether the value was received via consensus or BlockSync.
ProposedValue(ProposedValue<Ctx>, ValueOrigin),

/// Received a commit certificate from BlockSync
CommitCertificate(CommitCertificate<Ctx>),
Expand Down
25 changes: 14 additions & 11 deletions code/crates/consensus/tests/full_proposal.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use malachite_common::{Context, Round, SignedProposal, Validity};
use malachite_common::{Context, Round, SignedProposal, Validity, ValueOrigin};
use malachite_consensus::{FullProposal, FullProposalKeeper, Input, ProposedValue};
use malachite_test::utils::validators::make_validators;
use malachite_test::{Address, Proposal, Value};
Expand Down Expand Up @@ -66,15 +66,18 @@ fn val_msg(
value: u64,
validity: Validity,
) -> Input<TestContext> {
Input::ProposedValue(ProposedValue {
height: Height::new(1),
round: Round::new(round),
valid_round: Round::Nil,
value: Value::new(value),
validity,
validator_address,
extension: Default::default(),
})
Input::ProposedValue(
ProposedValue {
height: Height::new(1),
round: Round::new(round),
valid_round: Round::Nil,
value: Value::new(value),
validity,
validator_address,
extension: Default::default(),
},
ValueOrigin::Consensus,
)
}

fn prop_at_round_and_value(
Expand Down Expand Up @@ -275,7 +278,7 @@ fn full_proposal_keeper_tests() {
for m in s.input {
match m {
Input::Proposal(p) => keeper.store_proposal(p),
Input::ProposedValue(v) => keeper.store_value(&v),
Input::ProposedValue(v, _) => keeper.store_value(&v),
_ => continue,
}
}
Expand Down
2 changes: 1 addition & 1 deletion code/crates/starknet/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ pub fn make_node_config<const N: usize>(test: &Test<N>, i: usize) -> NodeConfig
gossip_batch_size: 100,
},
blocksync: BlockSyncConfig {
enabled: false,
enabled: true,
status_update_interval: Duration::from_secs(2),
request_timeout: Duration::from_secs(5),
},
Expand Down
36 changes: 33 additions & 3 deletions code/crates/starknet/test/tests/blocksync.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::time::Duration;

use malachite_config::ValuePayload;
use malachite_starknet_test::{Test, TestNode, TestParams};

#[tokio::test]
pub async fn crash_restart_from_1() {
pub async fn crash_restart_from_start(params: TestParams) {
const HEIGHT: u64 = 10;

// Node 1 starts with 10 voting power.
Expand Down Expand Up @@ -40,12 +40,42 @@ pub async fn crash_restart_from_1() {
Duration::from_secs(60), // Timeout for the whole test
TestParams {
enable_blocksync: true, // Enable BlockSync
..Default::default()
..params
},
)
.await
}

#[tokio::test]
pub async fn crash_restart_from_start_parts_only() {
let params = TestParams {
value_payload: ValuePayload::PartsOnly,
..Default::default()
};

crash_restart_from_start(params).await
}

#[tokio::test]
pub async fn crash_restart_from_start_proposal_only() {
let params = TestParams {
value_payload: ValuePayload::ProposalOnly,
..Default::default()
};

crash_restart_from_start(params).await
}

#[tokio::test]
pub async fn crash_restart_from_start_proposal_and_parts() {
let params = TestParams {
value_payload: ValuePayload::ProposalAndParts,
..Default::default()
};

crash_restart_from_start(params).await
}

#[tokio::test]
pub async fn crash_restart_from_latest() {
const HEIGHT: u64 = 10;
Expand Down
2 changes: 1 addition & 1 deletion code/scripts/spawn.bash
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ fi

# Environment variables
export MALACHITE__CONSENSUS__P2P__PROTOCOL__TYPE="gossipsub"
export MALACHITE__CONSENSUS__VALUE_PAYLOAD="parts-only"
export MALACHITE__CONSENSUS__VALUE_PAYLOAD="proposal-and-parts"
export MALACHITE__CONSENSUS__MAX_BLOCK_SIZE="50KiB"
export MALACHITE__CONSENSUS__TIMEOUT_PROPOSE="5s"
export MALACHITE__CONSENSUS__TIMEOUT_PROPOSE_DELTA="1s"
Expand Down

0 comments on commit 70a1a90

Please sign in to comment.