Skip to content

Commit

Permalink
refactor(node): remove test code from production code
Browse files Browse the repository at this point in the history
Remove the test config from ConsensusConfig, this is now just part of the test binary.
Consensus simulations now call to the run_consensus binary.
  • Loading branch information
matan-starkware committed Sep 19, 2024
1 parent b0a7f22 commit 5a2200a
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 177 deletions.
30 changes: 0 additions & 30 deletions config/papyrus/default_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -99,36 +99,6 @@
"privacy": "Public",
"value": 0
},
"consensus.test.#is_none": {
"description": "Flag for an optional field.",
"privacy": "TemporaryValue",
"value": true
},
"consensus.test.cache_size": {
"description": "The cache size for the test simulation.",
"privacy": "Public",
"value": 1000
},
"consensus.test.drop_probability": {
"description": "The probability of dropping a message.",
"privacy": "Public",
"value": 0.0
},
"consensus.test.invalid_probability": {
"description": "The probability of sending an invalid message.",
"privacy": "Public",
"value": 0.0
},
"consensus.test.random_seed": {
"description": "The random seed for the test simulation to ensure repeatable test results.",
"privacy": "Public",
"value": 0
},
"consensus.test.sync_topic": {
"description": "The network topic for sync messages.",
"privacy": "Public",
"value": "consensus_test_sync"
},
"consensus.timeouts.precommit_timeout": {
"description": "The timeout (seconds) for a precommit.",
"privacy": "Public",
Expand Down
83 changes: 20 additions & 63 deletions crates/papyrus_node/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,17 @@ use std::process::exit;
use std::sync::Arc;
use std::time::Duration;

use futures::stream::StreamExt;
use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerConfig;
use papyrus_common::metrics::COLLECT_PROFILING_METRICS;
use papyrus_common::pending_classes::PendingClasses;
use papyrus_common::BlockHashAndNumber;
use papyrus_config::presentation::get_config_presentation;
use papyrus_config::validators::config_validate;
use papyrus_consensus::config::ConsensusConfig;
use papyrus_consensus::simulation_network_receiver::NetworkReceiver;
use papyrus_consensus_orchestrator::papyrus_consensus_context::PapyrusConsensusContext;
use papyrus_monitoring_gateway::MonitoringServer;
use papyrus_network::gossipsub_impl::Topic;
use papyrus_network::network_manager::{BroadcastTopicChannels, NetworkManager};
use papyrus_network::network_manager::NetworkManager;
use papyrus_network::{network_manager, NetworkConfig};
use papyrus_p2p_sync::client::{P2PSyncClient, P2PSyncClientChannels};
use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels};
Expand All @@ -31,7 +29,7 @@ use papyrus_sync::sources::base_layer::{BaseLayerSourceError, EthereumBaseLayerS
use papyrus_sync::sources::central::{CentralError, CentralSource, CentralSourceConfig};
use papyrus_sync::sources::pending::PendingSource;
use papyrus_sync::{StateSync, SyncConfig};
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::block::BlockHash;
use starknet_api::felt;
use starknet_client::reader::objects::pending_data::{PendingBlock, PendingBlockOrDeprecated};
use starknet_client::reader::PendingData;
Expand Down Expand Up @@ -188,65 +186,24 @@ fn spawn_consensus(

let network_channels = network_manager
.register_broadcast_topic(Topic::new(config.network_topic.clone()), BUFFER_SIZE)?;
// TODO(matan): connect this to an actual channel.
if let Some(test_config) = config.test.as_ref() {
let sync_channels = network_manager
.register_broadcast_topic(Topic::new(test_config.sync_topic.clone()), BUFFER_SIZE)?;
let context = PapyrusConsensusContext::new(
storage_reader.clone(),
network_channels.messages_to_broadcast_sender.clone(),
config.num_validators,
Some(sync_channels.messages_to_broadcast_sender),
);
let network_receiver = NetworkReceiver::new(
network_channels.broadcasted_messages_receiver,
test_config.cache_size,
test_config.random_seed,
test_config.drop_probability,
test_config.invalid_probability,
);
let broadcast_channels = BroadcastTopicChannels {
messages_to_broadcast_sender: network_channels.messages_to_broadcast_sender,
broadcasted_messages_receiver: Box::new(network_receiver),
reported_messages_sender: network_channels.reported_messages_sender,
continue_propagation_sender: network_channels.continue_propagation_sender,
};
let sync_receiver =
sync_channels.broadcasted_messages_receiver.map(|(vote, _report_sender)| {
BlockNumber(vote.expect("Sync channel should never have errors").height)
});
Ok(tokio::spawn(async move {
Ok(papyrus_consensus::run_consensus(
context,
config.start_height,
config.validator_id,
config.consensus_delay,
config.timeouts.clone(),
broadcast_channels,
sync_receiver,
)
.await?)
}))
} else {
let context = PapyrusConsensusContext::new(
storage_reader.clone(),
network_channels.messages_to_broadcast_sender.clone(),
config.num_validators,
None,
);
Ok(tokio::spawn(async move {
Ok(papyrus_consensus::run_consensus(
context,
config.start_height,
config.validator_id,
config.consensus_delay,
config.timeouts.clone(),
network_channels,
futures::stream::pending(),
)
.await?)
}))
}
let context = PapyrusConsensusContext::new(
storage_reader.clone(),
network_channels.messages_to_broadcast_sender.clone(),
config.num_validators,
None,
);
Ok(tokio::spawn(async move {
Ok(papyrus_consensus::run_consensus(
context,
config.start_height,
config.validator_id,
config.consensus_delay,
config.timeouts.clone(),
network_channels,
futures::stream::pending(),
)
.await?)
}))
}

async fn run_sync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ struct PapyrusArgs {
prevote_timeout: Option<f64>,
#[arg(long = "precommit_timeout", help = "The timeout (seconds) for a precommit.")]
precommit_timeout: Option<f64>,
#[arg(long = "cache_size", help = "Cache size for the test simulation.")]
#[arg(long = "cache_size", help = "The cache size for the test network receiver.")]
cache_size: Option<usize>,
#[arg(long = "random_seed", help = "Random seed for test simulation.")]
random_seed: Option<u64>,
Expand All @@ -142,9 +142,12 @@ struct RunConsensusArgs {
default_value = "60", value_parser = parse_duration
)]
stagnation_threshold: Duration,
#[arg(long = "duration", help = "Maximum test duration in seconds.",
default_value = "123456789123456789",
value_parser = parse_duration)]
#[arg(
long = "duration",
help = "Maximum test duration in seconds.",
default_value = "123456789123456789",
value_parser = parse_duration
)]
max_test_duration: Duration,
}

