diff --git a/config/papyrus/default_config.json b/config/papyrus/default_config.json index 7d1b69f099..ba6a082006 100644 --- a/config/papyrus/default_config.json +++ b/config/papyrus/default_config.json @@ -99,36 +99,6 @@ "privacy": "Public", "value": 0 }, - "consensus.test.#is_none": { - "description": "Flag for an optional field.", - "privacy": "TemporaryValue", - "value": true - }, - "consensus.test.cache_size": { - "description": "The cache size for the test simulation.", - "privacy": "Public", - "value": 1000 - }, - "consensus.test.drop_probability": { - "description": "The probability of dropping a message.", - "privacy": "Public", - "value": 0.0 - }, - "consensus.test.invalid_probability": { - "description": "The probability of sending an invalid message.", - "privacy": "Public", - "value": 0.0 - }, - "consensus.test.random_seed": { - "description": "The random seed for the test simulation to ensure repeatable test results.", - "privacy": "Public", - "value": 0 - }, - "consensus.test.sync_topic": { - "description": "The network topic for sync messages.", - "privacy": "Public", - "value": "consensus_test_sync" - }, "consensus.timeouts.precommit_timeout": { "description": "The timeout (seconds) for a precommit.", "privacy": "Public", diff --git a/crates/papyrus_node/src/bin/run_consensus.rs b/crates/papyrus_node/src/bin/run_consensus.rs new file mode 100644 index 0000000000..e50df173e7 --- /dev/null +++ b/crates/papyrus_node/src/bin/run_consensus.rs @@ -0,0 +1,111 @@ +use clap::Parser; +use futures::stream::StreamExt; +use papyrus_consensus::config::ConsensusConfig; +use papyrus_consensus::simulation_network_receiver::NetworkReceiver; +use papyrus_consensus_orchestrator::papyrus_consensus_context::PapyrusConsensusContext; +use papyrus_network::gossipsub_impl::Topic; +use papyrus_network::network_manager::{BroadcastTopicChannels, NetworkManager}; +use papyrus_node::run::{run, PapyrusResources, PapyrusTaskHandles}; +use papyrus_node::test_utils::build_configs; +use papyrus_p2p_sync::BUFFER_SIZE; +use papyrus_storage::StorageReader; +use starknet_api::block::BlockNumber; +use tokio::task::JoinHandle; + +/// Test configuration for consensus. +#[derive(Parser, Debug, Clone, PartialEq)] +pub struct TestConfig { + #[arg(long = "cache_size", help = "The cache size for the test network receiver.")] + pub cache_size: usize, + #[arg( + long = "random_seed", + help = "The random seed for the test simulation to ensure repeatable test results." + )] + pub random_seed: u64, + #[arg(long = "drop_probability", help = "The probability of dropping a message.")] + pub drop_probability: f64, + #[arg(long = "invalid_probability", help = "The probability of sending an invalid message.")] + pub invalid_probability: f64, + #[arg(long = "sync_topic", help = "The network topic for sync messages.")] + pub sync_topic: String, +} + +impl Default for TestConfig { + fn default() -> Self { + Self { + cache_size: 1000, + random_seed: 0, + drop_probability: 0.0, + invalid_probability: 0.0, + sync_topic: "consensus_test_sync".to_string(), + } + } +} + +fn build_consensus( + consensus_config: ConsensusConfig, + test_config: TestConfig, + storage_reader: StorageReader, + network_manager: &mut NetworkManager, +) -> anyhow::Result>>> { + let network_channels = network_manager.register_broadcast_topic( + Topic::new(consensus_config.network_topic.clone()), + BUFFER_SIZE, + )?; + // TODO(matan): connect this to an actual channel. + let sync_channels = network_manager + .register_broadcast_topic(Topic::new(test_config.sync_topic.clone()), BUFFER_SIZE)?; + let context = PapyrusConsensusContext::new( + storage_reader.clone(), + network_channels.messages_to_broadcast_sender.clone(), + consensus_config.num_validators, + Some(sync_channels.messages_to_broadcast_sender), + ); + let sync_receiver = + sync_channels.broadcasted_messages_receiver.map(|(vote, _report_sender)| { + BlockNumber(vote.expect("Sync channel should never have errors").height) + }); + let network_receiver = NetworkReceiver::new( + network_channels.broadcasted_messages_receiver, + test_config.cache_size, + test_config.random_seed, + test_config.drop_probability, + test_config.invalid_probability, + ); + let broadcast_channels = BroadcastTopicChannels { + messages_to_broadcast_sender: network_channels.messages_to_broadcast_sender, + broadcasted_messages_receiver: Box::new(network_receiver), + reported_messages_sender: network_channels.reported_messages_sender, + continue_propagation_sender: network_channels.continue_propagation_sender, + }; + + Ok(Some(tokio::spawn(async move { + Ok(papyrus_consensus::run_consensus( + context, + consensus_config.start_height, + consensus_config.validator_id, + consensus_config.consensus_delay, + consensus_config.timeouts.clone(), + broadcast_channels, + sync_receiver, + ) + .await?) + }))) +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let (test_config, node_config) = build_configs::()?; + + let mut resources = PapyrusResources::new(&node_config)?; + + let consensus_handle = build_consensus( + node_config.consensus.clone().unwrap(), + test_config, + resources.storage_reader.clone(), + resources.maybe_network_manager.as_mut().unwrap(), + )?; + let tasks = PapyrusTaskHandles { consensus_handle, ..Default::default() }; + + run(node_config, resources, tasks).await +} diff --git a/crates/papyrus_node/src/config/snapshots/papyrus_node__config__config_test__dump_default_config.snap b/crates/papyrus_node/src/config/snapshots/papyrus_node__config__config_test__dump_default_config.snap index dc72d69c4d..9f0d39a923 100644 --- a/crates/papyrus_node/src/config/snapshots/papyrus_node__config__config_test__dump_default_config.snap +++ b/crates/papyrus_node/src/config/snapshots/papyrus_node__config__config_test__dump_default_config.snap @@ -115,44 +115,6 @@ expression: dumped_default_config }, "privacy": "Public" }, - "consensus.test.#is_none": { - "description": "Flag for an optional field.", - "value": true, - "privacy": "TemporaryValue" - }, - "consensus.test.cache_size": { - "description": "The cache size for the test simulation.", - "value": { - "$serde_json::private::Number": "1000" - }, - "privacy": "Public" - }, - "consensus.test.drop_probability": { - "description": "The probability of dropping a message.", - "value": { - "$serde_json::private::Number": "0.0" - }, - "privacy": "Public" - }, - "consensus.test.invalid_probability": { - "description": "The probability of sending an invalid message.", - "value": { - "$serde_json::private::Number": "0.0" - }, - "privacy": "Public" - }, - "consensus.test.random_seed": { - "description": "The random seed for the test simulation to ensure repeatable test results.", - "value": { - "$serde_json::private::Number": "0" - }, - "privacy": "Public" - }, - "consensus.test.sync_topic": { - "description": "The network topic for sync messages.", - "value": "consensus_test_sync", - "privacy": "Public" - }, "consensus.timeouts.precommit_timeout": { "description": "The timeout (seconds) for a precommit.", "value": { diff --git a/crates/papyrus_node/src/lib.rs b/crates/papyrus_node/src/lib.rs index b2fc83392d..c94a33787c 100644 --- a/crates/papyrus_node/src/lib.rs +++ b/crates/papyrus_node/src/lib.rs @@ -7,4 +7,6 @@ pub mod config; #[cfg(test)] mod precision_test; pub mod run; +/// Not cfg(test) because it's used for component tests which compile normally. +pub mod test_utils; pub mod version; diff --git a/crates/papyrus_node/src/run.rs b/crates/papyrus_node/src/run.rs index 1b47cf2d05..2d9ef95dfb 100644 --- a/crates/papyrus_node/src/run.rs +++ b/crates/papyrus_node/src/run.rs @@ -7,7 +7,6 @@ use std::process::exit; use std::sync::Arc; use std::time::Duration; -use futures::stream::StreamExt; use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerConfig; use papyrus_common::metrics::COLLECT_PROFILING_METRICS; use papyrus_common::pending_classes::PendingClasses; @@ -15,11 +14,10 @@ use papyrus_common::BlockHashAndNumber; use papyrus_config::presentation::get_config_presentation; use papyrus_config::validators::config_validate; use papyrus_consensus::config::ConsensusConfig; -use papyrus_consensus::simulation_network_receiver::NetworkReceiver; use papyrus_consensus_orchestrator::papyrus_consensus_context::PapyrusConsensusContext; use papyrus_monitoring_gateway::MonitoringServer; use papyrus_network::gossipsub_impl::Topic; -use papyrus_network::network_manager::{BroadcastTopicChannels, NetworkManager}; +use papyrus_network::network_manager::NetworkManager; use papyrus_network::{network_manager, NetworkConfig}; use papyrus_p2p_sync::client::{P2PSyncClient, P2PSyncClientChannels}; use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels}; @@ -31,7 +29,7 @@ use papyrus_sync::sources::base_layer::{BaseLayerSourceError, EthereumBaseLayerS use papyrus_sync::sources::central::{CentralError, CentralSource, CentralSourceConfig}; use papyrus_sync::sources::pending::PendingSource; use papyrus_sync::{StateSync, SyncConfig}; -use starknet_api::block::{BlockHash, BlockNumber}; +use starknet_api::block::BlockHash; use starknet_api::felt; use starknet_client::reader::objects::pending_data::{PendingBlock, PendingBlockOrDeprecated}; use starknet_client::reader::PendingData; @@ -188,65 +186,24 @@ fn spawn_consensus( let network_channels = network_manager .register_broadcast_topic(Topic::new(config.network_topic.clone()), BUFFER_SIZE)?; - // TODO(matan): connect this to an actual channel. - if let Some(test_config) = config.test.as_ref() { - let sync_channels = network_manager - .register_broadcast_topic(Topic::new(test_config.sync_topic.clone()), BUFFER_SIZE)?; - let context = PapyrusConsensusContext::new( - storage_reader.clone(), - network_channels.messages_to_broadcast_sender.clone(), - config.num_validators, - Some(sync_channels.messages_to_broadcast_sender), - ); - let network_receiver = NetworkReceiver::new( - network_channels.broadcasted_messages_receiver, - test_config.cache_size, - test_config.random_seed, - test_config.drop_probability, - test_config.invalid_probability, - ); - let broadcast_channels = BroadcastTopicChannels { - messages_to_broadcast_sender: network_channels.messages_to_broadcast_sender, - broadcasted_messages_receiver: Box::new(network_receiver), - reported_messages_sender: network_channels.reported_messages_sender, - continue_propagation_sender: network_channels.continue_propagation_sender, - }; - let sync_receiver = - sync_channels.broadcasted_messages_receiver.map(|(vote, _report_sender)| { - BlockNumber(vote.expect("Sync channel should never have errors").height) - }); - Ok(tokio::spawn(async move { - Ok(papyrus_consensus::run_consensus( - context, - config.start_height, - config.validator_id, - config.consensus_delay, - config.timeouts.clone(), - broadcast_channels, - sync_receiver, - ) - .await?) - })) - } else { - let context = PapyrusConsensusContext::new( - storage_reader.clone(), - network_channels.messages_to_broadcast_sender.clone(), - config.num_validators, - None, - ); - Ok(tokio::spawn(async move { - Ok(papyrus_consensus::run_consensus( - context, - config.start_height, - config.validator_id, - config.consensus_delay, - config.timeouts.clone(), - network_channels, - futures::stream::pending(), - ) - .await?) - })) - } + let context = PapyrusConsensusContext::new( + storage_reader.clone(), + network_channels.messages_to_broadcast_sender.clone(), + config.num_validators, + None, + ); + Ok(tokio::spawn(async move { + Ok(papyrus_consensus::run_consensus( + context, + config.start_height, + config.validator_id, + config.consensus_delay, + config.timeouts.clone(), + network_channels, + futures::stream::pending(), + ) + .await?) + })) } async fn run_sync( diff --git a/crates/papyrus_node/src/test_utils.rs b/crates/papyrus_node/src/test_utils.rs new file mode 100644 index 0000000000..02016079bc --- /dev/null +++ b/crates/papyrus_node/src/test_utils.rs @@ -0,0 +1,48 @@ +use std::env::args; + +use clap::Parser; +use papyrus_config::ConfigError; + +use crate::config::NodeConfig; + +// Test arguments passed on the command line are prefixed with `test.`. +const TEST_ARG_PREFIX: &str = "--test."; + +/// Split the elements of `input_args` into 2 groups: +/// 1. Those prefixed with "--test." +/// 2. Other. +/// +/// Presumes input is: program_name (--flag_name value)* +pub fn split_args(input_args: Vec) -> (Vec, Vec) { + input_args[1..].chunks(2).fold( + (vec![input_args[0].clone()], vec![input_args[0].clone()]), + |(mut matching_args, mut mismatched_args), input_arg| { + let (name, value) = (&input_arg[0], &input_arg[1]); + // String leading `--` for comparison. + if &name[..TEST_ARG_PREFIX.len()] == TEST_ARG_PREFIX { + matching_args.push(format!("--{}", &name[TEST_ARG_PREFIX.len()..])); + matching_args.push(value.clone()); + } else { + mismatched_args.push(name.clone()); + mismatched_args.push(value.clone()); + } + (matching_args, mismatched_args) + }, + ) +} + +/// Build both the node and test configs from the command line arguments. +pub fn build_configs() -> Result<(T, NodeConfig), ConfigError> { + let input_args = args().collect::>(); + let (test_input_args, node_input_args) = split_args(input_args); + + let mut test_config = T::default(); + test_config.update_from(test_input_args.iter()); + + let node_config = NodeConfig::load_and_process(node_input_args); + if let Err(ConfigError::CommandInput(clap_err)) = node_config { + clap_err.exit(); + } + let node_config = node_config?; + Ok((test_config, node_config)) +} diff --git a/crates/sequencing/papyrus_consensus/src/bin/run_consensus.rs b/crates/sequencing/papyrus_consensus/src/bin/run_simulation.rs similarity index 92% rename from crates/sequencing/papyrus_consensus/src/bin/run_consensus.rs rename to crates/sequencing/papyrus_consensus/src/bin/run_simulation.rs index b226ef3c68..c93f565a65 100644 --- a/crates/sequencing/papyrus_consensus/src/bin/run_consensus.rs +++ b/crates/sequencing/papyrus_consensus/src/bin/run_simulation.rs @@ -117,7 +117,7 @@ struct PapyrusArgs { prevote_timeout: Option, #[arg(long = "precommit_timeout", help = "The timeout (seconds) for a precommit.")] precommit_timeout: Option, - #[arg(long = "cache_size", help = "Cache size for the test simulation.")] + #[arg(long = "cache_size", help = "The cache size for the test network receiver.")] cache_size: Option, #[arg(long = "random_seed", help = "Random seed for test simulation.")] random_seed: Option, @@ -142,9 +142,12 @@ struct RunConsensusArgs { default_value = "60", value_parser = parse_duration )] stagnation_threshold: Duration, - #[arg(long = "duration", help = "Maximum test duration in seconds.", - default_value = "123456789123456789", - value_parser = parse_duration)] + #[arg( + long = "duration", + help = "Maximum test duration in seconds.", + default_value = "123456789123456789", + value_parser = parse_duration + )] max_test_duration: Duration, } @@ -261,12 +264,11 @@ async fn build_node(data_dir: &str, logs_dir: &str, i: usize, papyrus_args: &Pap let data_dir = format!("{}/data{}", data_dir, i); let mut cmd = format!( - "RUST_LOG=papyrus_consensus=debug,papyrus=info target/release/papyrus_node \ + "RUST_LOG=papyrus_consensus=debug,papyrus=info target/release/run_consensus \ --network.#is_none false --base_layer.node_url {} --storage.db_config.path_prefix {} \ --consensus.#is_none false --consensus.validator_id 0x{} --consensus.num_validators {} \ --network.tcp_port {} --rpc.server_address 127.0.0.1:{} \ - --monitoring_gateway.server_address 127.0.0.1:{} --consensus.test.#is_none false \ - --collect_metrics true ", + --monitoring_gateway.server_address 127.0.0.1:{} --collect_metrics true ", papyrus_args.base_layer_node_url, data_dir, i, @@ -280,16 +282,24 @@ async fn build_node(data_dir: &str, logs_dir: &str, i: usize, papyrus_args: &Pap ("timeouts.proposal_timeout", papyrus_args.proposal_timeout), ("timeouts.prevote_timeout", papyrus_args.prevote_timeout), ("timeouts.precommit_timeout", papyrus_args.precommit_timeout), - ("test.drop_probability", papyrus_args.drop_probability), - ("test.invalid_probability", papyrus_args.invalid_probability), + ]; + for (key, value) in conditional_params { + if let Some(v) = value { + cmd.push_str(&format!("--consensus.{} {} ", key, v)); + } + } + + let conditional_test_params = [ + ("drop_probability", papyrus_args.drop_probability), + ("invalid_probability", papyrus_args.invalid_probability), // Convert optional parameters to f64 for consistency in the vector, // types were validated during parsing. - ("test.cache_size", papyrus_args.cache_size.map(|v| v as f64)), - ("test.random_seed", papyrus_args.random_seed.map(|v| v as f64)), + ("cache_size", papyrus_args.cache_size.map(|v| v as f64)), + ("random_seed", papyrus_args.random_seed.map(|v| v as f64)), ]; - for (key, value) in conditional_params.iter() { + for (key, value) in conditional_test_params { if let Some(v) = value { - cmd.push_str(&format!("--consensus.{} {} ", key, v)); + cmd.push_str(&format!("--test.{} {} ", key, v)); } } @@ -366,10 +376,11 @@ async fn main() { let _lock = LockDir::new(&db_dir).unwrap(); println!("Running cargo build..."); - Command::new("cargo") - .args(["build", "--release", "--package", "papyrus_node"]) + let build_status = Command::new("cargo") + .args(["build", "--release", "--package", "papyrus_node", "--bin", "run_consensus"]) .status() .unwrap(); + assert!(build_status.success()); println!("DB files will be stored in: {db_dir}"); println!("Logs will be stored in: {logs_dir}"); diff --git a/crates/sequencing/papyrus_consensus/src/config.rs b/crates/sequencing/papyrus_consensus/src/config.rs index 1f49b03c4b..2c27fb101c 100644 --- a/crates/sequencing/papyrus_consensus/src/config.rs +++ b/crates/sequencing/papyrus_consensus/src/config.rs @@ -11,7 +11,6 @@ use papyrus_config::converters::{ }; use papyrus_config::dumping::{ append_sub_config_name, - ser_optional_sub_config, ser_param, ser_required_param, SerializeConfig, @@ -39,8 +38,6 @@ pub struct ConsensusConfig { pub consensus_delay: Duration, /// Timeouts configuration for consensus. pub timeouts: TimeoutsConfig, - /// Test configuration for consensus. - pub test: Option, } impl SerializeConfig for ConsensusConfig { @@ -78,7 +75,6 @@ impl SerializeConfig for ConsensusConfig { ), ]); config.extend(append_sub_config_name(self.timeouts.dump(), "timeouts")); - config.extend(ser_optional_sub_config(&self.test, "test")); config } } @@ -92,71 +88,6 @@ impl Default for ConsensusConfig { num_validators: 4, consensus_delay: Duration::from_secs(5), timeouts: TimeoutsConfig::default(), - test: None, - } - } -} - -/// Test configuration for consensus. -#[derive(Debug, Deserialize, Serialize, Clone, PartialEq)] -pub struct ConsensusTestConfig { - /// The cache size for the test simulation. - pub cache_size: usize, - /// The random seed for the test simulation to ensure repeatable test results. - pub random_seed: u64, - /// The probability of dropping a message. - pub drop_probability: f64, - /// The probability of sending an invalid message. - pub invalid_probability: f64, - /// The network topic for sync messages. - pub sync_topic: String, -} - -impl SerializeConfig for ConsensusTestConfig { - fn dump(&self) -> BTreeMap { - BTreeMap::from_iter([ - ser_param( - "cache_size", - &self.cache_size, - "The cache size for the test simulation.", - ParamPrivacyInput::Public, - ), - ser_param( - "random_seed", - &self.random_seed, - "The random seed for the test simulation to ensure repeatable test results.", - ParamPrivacyInput::Public, - ), - ser_param( - "drop_probability", - &self.drop_probability, - "The probability of dropping a message.", - ParamPrivacyInput::Public, - ), - ser_param( - "invalid_probability", - &self.invalid_probability, - "The probability of sending an invalid message.", - ParamPrivacyInput::Public, - ), - ser_param( - "sync_topic", - &self.sync_topic, - "The network topic for sync messages.", - ParamPrivacyInput::Public, - ), - ]) - } -} - -impl Default for ConsensusTestConfig { - fn default() -> Self { - Self { - cache_size: 1000, - random_seed: 0, - drop_probability: 0.0, - invalid_probability: 0.0, - sync_topic: "consensus_test_sync".to_string(), } } }