diff --git a/config/papyrus/default_config.json b/config/papyrus/default_config.json index 7d1b69f099..ba6a082006 100644 --- a/config/papyrus/default_config.json +++ b/config/papyrus/default_config.json @@ -99,36 +99,6 @@ "privacy": "Public", "value": 0 }, - "consensus.test.#is_none": { - "description": "Flag for an optional field.", - "privacy": "TemporaryValue", - "value": true - }, - "consensus.test.cache_size": { - "description": "The cache size for the test simulation.", - "privacy": "Public", - "value": 1000 - }, - "consensus.test.drop_probability": { - "description": "The probability of dropping a message.", - "privacy": "Public", - "value": 0.0 - }, - "consensus.test.invalid_probability": { - "description": "The probability of sending an invalid message.", - "privacy": "Public", - "value": 0.0 - }, - "consensus.test.random_seed": { - "description": "The random seed for the test simulation to ensure repeatable test results.", - "privacy": "Public", - "value": 0 - }, - "consensus.test.sync_topic": { - "description": "The network topic for sync messages.", - "privacy": "Public", - "value": "consensus_test_sync" - }, "consensus.timeouts.precommit_timeout": { "description": "The timeout (seconds) for a precommit.", "privacy": "Public", diff --git a/crates/papyrus_node/src/run.rs b/crates/papyrus_node/src/run.rs index 1b47cf2d05..2d9ef95dfb 100644 --- a/crates/papyrus_node/src/run.rs +++ b/crates/papyrus_node/src/run.rs @@ -7,7 +7,6 @@ use std::process::exit; use std::sync::Arc; use std::time::Duration; -use futures::stream::StreamExt; use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerConfig; use papyrus_common::metrics::COLLECT_PROFILING_METRICS; use papyrus_common::pending_classes::PendingClasses; @@ -15,11 +14,10 @@ use papyrus_common::BlockHashAndNumber; use papyrus_config::presentation::get_config_presentation; use papyrus_config::validators::config_validate; use papyrus_consensus::config::ConsensusConfig; -use papyrus_consensus::simulation_network_receiver::NetworkReceiver; use papyrus_consensus_orchestrator::papyrus_consensus_context::PapyrusConsensusContext; use papyrus_monitoring_gateway::MonitoringServer; use papyrus_network::gossipsub_impl::Topic; -use papyrus_network::network_manager::{BroadcastTopicChannels, NetworkManager}; +use papyrus_network::network_manager::NetworkManager; use papyrus_network::{network_manager, NetworkConfig}; use papyrus_p2p_sync::client::{P2PSyncClient, P2PSyncClientChannels}; use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels}; @@ -31,7 +29,7 @@ use papyrus_sync::sources::base_layer::{BaseLayerSourceError, EthereumBaseLayerS use papyrus_sync::sources::central::{CentralError, CentralSource, CentralSourceConfig}; use papyrus_sync::sources::pending::PendingSource; use papyrus_sync::{StateSync, SyncConfig}; -use starknet_api::block::{BlockHash, BlockNumber}; +use starknet_api::block::BlockHash; use starknet_api::felt; use starknet_client::reader::objects::pending_data::{PendingBlock, PendingBlockOrDeprecated}; use starknet_client::reader::PendingData; @@ -188,65 +186,24 @@ fn spawn_consensus( let network_channels = network_manager .register_broadcast_topic(Topic::new(config.network_topic.clone()), BUFFER_SIZE)?; - // TODO(matan): connect this to an actual channel. - if let Some(test_config) = config.test.as_ref() { - let sync_channels = network_manager - .register_broadcast_topic(Topic::new(test_config.sync_topic.clone()), BUFFER_SIZE)?; - let context = PapyrusConsensusContext::new( - storage_reader.clone(), - network_channels.messages_to_broadcast_sender.clone(), - config.num_validators, - Some(sync_channels.messages_to_broadcast_sender), - ); - let network_receiver = NetworkReceiver::new( - network_channels.broadcasted_messages_receiver, - test_config.cache_size, - test_config.random_seed, - test_config.drop_probability, - test_config.invalid_probability, - ); - let broadcast_channels = BroadcastTopicChannels { - messages_to_broadcast_sender: network_channels.messages_to_broadcast_sender, - broadcasted_messages_receiver: Box::new(network_receiver), - reported_messages_sender: network_channels.reported_messages_sender, - continue_propagation_sender: network_channels.continue_propagation_sender, - }; - let sync_receiver = - sync_channels.broadcasted_messages_receiver.map(|(vote, _report_sender)| { - BlockNumber(vote.expect("Sync channel should never have errors").height) - }); - Ok(tokio::spawn(async move { - Ok(papyrus_consensus::run_consensus( - context, - config.start_height, - config.validator_id, - config.consensus_delay, - config.timeouts.clone(), - broadcast_channels, - sync_receiver, - ) - .await?) - })) - } else { - let context = PapyrusConsensusContext::new( - storage_reader.clone(), - network_channels.messages_to_broadcast_sender.clone(), - config.num_validators, - None, - ); - Ok(tokio::spawn(async move { - Ok(papyrus_consensus::run_consensus( - context, - config.start_height, - config.validator_id, - config.consensus_delay, - config.timeouts.clone(), - network_channels, - futures::stream::pending(), - ) - .await?) - })) - } + let context = PapyrusConsensusContext::new( + storage_reader.clone(), + network_channels.messages_to_broadcast_sender.clone(), + config.num_validators, + None, + ); + Ok(tokio::spawn(async move { + Ok(papyrus_consensus::run_consensus( + context, + config.start_height, + config.validator_id, + config.consensus_delay, + config.timeouts.clone(), + network_channels, + futures::stream::pending(), + ) + .await?) + })) } async fn run_sync( diff --git a/crates/sequencing/papyrus_consensus/src/bin/run_consensus.rs b/crates/sequencing/papyrus_consensus/src/bin/run_simulation.rs similarity index 92% rename from crates/sequencing/papyrus_consensus/src/bin/run_consensus.rs rename to crates/sequencing/papyrus_consensus/src/bin/run_simulation.rs index b226ef3c68..c93f565a65 100644 --- a/crates/sequencing/papyrus_consensus/src/bin/run_consensus.rs +++ b/crates/sequencing/papyrus_consensus/src/bin/run_simulation.rs @@ -117,7 +117,7 @@ struct PapyrusArgs { prevote_timeout: Option, #[arg(long = "precommit_timeout", help = "The timeout (seconds) for a precommit.")] precommit_timeout: Option, - #[arg(long = "cache_size", help = "Cache size for the test simulation.")] + #[arg(long = "cache_size", help = "The cache size for the test network receiver.")] cache_size: Option, #[arg(long = "random_seed", help = "Random seed for test simulation.")] random_seed: Option, @@ -142,9 +142,12 @@ struct RunConsensusArgs { default_value = "60", value_parser = parse_duration )] stagnation_threshold: Duration, - #[arg(long = "duration", help = "Maximum test duration in seconds.", - default_value = "123456789123456789", - value_parser = parse_duration)] + #[arg( + long = "duration", + help = "Maximum test duration in seconds.", + default_value = "123456789123456789", + value_parser = parse_duration + )] max_test_duration: Duration, } @@ -261,12 +264,11 @@ async fn build_node(data_dir: &str, logs_dir: &str, i: usize, papyrus_args: &Pap let data_dir = format!("{}/data{}", data_dir, i); let mut cmd = format!( - "RUST_LOG=papyrus_consensus=debug,papyrus=info target/release/papyrus_node \ + "RUST_LOG=papyrus_consensus=debug,papyrus=info target/release/run_consensus \ --network.#is_none false --base_layer.node_url {} --storage.db_config.path_prefix {} \ --consensus.#is_none false --consensus.validator_id 0x{} --consensus.num_validators {} \ --network.tcp_port {} --rpc.server_address 127.0.0.1:{} \ - --monitoring_gateway.server_address 127.0.0.1:{} --consensus.test.#is_none false \ - --collect_metrics true ", + --monitoring_gateway.server_address 127.0.0.1:{} --collect_metrics true ", papyrus_args.base_layer_node_url, data_dir, i, @@ -280,16 +282,24 @@ async fn build_node(data_dir: &str, logs_dir: &str, i: usize, papyrus_args: &Pap ("timeouts.proposal_timeout", papyrus_args.proposal_timeout), ("timeouts.prevote_timeout", papyrus_args.prevote_timeout), ("timeouts.precommit_timeout", papyrus_args.precommit_timeout), - ("test.drop_probability", papyrus_args.drop_probability), - ("test.invalid_probability", papyrus_args.invalid_probability), + ]; + for (key, value) in conditional_params { + if let Some(v) = value { + cmd.push_str(&format!("--consensus.{} {} ", key, v)); + } + } + + let conditional_test_params = [ + ("drop_probability", papyrus_args.drop_probability), + ("invalid_probability", papyrus_args.invalid_probability), // Convert optional parameters to f64 for consistency in the vector, // types were validated during parsing. - ("test.cache_size", papyrus_args.cache_size.map(|v| v as f64)), - ("test.random_seed", papyrus_args.random_seed.map(|v| v as f64)), + ("cache_size", papyrus_args.cache_size.map(|v| v as f64)), + ("random_seed", papyrus_args.random_seed.map(|v| v as f64)), ]; - for (key, value) in conditional_params.iter() { + for (key, value) in conditional_test_params { if let Some(v) = value { - cmd.push_str(&format!("--consensus.{} {} ", key, v)); + cmd.push_str(&format!("--test.{} {} ", key, v)); } } @@ -366,10 +376,11 @@ async fn main() { let _lock = LockDir::new(&db_dir).unwrap(); println!("Running cargo build..."); - Command::new("cargo") - .args(["build", "--release", "--package", "papyrus_node"]) + let build_status = Command::new("cargo") + .args(["build", "--release", "--package", "papyrus_node", "--bin", "run_consensus"]) .status() .unwrap(); + assert!(build_status.success()); println!("DB files will be stored in: {db_dir}"); println!("Logs will be stored in: {logs_dir}"); diff --git a/crates/sequencing/papyrus_consensus/src/config.rs b/crates/sequencing/papyrus_consensus/src/config.rs index 1f49b03c4b..2c27fb101c 100644 --- a/crates/sequencing/papyrus_consensus/src/config.rs +++ b/crates/sequencing/papyrus_consensus/src/config.rs @@ -11,7 +11,6 @@ use papyrus_config::converters::{ }; use papyrus_config::dumping::{ append_sub_config_name, - ser_optional_sub_config, ser_param, ser_required_param, SerializeConfig, @@ -39,8 +38,6 @@ pub struct ConsensusConfig { pub consensus_delay: Duration, /// Timeouts configuration for consensus. pub timeouts: TimeoutsConfig, - /// Test configuration for consensus. - pub test: Option, } impl SerializeConfig for ConsensusConfig { @@ -78,7 +75,6 @@ impl SerializeConfig for ConsensusConfig { ), ]); config.extend(append_sub_config_name(self.timeouts.dump(), "timeouts")); - config.extend(ser_optional_sub_config(&self.test, "test")); config } } @@ -92,71 +88,6 @@ impl Default for ConsensusConfig { num_validators: 4, consensus_delay: Duration::from_secs(5), timeouts: TimeoutsConfig::default(), - test: None, - } - } -} - -/// Test configuration for consensus. -#[derive(Debug, Deserialize, Serialize, Clone, PartialEq)] -pub struct ConsensusTestConfig { - /// The cache size for the test simulation. - pub cache_size: usize, - /// The random seed for the test simulation to ensure repeatable test results. - pub random_seed: u64, - /// The probability of dropping a message. - pub drop_probability: f64, - /// The probability of sending an invalid message. - pub invalid_probability: f64, - /// The network topic for sync messages. - pub sync_topic: String, -} - -impl SerializeConfig for ConsensusTestConfig { - fn dump(&self) -> BTreeMap { - BTreeMap::from_iter([ - ser_param( - "cache_size", - &self.cache_size, - "The cache size for the test simulation.", - ParamPrivacyInput::Public, - ), - ser_param( - "random_seed", - &self.random_seed, - "The random seed for the test simulation to ensure repeatable test results.", - ParamPrivacyInput::Public, - ), - ser_param( - "drop_probability", - &self.drop_probability, - "The probability of dropping a message.", - ParamPrivacyInput::Public, - ), - ser_param( - "invalid_probability", - &self.invalid_probability, - "The probability of sending an invalid message.", - ParamPrivacyInput::Public, - ), - ser_param( - "sync_topic", - &self.sync_topic, - "The network topic for sync messages.", - ParamPrivacyInput::Public, - ), - ]) - } -} - -impl Default for ConsensusTestConfig { - fn default() -> Self { - Self { - cache_size: 1000, - random_seed: 0, - drop_probability: 0.0, - invalid_probability: 0.0, - sync_topic: "consensus_test_sync".to_string(), } } }