Expand Down Expand Up @@ -261,12 +264,11 @@ async fn build_node(data_dir: &str, logs_dir: &str, i: usize, papyrus_args: &Pap
let data_dir = format!("{}/data{}", data_dir, i);

let mut cmd = format!(
"RUST_LOG=papyrus_consensus=debug,papyrus=info target/release/papyrus_node \
"RUST_LOG=papyrus_consensus=debug,papyrus=info target/release/run_consensus \
--network.#is_none false --base_layer.node_url {} --storage.db_config.path_prefix {} \
--consensus.#is_none false --consensus.validator_id 0x{} --consensus.num_validators {} \
--network.tcp_port {} --rpc.server_address 127.0.0.1:{} \
--monitoring_gateway.server_address 127.0.0.1:{} --consensus.test.#is_none false \
--collect_metrics true ",
--monitoring_gateway.server_address 127.0.0.1:{} --collect_metrics true ",
papyrus_args.base_layer_node_url,
data_dir,
i,
Expand All @@ -280,16 +282,24 @@ async fn build_node(data_dir: &str, logs_dir: &str, i: usize, papyrus_args: &Pap
("timeouts.proposal_timeout", papyrus_args.proposal_timeout),
("timeouts.prevote_timeout", papyrus_args.prevote_timeout),
("timeouts.precommit_timeout", papyrus_args.precommit_timeout),
("test.drop_probability", papyrus_args.drop_probability),
("test.invalid_probability", papyrus_args.invalid_probability),
];
for (key, value) in conditional_params {
if let Some(v) = value {
cmd.push_str(&format!("--consensus.{} {} ", key, v));
}
}

let conditional_test_params = [
("drop_probability", papyrus_args.drop_probability),
("invalid_probability", papyrus_args.invalid_probability),
// Convert optional parameters to f64 for consistency in the vector,
// types were validated during parsing.
("test.cache_size", papyrus_args.cache_size.map(|v| v as f64)),
("test.random_seed", papyrus_args.random_seed.map(|v| v as f64)),
("cache_size", papyrus_args.cache_size.map(|v| v as f64)),
("random_seed", papyrus_args.random_seed.map(|v| v as f64)),
];
for (key, value) in conditional_params.iter() {
for (key, value) in conditional_test_params {
if let Some(v) = value {
cmd.push_str(&format!("--consensus.{} {} ", key, v));
cmd.push_str(&format!("--test.{} {} ", key, v));
}
}

Expand Down Expand Up @@ -366,10 +376,11 @@ async fn main() {
let _lock = LockDir::new(&db_dir).unwrap();

println!("Running cargo build...");
Command::new("cargo")
.args(["build", "--release", "--package", "papyrus_node"])
let build_status = Command::new("cargo")
.args(["build", "--release", "--package", "papyrus_node", "--bin", "run_consensus"])
.status()
.unwrap();
assert!(build_status.success());

println!("DB files will be stored in: {db_dir}");
println!("Logs will be stored in: {logs_dir}");
Expand Down
69 changes: 0 additions & 69 deletions crates/sequencing/papyrus_consensus/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use papyrus_config::converters::{
};
use papyrus_config::dumping::{
append_sub_config_name,
ser_optional_sub_config,
ser_param,
ser_required_param,
SerializeConfig,
Expand Down Expand Up @@ -39,8 +38,6 @@ pub struct ConsensusConfig {
pub consensus_delay: Duration,
/// Timeouts configuration for consensus.
pub timeouts: TimeoutsConfig,
/// Test configuration for consensus.
pub test: Option<ConsensusTestConfig>,
}

impl SerializeConfig for ConsensusConfig {
Expand Down Expand Up @@ -78,7 +75,6 @@ impl SerializeConfig for ConsensusConfig {
),
]);
config.extend(append_sub_config_name(self.timeouts.dump(), "timeouts"));
config.extend(ser_optional_sub_config(&self.test, "test"));
config
}
}
Expand All @@ -92,71 +88,6 @@ impl Default for ConsensusConfig {
num_validators: 4,
consensus_delay: Duration::from_secs(5),
timeouts: TimeoutsConfig::default(),
test: None,
}
}
}

