diff --git a/Cargo.lock b/Cargo.lock index 46269280ef4..9176bf664e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7404,11 +7404,13 @@ dependencies = [ "infra_utils", "lazy_static", "mockall", + "papyrus_config", "papyrus_consensus", "papyrus_network", "papyrus_protobuf", "papyrus_storage", "papyrus_test_utils", + "reqwest 0.11.27", "serde", "serde_json", "starknet-types-core", @@ -7419,6 +7421,7 @@ dependencies = [ "tokio", "tokio-util", "tracing", + "url", ] [[package]] @@ -10542,6 +10545,7 @@ dependencies = [ "tempfile", "tokio", "tracing", + "url", ] [[package]] @@ -10759,6 +10763,7 @@ dependencies = [ "starknet_state_sync_types", "tokio", "tracing", + "url", "validator", ] diff --git a/config/sequencer/default_config.json b/config/sequencer/default_config.json index b94a14b9fff..bcc892bc81b 100644 --- a/config/sequencer/default_config.json +++ b/config/sequencer/default_config.json @@ -609,6 +609,11 @@ "privacy": "Public", "value": "0.0.0.0:8080" }, + "consensus_manager_config.cende_config.recorder_url": { + "description": "The URL of the Pythonic cende_recorder", + "pointer_target": "recorder_url", + "privacy": "Private" + }, "consensus_manager_config.consensus_config.chain_id": { "description": "The chain id of the Starknet chain.", "pointer_target": "chain_id", @@ -954,6 +959,11 @@ "privacy": "Public", "value": 8082 }, + "recorder_url": { + "description": "A required param! The URL of the Pythonic cende_recorder", + "param_type": "String", + "privacy": "TemporaryValue" + }, "rpc_state_reader_config.json_rpc_version": { "description": "The json rpc version.", "privacy": "Public", diff --git a/crates/sequencing/papyrus_consensus_orchestrator/Cargo.toml b/crates/sequencing/papyrus_consensus_orchestrator/Cargo.toml index 09e543d9135..5f0bbe0d77f 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/Cargo.toml +++ b/crates/sequencing/papyrus_consensus_orchestrator/Cargo.toml @@ -12,10 +12,12 @@ blockifier.workspace = true chrono.workspace = true futures.workspace = true indexmap.workspace = true +papyrus_config.workspace = true papyrus_consensus.workspace = true papyrus_network.workspace = true papyrus_protobuf.workspace = true papyrus_storage.workspace = true +reqwest.workspace = true serde.workspace = true starknet-types-core.workspace = true starknet_api.workspace = true @@ -24,6 +26,7 @@ starknet_state_sync_types = { workspace = true, features = ["testing"] } tokio = { workspace = true, features = ["full"] } tokio-util.workspace = true tracing.workspace = true +url = { workspace = true, features = ["serde"] } [dev-dependencies] infra_utils.workspace = true diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/cende/mod.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/cende/mod.rs index 94dbbf15c0e..86826ffca7d 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/cende/mod.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/cende/mod.rs @@ -1,20 +1,26 @@ mod central_objects; +use std::collections::BTreeMap; use std::sync::Arc; use async_trait::async_trait; use futures::channel::oneshot; #[cfg(test)] use mockall::automock; +use papyrus_config::dumping::{ser_param, SerializeConfig}; +use papyrus_config::{ParamPath, ParamPrivacyInput, SerializedParam}; +use reqwest::Client; +use serde::{Deserialize, Serialize}; use starknet_api::block::BlockNumber; use tokio::sync::Mutex; use tokio::task::{self}; use tracing::debug; +use url::Url; // TODO(dvir): add tests when will have more logic. /// A chunk of all the data to write to Aersopike. -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub(crate) struct AerospikeBlob { // TODO(yael, dvir): add the blob fields. } @@ -31,17 +37,58 @@ pub trait CendeContext: Send + Sync { async fn prepare_blob_for_next_height(&self, blob_parameters: BlobParameters); } -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug)] pub struct CendeAmbassador { // TODO(dvir): consider creating enum varaiant instead of the `Option`. // `None` indicates that there is no blob to write, and therefore, the node can't be the // proposer. prev_height_blob: Arc>>, + url: Url, + client: Client, } +/// The path to write blob in the Recorder. +const RECORDER_WRITE_BLOB_PATH: &str = "/write_blob"; + impl CendeAmbassador { - pub fn new() -> Self { - CendeAmbassador { prev_height_blob: Arc::new(Mutex::new(None)) } + pub fn new(cende_config: CendeConfig) -> Self { + CendeAmbassador { + prev_height_blob: Arc::new(Mutex::new(None)), + url: cende_config + .recorder_url + .join(RECORDER_WRITE_BLOB_PATH) + .expect("Failed to join `RECORDER_WRITE_BLOB_PATH` with the Recorder URL"), + client: Client::new(), + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct CendeConfig { + pub recorder_url: Url, +} + +impl Default for CendeConfig { + fn default() -> Self { + CendeConfig { + // TODO(dvir): change this default value to "https://". The reason for the + // current value is to make the `end_to_end_flow_test` to pass (it creates the default + // config). + recorder_url: "https://recorder_url" + .parse() + .expect("recorder_url must be a valid Recorder URL"), + } + } +} + +impl SerializeConfig for CendeConfig { + fn dump(&self) -> BTreeMap { + BTreeMap::from_iter([ser_param( + "recorder_url", + &self.recorder_url, + "The URL of the Pythonic cende_recorder", + ParamPrivacyInput::Private, + )]) } } @@ -50,34 +97,56 @@ impl CendeContext for CendeAmbassador { fn write_prev_height_blob(&self, height: BlockNumber) -> oneshot::Receiver { let (sender, receiver) = oneshot::channel(); let prev_height_blob = self.prev_height_blob.clone(); + let request_builder = self.client.post(self.url.clone()); task::spawn(async move { // TODO(dvir): remove this when handle the booting up case. // Heights that are permitted to be built without writing to Aerospike. // Height 1 to make `end_to_end_flow` test pass. - const PERMITTED_HEIGHTS: [BlockNumber; 1] = [BlockNumber(1)]; + const SKIP_WRITE_HEIGHTS: [BlockNumber; 1] = [BlockNumber(1)]; - if PERMITTED_HEIGHTS.contains(&height) { + if SKIP_WRITE_HEIGHTS.contains(&height) { debug!( - "height {} is in `PERMITTED_HEIGHTS`, consensus can send proposal without \ + "height {} is in `SKIP_WRITE_HEIGHTS`, consensus can send proposal without \ writing to Aerospike", height ); - sender.send(true).unwrap(); + oneshot_send(sender, true); return; } - let Some(ref _blob) = *prev_height_blob.lock().await else { + let Some(ref blob) = *prev_height_blob.lock().await else { + // This case nappen when restarting the node, `prev_height_blob` intial value in + // `None`. debug!("No blob to write to Aerospike."); - sender.send(false).expect("Writing to a one-shot sender should succeed."); + oneshot_send(sender, false); return; }; - // TODO(dvir): write blob to AS. // TODO(dvir): consider set `prev_height_blob` to `None` after writing to AS. debug!("Writing blob to Aerospike."); - sender.send(true).expect("Writing to a one-shot sender should succeed."); - debug!("Blob writing to Aerospike completed."); + match request_builder.json(blob).send().await { + Ok(response) => { + if response.status().is_success() { + debug!("Blob written to Aerospike successfully."); + oneshot_send(sender, true); + } else { + debug!("The recorder failed to write blob. Error: {}", response.status()); + oneshot_send(sender, false); + } + } + Err(err) => { + debug!("Failed to send a request to the recorder. Error: {}", err); + // TODO(dvir): change this to `false`. The reason for the current value is to + // make the `end_to_end_flow_test` to pass. + oneshot_send(sender, true); + } + } }); - receiver + return receiver; + + // Helper function to send a boolean result to a one-shot sender. + fn oneshot_send(sender: oneshot::Sender, result: bool) { + sender.send(result).expect("Writing to a one-shot sender should succeed."); + } } async fn prepare_blob_for_next_height(&self, blob_parameters: BlobParameters) { diff --git a/crates/starknet_consensus_manager/src/config.rs b/crates/starknet_consensus_manager/src/config.rs index 84ca95dfd21..58c1ceafbbc 100644 --- a/crates/starknet_consensus_manager/src/config.rs +++ b/crates/starknet_consensus_manager/src/config.rs @@ -3,6 +3,7 @@ use std::collections::BTreeMap; use papyrus_config::dumping::{append_sub_config_name, SerializeConfig}; use papyrus_config::{ParamPath, SerializedParam}; use papyrus_consensus::config::ConsensusConfig; +use papyrus_consensus_orchestrator::cende::CendeConfig; use serde::{Deserialize, Serialize}; use validator::Validate; @@ -11,12 +12,15 @@ use validator::Validate; #[derive(Clone, Default, Debug, Serialize, Deserialize, Validate, PartialEq)] pub struct ConsensusManagerConfig { pub consensus_config: ConsensusConfig, + pub cende_config: CendeConfig, } impl SerializeConfig for ConsensusManagerConfig { fn dump(&self) -> BTreeMap { - let sub_configs = - vec![append_sub_config_name(self.consensus_config.dump(), "consensus_config")]; + let sub_configs = vec![ + append_sub_config_name(self.consensus_config.dump(), "consensus_config"), + append_sub_config_name(self.cende_config.dump(), "cende_config"), + ]; sub_configs.into_iter().flatten().collect() } diff --git a/crates/starknet_consensus_manager/src/consensus_manager.rs b/crates/starknet_consensus_manager/src/consensus_manager.rs index c1aa1159323..4488b86aa67 100644 --- a/crates/starknet_consensus_manager/src/consensus_manager.rs +++ b/crates/starknet_consensus_manager/src/consensus_manager.rs @@ -87,7 +87,7 @@ impl ConsensusManager { votes_broadcast_channels.broadcast_topic_client.clone(), self.config.consensus_config.num_validators, self.config.consensus_config.chain_id.clone(), - Arc::new(CendeAmbassador::new()), + Arc::new(CendeAmbassador::new(self.config.cende_config.clone())), ); let mut network_handle = tokio::task::spawn(network_manager.run()); diff --git a/crates/starknet_integration_tests/Cargo.toml b/crates/starknet_integration_tests/Cargo.toml index 7743194f5d0..546754cb304 100644 --- a/crates/starknet_integration_tests/Cargo.toml +++ b/crates/starknet_integration_tests/Cargo.toml @@ -42,6 +42,7 @@ strum.workspace = true tempfile.workspace = true tokio.workspace = true tracing.workspace = true +url.workspace = true [dev-dependencies] futures.workspace = true diff --git a/crates/starknet_integration_tests/src/config_utils.rs b/crates/starknet_integration_tests/src/config_utils.rs index 82dcd367003..c4386833c8d 100644 --- a/crates/starknet_integration_tests/src/config_utils.rs +++ b/crates/starknet_integration_tests/src/config_utils.rs @@ -53,6 +53,7 @@ pub(crate) fn dump_config_file_changes( required_params.eth_fee_token_address, required_params.strk_fee_token_address, required_params.validator_id, + required_params.recorder_url, ); // Create the entire mapping of the config and the pointers, without the required params. diff --git a/crates/starknet_integration_tests/src/utils.rs b/crates/starknet_integration_tests/src/utils.rs index edfbc02082d..e8798c5110b 100644 --- a/crates/starknet_integration_tests/src/utils.rs +++ b/crates/starknet_integration_tests/src/utils.rs @@ -38,6 +38,7 @@ use starknet_sequencer_node::config::node_config::SequencerNodeConfig; use starknet_sequencer_node::config::test_utils::RequiredParams; use starknet_state_sync::config::StateSyncConfig; use starknet_types_core::felt::Felt; +use url::Url; pub fn create_chain_info() -> ChainInfo { let mut chain_info = ChainInfo::create_for_testing(); @@ -92,6 +93,8 @@ pub async fn create_config( eth_fee_token_address: fee_token_addresses.eth_fee_token_address, strk_fee_token_address: fee_token_addresses.strk_fee_token_address, validator_id, + // TODO(dvir): change this to real value when add recorder to integration tests. + recorder_url: Url::parse("https://recorder_url").expect("The URL is valid"), }, ) } @@ -129,6 +132,7 @@ pub fn create_consensus_manager_configs_and_channels( timeouts: timeouts.clone(), ..Default::default() }, + ..Default::default() }) .collect(); diff --git a/crates/starknet_sequencer_node/Cargo.toml b/crates/starknet_sequencer_node/Cargo.toml index 60a88cba710..794d1ea0f3d 100644 --- a/crates/starknet_sequencer_node/Cargo.toml +++ b/crates/starknet_sequencer_node/Cargo.toml @@ -6,7 +6,7 @@ repository.workspace = true license.workspace = true [features] -testing = [] +testing = ["url"] [lints] workspace = true @@ -42,6 +42,7 @@ starknet_state_sync.workspace = true starknet_state_sync_types.workspace = true tokio.workspace = true tracing.workspace = true +url = { workspace = true, optional = true } validator.workspace = true [dev-dependencies] diff --git a/crates/starknet_sequencer_node/src/config/node_config.rs b/crates/starknet_sequencer_node/src/config/node_config.rs index d8a289ad800..f7d67e837a6 100644 --- a/crates/starknet_sequencer_node/src/config/node_config.rs +++ b/crates/starknet_sequencer_node/src/config/node_config.rs @@ -89,6 +89,14 @@ pub static CONFIG_POINTERS: LazyLock = LazyLock::new(|| { ), set_pointing_param_paths(&["consensus_manager_config.consensus_config.validator_id"]), ), + ( + ser_pointer_target_required_param( + "recorder_url", + SerializationType::String, + "The URL of the Pythonic cende_recorder", + ), + set_pointing_param_paths(&["consensus_manager_config.cende_config.recorder_url"]), + ), ]; let mut common_execution_config = generate_struct_pointer( "versioned_constants_overrides".to_owned(), diff --git a/crates/starknet_sequencer_node/src/config/test_utils.rs b/crates/starknet_sequencer_node/src/config/test_utils.rs index e25a95a6cf7..6c351f4f833 100644 --- a/crates/starknet_sequencer_node/src/config/test_utils.rs +++ b/crates/starknet_sequencer_node/src/config/test_utils.rs @@ -3,6 +3,7 @@ use std::vec::Vec; // Used by #[gen_field_names_fn]. use papyrus_proc_macros::gen_field_names_and_cli_args_fn; use papyrus_protobuf::consensus::DEFAULT_VALIDATOR_ID; use starknet_api::core::{ChainId, ContractAddress}; +use url::Url; use crate::config::node_config::node_command; @@ -13,6 +14,7 @@ pub struct RequiredParams { pub eth_fee_token_address: ContractAddress, pub strk_fee_token_address: ContractAddress, pub validator_id: ContractAddress, + pub recorder_url: Url, } impl RequiredParams { @@ -22,6 +24,7 @@ impl RequiredParams { eth_fee_token_address: ContractAddress::from(2_u128), strk_fee_token_address: ContractAddress::from(3_u128), validator_id: ContractAddress::from(DEFAULT_VALIDATOR_ID), + recorder_url: Url::parse("https://recorder_url").expect("The URL is valid"), } } }