/// Test configuration for consensus.
#[derive(Debug, Deserialize, Serialize, Clone, PartialEq)]
pub struct ConsensusTestConfig {
/// The cache size for the test simulation.
pub cache_size: usize,
/// The random seed for the test simulation to ensure repeatable test results.
pub random_seed: u64,
/// The probability of dropping a message.
pub drop_probability: f64,
/// The probability of sending an invalid message.
pub invalid_probability: f64,
/// The network topic for sync messages.
pub sync_topic: String,
}

impl SerializeConfig for ConsensusTestConfig {
fn dump(&self) -> BTreeMap<ParamPath, SerializedParam> {
BTreeMap::from_iter([
ser_param(
"cache_size",
&self.cache_size,
"The cache size for the test simulation.",
ParamPrivacyInput::Public,
),
ser_param(
"random_seed",
&self.random_seed,
"The random seed for the test simulation to ensure repeatable test results.",
ParamPrivacyInput::Public,
),
ser_param(
"drop_probability",
&self.drop_probability,
"The probability of dropping a message.",
ParamPrivacyInput::Public,
),
ser_param(
"invalid_probability",
&self.invalid_probability,
"The probability of sending an invalid message.",
ParamPrivacyInput::Public,
),
ser_param(
"sync_topic",
&self.sync_topic,
"The network topic for sync messages.",
ParamPrivacyInput::Public,
),
])
}
}

impl Default for ConsensusTestConfig {
fn default() -> Self {
Self {
cache_size: 1000,
random_seed: 0,
drop_probability: 0.0,
invalid_probability: 0.0,
sync_topic: "consensus_test_sync".to_string(),
}
}
}
Expand Down

0 comments on commit 5a2200a

Please sign in to comment.